Dunwu Blog

大道至简,知易行难

网络技术之 VPN

img

简介

虚拟专用网络(VPN)的功能是:在公用网络上建立专用网络,进行加密通讯。在企业网络中有广泛应用。VPN 网关通过对数据包的加密和数据包目标地址的转换实现远程访问。VPN 可通过服务器、硬件、软件等多种方式实现。

VPN 属于远程访问技术,简单地说就是利用公用网络架设专用网络。例如某公司员工出差到外地,他想访问企业内网的服务器资源,这种访问就属于远程访问。
在传统的企业网络配置中,要进行远程访问,传统的方法是租用 DDN(数字数据网)专线或帧中继,这样的通讯方案必然导致高昂的网络通讯和维护费用。对于移动用户(移动办公人员)与远端个人用户而言,一般会通过拨号线路(Internet)进入企业的局域网,但这样必然带来安全上的隐患。
让外地员工访问到内网资源,利用 VPN 的解决方法就是在内网中架设一台 VPN 服务器。外地员工在当地连上互联网后,通过互联网连接 VPN 服务器,然后通过 VPN 服务器进入企业内网。为了保证数据安全,VPN 服务器和客户机之间的通讯数据都进行了加密处理。有了数据加密,就可以认为数据是在一条专用的数据链路上进行安全传输,就如同专门架设了一个专用网络一样,但实际上 VPN 使用的是互联网上的公用链路,因此 VPN 称为虚拟专用网络,其实质上就是利用加密技术在公网上封装出一个数据通讯隧道。有了 VPN 技术,用户无论是在外地出差还是在家中办公,只要能上互联网就能利用 VPN 访问内网资源,这就是 VPN 在企业中应用得如此广泛的原因。

VPN 的作用

隐藏 IP 和位置

img

VPN 可以隐藏使用者的 IP 地址和位置。

使用 VPN 的最常见原因之一是屏蔽您的真实 IP 地址。

您的 IP 地址是由 ISP 分配的唯一数字地址。 您在线上所做的所有事情都链接到您的 IP 地址,因此可以用来将您与在线活动进行匹配。 大多数网站记录其访问者的 IP 地址。

广告商还可以使用您的 IP 地址,根据您的身份和浏览历史为您提供有针对性的广告。

连接到 VPN 服务器时,您将使用该 VPN 服务器的 IP 地址。 您访问的任何网站都会看到 VPN 服务器的 IP 地址,而不是您自己的。

您将能够绕过 IP 地址阻止并浏览网站,而不会将您的活动作为一个个人追溯到您。

通信加密

img

使用 VPN 时,可以对信息进行加密,使得密码,电子邮件,照片,银行数据和其他敏感信息不会被拦截。

如果在公共场所使用公共 WiFi 连接网络时,敏感数据有被盗的风险。黑客可以利用开放和未加密的网络来窃取重要数据,例如您的密码,电子邮件,照片,银行数据和其他敏感信息。

VPN 可以加密信息,使黑客更难以拦截和窃取数据。

翻墙

img

轻松解除对 Facebook 和 Twitter,Skype,YouTube 和 Gmail 等网站和服务的阻止。 即使您被告知您所在的国家/地区不可用它,或者您所在的学校或办公室网络限制访问,也可以获取所需的东西。

某些服务(例如 Netflix 或 BBC iPlayer)会根据您访问的国家/地区限制访问内容。使用 VPN 可以绕过这些地理限制并解锁“隐藏”内容的唯一可靠方法。

避免被监听

img

使用 VPN 可以向政府、ISP、黑客隐藏通信信息。

您的 Internet 服务提供商(ISP)可以看到您访问的所有网站,并且几乎可以肯定会记录该信息。

在某些国家/地区,ISP 需要长时间收集和存储用户数据,并且政府能够访问,存储和搜索该信息。

在美国,英国,澳大利亚和欧洲大部分地区就是这种情况,仅举几例。

由于 VPN 会加密从设备到 VPN 服务器的互联网流量,因此您的 ISP 或任何其他第三方将无法监视您的在线活动。

要了解有关监视技术和全球大规模监视问题的更多信息,请访问 EFF 和 Privacy International。 您还可以在此处找到全球监视披露的更新列表。

工作原理

VPN 会在您的设备和私人服务器之间建立私人和加密的互联网连接。 这意味着您的数据无法被 ISP 或任何其他第三方读取或理解。 然后,私有服务器将您的流量发送到您要访问的网站或服务上。

img

VPN 的基本处理过程如下:

  1. 要保护主机发送明文信息到其他 VPN 设备。
  2. VPN 设备根据网络管理员设置的规则,确定是对数据进行加密还是直接传输。
  3. 对需要加密的数据,VPN 设备将其整个数据包(包括要传输的数据、源 IP 地址和目的 lP 地址)进行加密并附上数据签名,加上新的数据报头(包括目的地 VPN 设备需要的安全信息和一些初始化参数)重新封装。
  4. 将封装后的数据包通过隧道在公共网络上传输。
  5. 数据包到达目的 VPN 设备后,将其解封,核对数字签名无误后,对数据包解密。

VPN 协议

img

  • OpenVPN

  • IKEv2 / IPSec

  • SSTP

  • PPTP

  • Wireguard

VPN 服务

你可以选择付费 VPN 或自行搭建 VPN。

VPN 服务商:

开源 VPN:

参考资料

深入剖析共识性算法 Paxos

Paxos 是一种基于消息传递且具有容错性的共识性(consensus)算法

Paxos 算法解决了分布式一致性问题:在一个节点数为 2N+1 的分布式集群中,只要半数以上的节点(N + 1)还正常工作,整个系统仍可以正常工作。

Paxos 背景

Paxos 是 Leslie Lamport 于 1990 年提出的一种基于消息传递且具有高度容错特性的共识(consensus)算法

为描述 Paxos 算法,Lamport 虚拟了一个叫做 Paxos 的希腊城邦,这个岛按照议会民主制的政治模式制订法律,但是没有人愿意将自己的全部时间和精力放在这种事情上。所以无论是议员,议长或者传递纸条的服务员都不能承诺别人需要时一定会出现,也无法承诺批准决议或者传递消息的时间。

Paxos 算法解决的问题正是分布式共识性问题,即一个分布式系统中的各个节点如何就某个值(决议)达成一致。

Paxos 算法运行在允许宕机故障的异步系统中,不要求可靠的消息传递,可容忍消息丢失、延迟、乱序以及重复。它利用大多数 (Majority) 机制保证了一定的容错能力,即 N 个节点的系统最多允许 N / 2 - 1 个节点同时出现故障。

Paxos 算法包含 2 个部分:

  • Basic Paxos 算法:描述的多节点之间如何就某个值达成共识。
  • Multi Paxos 思想:描述的是执行多个 Basic Paxos 实例,就一系列值达成共识。

Basic Paxos 算法

角色

Paxos 将分布式系统中的节点分 Proposer、Acceptor、Learner 三种角色。

  • 提议者(Proposer):发出提案(Proposal),用于投票表决。Proposal 信息包括提案编号 (Proposal ID) 和提议的值 (Value)。在绝大多数场景中,集群中收到客户端请求的节点,才是提议者。这样做的好处是,对业务代码没有入侵性,也就是说,我们不需要在业务代码中实现算法逻辑。
  • 接受者(Acceptor):对每个 Proposal 进行投票,若 Proposal 获得多数 Acceptor 的接受,则称该 Proposal 被批准。一般来说,集群中的所有节点都在扮演接受者的角色,参与共识协商,并接受和存储数据。
  • 学习者(Learner):不参与接受,从 Proposers/Acceptors 学习、记录最新达成共识的提案(Value)。一般来说,学习者是数据备份节点,比如主从架构中的从节点,被动地接受数据,容灾备份。

在多副本状态机中,每个副本都同时具有 Proposer、Acceptor、Learner 三种角色。

这三种角色,在本质上代表的是三种功能:

  • 提议者代表的是接入和协调功能,收到客户端请求后,发起二阶段提交,进行共识协商;
  • 接受者代表投票协商和存储数据,对提议的值进行投票,并接受达成共识的值,存储保存;
  • 学习者代表存储数据,不参与共识协商,只接受达成共识的值,存储保存。

算法

Paxos 算法有 3 个阶段,其中,前 2 个阶段负责协商并达成共识:

  1. 准备(Prepare)阶段:Proposer 向 Acceptors 发出 Prepare 请求,Acceptors 针对收到的 Prepare 请求进行 Promise 承诺。
  2. 接受(Accept)阶段:Proposer 收到多数 Acceptors 承诺的 Promise 后,向 Acceptors 发出 Propose 请求,Acceptors 针对收到的 Propose 请求进行 Accept 处理。
  3. 学习(Learn)阶段:Proposer 在收到多数 Acceptors 的 Accept 之后,标志着本次 Accept 成功,决议形成,将形成的决议发送给所有 Learners。

看到这里,了解过分布式事务的读者,想必会觉得眼熟:这不就是两阶段提交嘛!没错,这里采用的正式两阶段提交的思想。两阶段提交是达成共识的常用方式。

Paxos 算法流程中的每条消息描述如下:

  • Prepare: Proposer 生成全局唯一且递增的 Proposal ID (可使用时间戳加 Server ID),向所有 Acceptors 发送 Prepare 请求,这里无需携带提案内容,只携带 Proposal ID 即可。
  • Promise: Acceptors 收到 Prepare 请求后,做出“两个承诺,一个应答”。
    • 两个承诺:
      • 不再接受 Proposal ID 小于等于当前请求的 Prepare 请求。
      • 不再接受 Proposal ID 小于当前请求的 Propose 请求。
    • 一个应答:
      • 不违背以前作出的承诺下,回复已经 Accept 过的提案中 Proposal ID 最大的那个提案的 Value 和 Proposal ID,没有则返回空值。
  • Propose: Proposer 收到多数 Acceptors 的 Promise 应答后,从应答中选择 Proposal ID 最大的提案的 Value,作为本次要发起的提案。如果所有应答的提案 Value 均为空值,则可以自己随意决定提案 Value。然后携带当前 Proposal ID,向所有 Acceptors 发送 Propose 请求。
  • Accept: Acceptor 收到 Propose 请求后,在不违背自己之前作出的承诺下,接受并持久化当前 Proposal ID 和提案 Value。
  • Learn: Proposer 收到多数 Acceptors 的 Accept 后,决议形成,将形成的决议发送给所有 Learners。

Prepare 阶段

在准备请求中是不需要指定提议的值的,只需要携带提案编号就可以了。

下图的示例中,首先客户端 1、2 作为提议者,分别向所有接受者发送包含提案编号的准备请求:

接着,当节点 A、B 收到提案编号为 1 的准备请求,节点 C 收到提案编号为 5 的准备请求后,将进行这样的处理:

  • 由于之前没有通过任何提案,所以节点 A、B 将返回一个 “尚无提案” 的响应。也就是说节点 A 和 B 在告诉提议者,我之前没有通过任何提案呢,并承诺以后不再响应提案编号小于等于 1 的准备请求,不会通过编号小于 1 的提案。
  • 节点 C 也是如此,它将返回一个 “尚无提案”的响应,并承诺以后不再响应提案编号小于等于 5 的准备请求,不会通过编号小于 5 的提案。

另外,当节点 A、B 收到提案编号为 5 的准备请求,和节点 C 收到提案编号为 1 的准备请求的时候,将进行这样的处理过程:

  • 当节点 A、B 收到提案编号为 5 的准备请求的时候,因为提案编号 5 大于它们之前响应的准备请求的提案编号 1,而且两个节点都没有通过任何提案,所以它将返回一个 “尚无提案”的响应,并承诺以后不再响应提案编号小于等于 5 的准备请求,不会通过编号小于 5 的提案。

  • 当节点 C 收到提案编号为 1 的准备请求的时候,由于提案编号 1 小于它之前响应的准备请求的提案编号 5,所以丢弃该准备请求,不做响应。

Accept 阶段

首先客户端 1、2 在收到大多数节点的准备响应之后,会分别发送接受请求:

  • 当客户端 1 收到大多数的接受者(节点 A、B)的准备响应后,根据响应中提案编号最大的提案的值,设置接受请求中的值。因为该值在来自节点 A、B 的准备响应中都为空(也就是图 5 中的“尚无提案”),所以就把自己的提议值 3 作为提案的值,发送接受请求[1, 3]。

  • 当客户端 2 收到大多数的接受者的准备响应后(节点 A、B 和节点 C),根据响应中提案编号最大的提案的值,来设置接受请求中的值。因为该值在来自节点 A、B、C 的准备响应中都为空(也就是图 5 和图 6 中的“尚无提案”),所以就把自己的提议值 7 作为提案的值,发送接受请求[5, 7]。

当三个节点收到 2 个客户端的接受请求时,会进行这样的处理:

  • 当节点 A、B、C 收到接受请求[1, 3]的时候,由于提案的提案编号 1 小于三个节点承诺能通过的提案的最小提案编号 5,所以提案[1, 3]将被拒绝。
  • 当节点 A、B、C 收到接受请求[5, 7]的时候,由于提案的提案编号 5 不小于三个节点承诺能通过的提案的最小提案编号 5,所以就通过提案[5, 7],也就是接受了值 7,三个节点就 X 值为 7 达成了共识。

小结

Basic Paxos 是通过二阶段提交的方式来达成共识的。

除了共识,Basic Paxos 还实现了容错,在少于一半的节点出现故障时,集群也能工作。它不像分布式事务算法那样,必须要所有节点都同意后才提交操作,因为“所有节点都同意”这个原则,在出现节点故障的时候会导致整个集群不可用。也就是说,“大多数节点都同意”的原则,赋予了 Basic Paxos 容错的能力,让它能够容忍少于一半的节点的故障。

本质上而言,提案编号的大小代表着优先级,你可以这么理解,根据提案编号的大小,接受者保证三个承诺,具体来说:

  • 如果准备请求的提案编号,小于等于接受者已经响应的准备请求的提案编号,那么接受者将承诺不响应这个准备请求;
  • 如果接受请求中的提案的提案编号,小于接受者已经响应的准备请求的提案编号,那么接受者将承诺不通过这个提案;
  • 如果接受者之前有通过提案,那么接受者将承诺,会在准备请求的响应中,包含已经通过的最大编号的提案信息。

Multi Paxos 思想

兰伯特提到的 Multi-Paxos 是一种思想,不是算法。而 Multi-Paxos 算法是一个统称,它是指基于 Multi-Paxos 思想,通过多个 Basic Paxos 实例实现一系列值的共识的算法(比如 Chubby 的 Multi-Paxos 实现、Raft 算法等)。

Basic Paxos 的问题

Basic Paxos 有以下问题,导致它不能应用于实际:

  • Basic Paxos 算法只能对一个值形成决议
  • Basic Paxos 算法会消耗大量网络带宽。Basic Paxos 中,决议的形成至少需要两次网络通信,在高并发情况下可能需要更多的网络通信,极端情况下甚至可能形成活锁。如果想连续确定多个值,Basic Paxos 搞不定了。

Multi Paxos 的改进

Multi Paxos 正是为解决以上问题而提出。Multi Paxos 基于 Basic Paxos 做了两点改进:

  • 针对每一个要确定的值,运行一次 Paxos 算法实例(Instance),形成决议。每一个 Paxos 实例使用唯一的 Instance ID 标识。
  • 在所有 Proposer 中选举一个 Leader,由 Leader 唯一地提交 Proposal 给 Acceptor 进行表决。这样没有 Proposer 竞争,解决了活锁问题。在系统中仅有一个 Leader 进行 Value 提交的情况下,Prepare 阶段就可以跳过,从而将两阶段变为一阶段,提高效率。

Multi Paxos 首先需要选举 Leader,Leader 的确定也是一次决议的形成,所以可执行一次 Basic Paxos 实例来选举出一个 Leader。选出 Leader 之后只能由 Leader 提交 Proposal,在 Leader 宕机之后服务临时不可用,需要重新选举 Leader 继续服务。在系统中仅有一个 Leader 进行 Proposal 提交的情况下,Prepare 阶段可以跳过。

Multi Paxos 通过改变 Prepare 阶段的作用范围至后面 Leader 提交的所有实例,从而使得 Leader 的连续提交只需要执行一次 Prepare 阶段,后续只需要执行 Accept 阶段,将两阶段变为一阶段,提高了效率。为了区分连续提交的多个实例,每个实例使用一个 Instance ID 标识,Instance ID 由 Leader 本地递增生成即可。

Multi Paxos 允许有多个自认为是 Leader 的节点并发提交 Proposal 而不影响其安全性,这样的场景即退化为 Basic Paxos。

Chubby 和 Boxwood 均使用 Multi Paxos。ZooKeeper 使用的 Zab 也是 Multi Paxos 的变形。

参考资料

Java 并发之容器

同步容器

同步容器简介

在 Java 中,同步容器主要包括 2 类:

  • VectorStackHashtable
    • Vector - Vector 实现了 List 接口。Vector 实际上就是一个数组,和 ArrayList 类似。但是 Vector 中的方法都是 synchronized 方法,即进行了同步措施。
    • Stack - Stack 也是一个同步容器,它的方法也用 synchronized 进行了同步,它实际上是继承于 Vector 类。
    • Hashtable- Hashtable 实现了 Map 接口,它和 HashMap 很相似,但是 Hashtable 进行了同步处理,而 HashMap 没有。
  • Collections 类中提供的静态工厂方法创建的类(由 Collections.synchronizedXXX 等方法)

同步容器的问题

同步容器的同步原理就是在其 getsetsize 等主要方法上用 synchronized 修饰。 synchronized 可以保证在同一个时刻,只有一个线程可以执行某个方法或者某个代码块

想详细了解 synchronized 用法和原理可以参考:Java 并发核心机制

性能问题

synchronized 的互斥同步会产生阻塞和唤醒线程的开销。显然,这种方式比没有使用 synchronized 的容器性能要差很多。

注:尤其是在 Java 1.6 没有对 synchronized 进行优化前,阻塞开销很高。

安全问题

同步容器真的绝对安全吗?

其实也未必。在做复合操作(非原子操作)时,仍然需要加锁来保护。常见复合操作如下:

  • 迭代:反复访问元素,直到遍历完全部元素;
  • 跳转:根据指定顺序寻找当前元素的下一个(下 n 个)元素;
  • 条件运算:例如若没有则添加等;

❌ 不安全的示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public class VectorDemo {

static Vector<Integer> vector = new Vector<>();

public static void main(String[] args) {
while (true) {
vector.clear();

for (int i = 0; i < 10; i++) {
vector.add(i);
}

Thread thread1 = new Thread() {
@Override
public void run() {
for (int i = 0; i < vector.size(); i++) {
vector.remove(i);
}
}
};

Thread thread2 = new Thread() {
@Override
public void run() {
for (int i = 0; i < vector.size(); i++) {
vector.get(i);
}
}
};

thread1.start();
thread2.start();

while (Thread.activeCount() > 10) {
System.out.println("同时存在 10 个以上线程,退出");
return;
}
}
}

}

以上程序执行时可能会出现数组越界错误。

Vector 是线程安全的,那为什么还会报这个错?

这是因为,对于 Vector,虽然能保证每一个时刻只能有一个线程访问它,但是不排除这种可能:

当某个线程在某个时刻执行这句时:

1
2
for(int i=0;i<vector.size();i++)
vector.get(i);

假若此时 vector 的 size 方法返回的是 10,i 的值为 9

然后另外一个线程执行了这句:

1
2
for(int i=0;i<vector.size();i++)
vector.remove(i);

将下标为 9 的元素删除了。

那么通过 get 方法访问下标为 9 的元素肯定就会出问题了。

✔️️️ 安全示例

因此为了保证线程安全,必须在方法调用端做额外的同步措施,如下面所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class VectorDemo2 {

static Vector<Integer> vector = new Vector<Integer>();

public static void main(String[] args) {
while (true) {
for (int i = 0; i < 10; i++) {
vector.add(i);
}

Thread thread1 = new Thread() {
@Override
public void run() {
synchronized (VectorDemo2.class) { //进行额外的同步
for (int i = 0; i < vector.size(); i++) {
vector.remove(i);
}
}
}
};

Thread thread2 = new Thread() {
@Override
public void run() {
synchronized (VectorDemo2.class) {
for (int i = 0; i < vector.size(); i++) {
vector.get(i);
}
}
}
};

thread1.start();
thread2.start();

while (Thread.activeCount() > 10) {
System.out.println("同时存在 10 个以上线程,退出");
return;
}
}
}

}

ConcurrentModificationException 异常

在对 Vector 等容器并发地进行迭代修改时,会报 ConcurrentModificationException 异常,关于这个异常将会在后续文章中讲述。

但是在并发容器中不会出现这个问题。

并发容器简介

同步容器将所有对容器状态的访问都串行化,以保证线程安全性,这种策略会严重降低并发性。

Java 1.5 后提供了多种并发容器,使用并发容器来替代同步容器,可以极大地提高伸缩性并降低风险

J.U.C 包中提供了几个非常有用的并发容器作为线程安全的容器:

并发容器 对应的普通容器 描述
ConcurrentHashMap HashMap Java 1.8 之前采用分段锁机制细化锁粒度,降低阻塞,从而提高并发性;Java 1.8 之后基于 CAS 实现。
ConcurrentSkipListMap SortedMap 基于跳表实现的
CopyOnWriteArrayList ArrayList
CopyOnWriteArraySet Set 基于 CopyOnWriteArrayList 实现。
ConcurrentSkipListSet SortedSet 基于 ConcurrentSkipListMap 实现。
ConcurrentLinkedQueue Queue 线程安全的无界队列。底层采用单链表。支持 FIFO。
ConcurrentLinkedDeque Deque 线程安全的无界双端队列。底层采用双向链表。支持 FIFO 和 FILO。
ArrayBlockingQueue Queue 数组实现的阻塞队列。
LinkedBlockingQueue Queue 链表实现的阻塞队列。
LinkedBlockingDeque Deque 双向链表实现的双端阻塞队列。

J.U.C 包中提供的并发容器命名一般分为三类:

  • Concurrent
    • 这类型的锁竞争相对于 CopyOnWrite 要高一些,但写操作代价要小一些。
    • 此外,Concurrent 往往提供了较低的遍历一致性,即:当利用迭代器遍历时,如果容器发生修改,迭代器仍然可以继续进行遍历。代价就是,在获取容器大小 size() ,容器是否为空等方法,不一定完全精确,但这是为了获取并发吞吐量的设计取舍,可以理解。与之相比,如果是使用同步容器,就会出现 fail-fast 问题,即:检测到容器在遍历过程中发生了修改,则抛出 ConcurrentModificationException,不再继续遍历。
  • CopyOnWrite - 一个线程写,多个线程读。读操作时不加锁,写操作时通过在副本上加锁保证并发安全,空间开销较大。
  • Blocking - 内部实现一般是基于锁,提供阻塞队列的能力。

:x: 错误示例,产生 ConcurrentModificationException 异常:

1
2
3
public void removeKeys(Map<String, Object> map, final String... keys) {
map.keySet().removeIf(key -> ArrayUtil.contains(keys, key));
}

:x: 错误示例,产生 ConcurrentModificationException 异常:

1
2
3
4
5
6
public static <K, V> Map<K, V> removeKeys(Map<String, Object> map, final String... keys) {
for (K key : keys) {
map.remove(key);
}
return map;
}

并发场景下的 Map

如果对数据有强一致要求,则需使用 Hashtable;在大部分场景通常都是弱一致性的情况下,使用 ConcurrentHashMap 即可;如果数据量在千万级别,且存在大量增删改操作,则可以考虑使用 ConcurrentSkipListMap

并发场景下的 List

读多写少用 CopyOnWriteArrayList

写多读少用 ConcurrentLinkedQueue ,但由于是无界的,要有容量限制,避免无限膨胀,导致内存溢出。

Map

Map 接口的两个实现是 ConcurrentHashMap 和 ConcurrentSkipListMap,它们从应用的角度来看,主要区别在于ConcurrentHashMap 的 key 是无序的,而 ConcurrentSkipListMap 的 key 是有序的。所以如果你需要保证 key 的顺序,就只能使用 ConcurrentSkipListMap。

使用 ConcurrentHashMap 和 ConcurrentSkipListMap 需要注意的地方是,它们的 key 和 value 都不能为空,否则会抛出NullPointerException这个运行时异常。

ConcurrentHashMap

ConcurrentHashMap 是线程安全的 HashMap ,用于替代 Hashtable

ConcurrentHashMap 的特性

ConcurrentHashMap 实现了 ConcurrentMap 接口,而 ConcurrentMap 接口扩展了 Map 接口。

1
2
3
4
public class ConcurrentHashMap<K,V> extends AbstractMap<K,V>
implements ConcurrentMap<K,V>, Serializable {
// ...
}

ConcurrentHashMap 的实现包含了 HashMap 所有的基本特性,如:数据结构、读写策略等。

ConcurrentHashMap 没有实现对 Map 加锁以提供独占访问。因此无法通过在客户端加锁的方式来创建新的原子操作。但是,一些常见的复合操作,如:“若没有则添加”、“若相等则移除”、“若相等则替换”,都已经实现为原子操作,并且是围绕 ConcurrentMap 的扩展接口而实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public interface ConcurrentMap<K, V> extends Map<K, V> {

// 仅当 K 没有相应的映射值才插入
V putIfAbsent(K key, V value);

// 仅当 K 被映射到 V 时才移除
boolean remove(Object key, Object value);

// 仅当 K 被映射到 oldValue 时才替换为 newValue
boolean replace(K key, V oldValue, V newValue);

// 仅当 K 被映射到某个值时才替换为 newValue
V replace(K key, V value);
}

不同于 HashtableConcurrentHashMap 提供的迭代器不会抛出 ConcurrentModificationException,因此不需要在迭代过程中对容器加锁。

:bell: 注意:一些需要对整个 Map 进行计算的方法,如 sizeisEmpty ,由于返回的结果在计算时可能已经过期,所以并非实时的精确值。这是一种策略上的权衡,在并发环境下,这类方法由于总在不断变化,所以获取其实时精确值的意义不大。ConcurrentHashMap 弱化这类方法,以换取更重要操作(如:getputcontainesKeyremove 等)的性能。

ConcurrentHashMap 的用法

示例:不会出现 ConcurrentModificationException

ConcurrentHashMap 的基本操作与 HashMap 的用法基本一样。不同于 HashMapHashtableConcurrentHashMap 提供的迭代器不会抛出 ConcurrentModificationException,因此不需要在迭代过程中对容器加锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class ConcurrentHashMapDemo {

public static void main(String[] args) throws InterruptedException {

// HashMap 在并发迭代访问时会抛出 ConcurrentModificationException 异常
// Map<Integer, Character> map = new HashMap<>();
Map<Integer, Character> map = new ConcurrentHashMap<>();

Thread wthread = new Thread(() -> {
System.out.println("写操作线程开始执行");
for (int i = 0; i < 26; i++) {
map.put(i, (char) ('a' + i));
}
});
Thread rthread = new Thread(() -> {
System.out.println("读操作线程开始执行");
for (Integer key : map.keySet()) {
System.out.println(key + " - " + map.get(key));
}
});
wthread.start();
rthread.start();
Thread.sleep(1000);
}
}

ConcurrentHashMap 的原理

ConcurrentHashMap 一直在演进,尤其在 Java 1.7 和 Java 1.8,其数据结构和并发机制有很大的差异。

  • Java 1.7
    • 数据结构:数组+单链表
    • 并发机制:采用分段锁机制细化锁粒度,降低阻塞,从而提高并发性。
  • Java 1.8
    • 数据结构:数组+单链表+红黑树
    • 并发机制:取消分段锁,之后基于 CAS + synchronized 实现。
Java 1.7 的实现

分段锁,是将内部进行分段(Segment),里面是 HashEntry 数组,和 HashMap 类似,哈希相同的条目也是以链表形式存放。
HashEntry 内部使用 volatilevalue 字段来保证可见性,也利用了不可变对象的机制,以改进利用 Unsafe 提供的底层能力,比如 volatile access,去直接完成部分操作,以最优化性能,毕竟 Unsafe 中的很多操作都是 JVM intrinsic 优化过的。

img

在进行并发写操作时,ConcurrentHashMap 会获取可重入锁(ReentrantLock),以保证数据一致性。所以,在并发修改期间,相应 Segment 是被锁定的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class ConcurrentHashMap<K, V> extends AbstractMap<K, V>
implements ConcurrentMap<K, V>, Serializable {

// 将整个hashmap分成几个小的map,每个segment都是一个锁;与hashtable相比,这么设计的目的是对于put, remove等操作,可以减少并发冲突,对
// 不属于同一个片段的节点可以并发操作,大大提高了性能
final Segment<K,V>[] segments;

// 本质上Segment类就是一个小的hashmap,里面table数组存储了各个节点的数据,继承了ReentrantLock, 可以作为互拆锁使用
static final class Segment<K,V> extends ReentrantLock implements Serializable {
transient volatile HashEntry<K,V>[] table;
transient int count;
}

// 基本节点,存储Key, Value值
static final class HashEntry<K,V> {
final int hash;
final K key;
volatile V value;
volatile HashEntry<K,V> next;
}
}
Java 1.8 的实现
  • 数据结构改进:与 HashMap 一样,将原先 数组+单链表 的数据结构,变更为 数组+单链表+红黑树 的结构。当出现哈希冲突时,数据会存入数组指定桶的单链表,当链表长度达到 8,则将其转换为红黑树结构,这样其查询的时间复杂度可以降低到 $$O(logN)$$,以改进性能。
  • 并发机制改进:
    • 取消 segments 字段,直接采用 transient volatile HashEntry<K,V>[] table 保存数据,采用 table 数组元素作为锁,从而实现了对每一行数据进行加锁,进一步减少并发冲突的概率
    • 使用 CAS + sychronized 操作,在特定场景进行无锁并发操作。使用 Unsafe、LongAdder 之类底层手段,进行极端情况的优化。现代 JDK 中,synchronized 已经被不断优化,可以不再过分担心性能差异,另外,相比于 ReentrantLock,它可以减少内存消耗,这是个非常大的优势。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
// 如果table为空,初始化;否则,根据hash值计算得到数组索引i,如果tab[i]为空,直接新建节点Node即可。注:tab[i]实质为链表或者红黑树的首节点。
if (tab == null || (n = tab.length) == 0)
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
// 如果tab[i]不为空并且hash值为MOVED,说明该链表正在进行transfer操作,返回扩容完成后的table。
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
// 针对首个节点进行加锁操作,而不是segment,进一步减少线程冲突
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
// 如果在链表中找到值为key的节点e,直接设置e.val = value即可。
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
// 如果没有找到值为key的节点,直接新建Node并加入链表即可。
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
// 如果首节点为TreeBin类型,说明为红黑树结构,执行putTreeVal操作。
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
// 如果节点数>=8,那么转换链表结构为红黑树结构。
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
// 计数增加1,有可能触发transfer操作(扩容)。
addCount(1L, binCount);
return null;
}

ConcurrentHashMap 的实战

示例摘自:极客时间教程 - Java 业务开发常见错误 100 例

ConcurrentHashMap 错误示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
//线程个数
private static int THREAD_COUNT = 10;
//总元素数量
private static int ITEM_COUNT = 1000;

public static void main(String[] args) throws InterruptedException {
ConcurrentHashMap<String, Long> concurrentHashMap = getData(ITEM_COUNT - 100);
//初始900个元素
System.out.println("init size:" + concurrentHashMap.size());
ForkJoinPool forkJoinPool = new ForkJoinPool(THREAD_COUNT);
//使用线程池并发处理逻辑
forkJoinPool.execute(() -> IntStream.rangeClosed(1, 10).parallel().forEach(i -> {
//查询还需要补充多少个元素
int gap = ITEM_COUNT - concurrentHashMap.size();
System.out.println("gap size:" + gap);
//补充元素
concurrentHashMap.putAll(getData(gap));
}));
//等待所有任务完成
forkJoinPool.shutdown();
forkJoinPool.awaitTermination(1, TimeUnit.HOURS);
//最后元素个数会是1000吗?
System.out.println("finish size:" + concurrentHashMap.size());
}

private static ConcurrentHashMap<String, Long> getData(int count) {
return LongStream.rangeClosed(1, count)
.boxed()
.collect(
Collectors.toConcurrentMap(
i -> UUID.randomUUID().toString(),
i -> i,
(o1, o2) -> o1,
ConcurrentHashMap::new));
}

初始大小 900 符合预期,还需要填充 100 个元素。

预期结果为 1000 个元素,实际大于 1000 个元素。

【分析】

ConcurrentHashMap 对外提供的方法或能力的限制:

  • 使用了 ConcurrentHashMap,不代表对它的多个操作之间的状态是一致的,是没有其他线程在操作它的,如果需要确保需要手动加锁。
  • 诸如 size、isEmpty 和 containsValue 等聚合方法,在并发情况下可能会反映 ConcurrentHashMap 的中间状态。因此在并发情况下,这些方法的返回值只能用作参考,而不能用于流程控制。显然,利用 size 方法计算差异值,是一个流程控制。
  • 诸如 putAll 这样的聚合方法也不能确保原子性,在 putAll 的过程中去获取数据可能会获取到部分数据。
ConcurrentHashMap 错误示例修正 1.0 版

通过 synchronized 加锁,当然可以保证数据一致性,但是牺牲了 ConcurrentHashMap 的性能,没哟真正发挥出 ConcurrentHashMap 的特性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38

//线程个数
private static int THREAD_COUNT = 10;
//总元素数量
private static int ITEM_COUNT = 1000;

public static void main(String[] args) throws InterruptedException {
ConcurrentHashMap<String, Long> concurrentHashMap = getData(ITEM_COUNT - 100);
//初始900个元素
System.out.println("init size:" + concurrentHashMap.size());
ForkJoinPool forkJoinPool = new ForkJoinPool(THREAD_COUNT);
//使用线程池并发处理逻辑
forkJoinPool.execute(() -> IntStream.rangeClosed(1, 10).parallel().forEach(i -> {
//查询还需要补充多少个元素
synchronized (concurrentHashMap) {
int gap = ITEM_COUNT - concurrentHashMap.size();
System.out.println("gap size:" + gap);
//补充元素
concurrentHashMap.putAll(getData(gap));
}
}));
//等待所有任务完成
forkJoinPool.shutdown();
forkJoinPool.awaitTermination(1, TimeUnit.HOURS);
//最后元素个数会是1000吗?
System.out.println("finish size:" + concurrentHashMap.size());
}

private static ConcurrentHashMap<String, Long> getData(int count) {
return LongStream.rangeClosed(1, count)
.boxed()
.collect(
Collectors.toConcurrentMap(
i -> UUID.randomUUID().toString(),
i -> i,
(o1, o2) -> o1,
ConcurrentHashMap::new));
}
ConcurrentHashMap 错误示例修正 2.0 版
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63

//循环次数
private static int LOOP_COUNT = 10000000;
//线程个数
private static int THREAD_COUNT = 10;
//总元素数量
private static int ITEM_COUNT = 1000;

public static void main(String[] args) throws InterruptedException {
StopWatch stopWatch = new StopWatch();
stopWatch.start("normaluse");
Map<String, Long> normaluse = normaluse();
stopWatch.stop();
Assert.isTrue(normaluse.size() == ITEM_COUNT, "normaluse size error");
Assert.isTrue(normaluse.values().stream()
.mapToLong(aLong -> aLong).reduce(0, Long::sum) == LOOP_COUNT
, "normaluse count error");
stopWatch.start("gooduse");
Map<String, Long> gooduse = gooduse();
stopWatch.stop();
Assert.isTrue(gooduse.size() == ITEM_COUNT, "gooduse size error");
Assert.isTrue(gooduse.values().stream()
.mapToLong(l -> l)
.reduce(0, Long::sum) == LOOP_COUNT
, "gooduse count error");
System.out.println(stopWatch.prettyPrint());
}

private static Map<String, Long> normaluse() throws InterruptedException {
ConcurrentHashMap<String, Long> freqs = new ConcurrentHashMap<>(ITEM_COUNT);
ForkJoinPool forkJoinPool = new ForkJoinPool(THREAD_COUNT);
forkJoinPool.execute(() -> IntStream.rangeClosed(1, LOOP_COUNT).parallel().forEach(i -> {
String key = "item" + ThreadLocalRandom.current().nextInt(ITEM_COUNT);
synchronized (freqs) {
if (freqs.containsKey(key)) {
freqs.put(key, freqs.get(key) + 1);
} else {
freqs.put(key, 1L);
}
}
}
));
forkJoinPool.shutdown();
forkJoinPool.awaitTermination(1, TimeUnit.HOURS);
return freqs;
}

private static Map<String, Long> gooduse() throws InterruptedException {
ConcurrentHashMap<String, LongAdder> freqs = new ConcurrentHashMap<>(ITEM_COUNT);
ForkJoinPool forkJoinPool = new ForkJoinPool(THREAD_COUNT);
forkJoinPool.execute(() -> IntStream.rangeClosed(1, LOOP_COUNT).parallel().forEach(i -> {
String key = "item" + ThreadLocalRandom.current().nextInt(ITEM_COUNT);
freqs.computeIfAbsent(key, k -> new LongAdder()).increment();
}
));
forkJoinPool.shutdown();
forkJoinPool.awaitTermination(1, TimeUnit.HOURS);
return freqs.entrySet().stream()
.collect(Collectors.toMap(
e -> e.getKey(),
e -> e.getValue().longValue())
);
}

List

CopyOnWriteArrayList

CopyOnWriteArrayList 是线程安全的 ArrayListCopyOnWrite 字面意思为写的时候会将共享变量新复制一份出来。复制的好处在于读操作是无锁的(也就是无阻塞)。

CopyOnWriteArrayList 仅适用于写操作非常少的场景,而且能够容忍读写的短暂不一致。如果读写比例均衡或者有大量写操作的话,使用 CopyOnWriteArrayList 的性能会非常糟糕。

CopyOnWriteArrayList 原理

CopyOnWriteArrayList 内部维护了一个数组,成员变量 array 就指向这个内部数组,所有的读操作都是基于 array 进行的,如下图所示,迭代器 Iterator 遍历的就是 array 数组。

img

  • lock - 执行写时复制操作,需要使用可重入锁加锁
  • array - 对象数组,用于存放元素
1
2
3
4
5
/** The lock protecting all mutators */
final transient ReentrantLock lock = new ReentrantLock();

/** The array, accessed only via getArray/setArray. */
private transient volatile Object[] array;

img

(1)读操作

CopyOnWriteAarrayList 中,读操作不同步,因为它们在内部数组的快照上工作,所以多个迭代器可以同时遍历而不会相互阻塞(图 1,2,4)。

CopyOnWriteArrayList 的读操作是不用加锁的,性能很高。

1
2
3
4
5
6
public E get(int index) {
return get(getArray(), index);
}
private E get(Object[] a, int index) {
return (E) a[index];
}

(2)写操作

所有的写操作都是同步的。他们在备份数组(图 3)的副本上工作。写操作完成后,后备阵列将被替换为复制的阵列,并释放锁定。支持数组变得易变,所以替换数组的调用是原子(图 5)。

写操作后创建的迭代器将能够看到修改的结构(图 6,7)。

写时复制集合返回的迭代器不会抛出 ConcurrentModificationException,因为它们在数组的快照上工作,并且无论后续的修改(2,4)如何,都会像迭代器创建时那样完全返回元素。

添加操作 - 添加的逻辑很简单,先将原容器 copy 一份,然后在新副本上执行写操作,之后再切换引用。当然此过程是要加锁的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public boolean add(E e) {
//ReentrantLock加锁,保证线程安全
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
//拷贝原容器,长度为原容器长度加一
Object[] newElements = Arrays.copyOf(elements, len + 1);
//在新副本上执行添加操作
newElements[len] = e;
//将原容器引用指向新副本
setArray(newElements);
return true;
} finally {
//解锁
lock.unlock();
}
}

删除操作 - 删除操作同理,将除要删除元素之外的其他元素拷贝到新副本中,然后切换引用,将原容器引用指向新副本。同属写操作,需要加锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public E remove(int index) {
//加锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
E oldValue = get(elements, index);
int numMoved = len - index - 1;
if (numMoved == 0)
//如果要删除的是列表末端数据,拷贝前len-1个数据到新副本上,再切换引用
setArray(Arrays.copyOf(elements, len - 1));
else {
//否则,将除要删除元素之外的其他元素拷贝到新副本中,并切换引用
Object[] newElements = new Object[len - 1];
System.arraycopy(elements, 0, newElements, 0, index);
System.arraycopy(elements, index + 1, newElements, index,
numMoved);
setArray(newElements);
}
return oldValue;
} finally {
//解锁
lock.unlock();
}
}

CopyOnWriteArrayList 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
public class CopyOnWriteArrayListDemo {

static class ReadTask implements Runnable {

List<String> list;

ReadTask(List<String> list) {
this.list = list;
}

public void run() {
for (String str : list) {
System.out.println(str);
}
}
}

static class WriteTask implements Runnable {

List<String> list;
int index;

WriteTask(List<String> list, int index) {
this.list = list;
this.index = index;
}

public void run() {
list.remove(index);
list.add(index, "write_" + index);
}
}

public void run() {
final int NUM = 10;
// ArrayList 在并发迭代访问时会抛出 ConcurrentModificationException 异常
// List<String> list = new ArrayList<>();
CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
for (int i = 0; i < NUM; i++) {
list.add("main_" + i);
}
ExecutorService executorService = Executors.newFixedThreadPool(NUM);
for (int i = 0; i < NUM; i++) {
executorService.execute(new ReadTask(list));
executorService.execute(new WriteTask(list, i));
}
executorService.shutdown();
}

public static void main(String[] args) {
new CopyOnWriteArrayListDemo().run();
}
}

CopyOnWriteArrayList 实战

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
@Slf4j
public class WrongCopyOnWriteList {

public static void main(String[] args) {
testRead();
testWrite();
}

public static Map testWrite() {
List<Integer> copyOnWriteArrayList = new CopyOnWriteArrayList<>();
List<Integer> synchronizedList = Collections.synchronizedList(new ArrayList<>());
StopWatch stopWatch = new StopWatch();
int loopCount = 100000;
stopWatch.start("Write:copyOnWriteArrayList");
IntStream.rangeClosed(1, loopCount)
.parallel()
.forEach(__ -> copyOnWriteArrayList.add(ThreadLocalRandom.current().nextInt(loopCount)));
stopWatch.stop();
stopWatch.start("Write:synchronizedList");
IntStream.rangeClosed(1, loopCount)
.parallel()
.forEach(__ -> synchronizedList.add(ThreadLocalRandom.current().nextInt(loopCount)));
stopWatch.stop();
log.info(stopWatch.prettyPrint());
Map result = new HashMap();
result.put("copyOnWriteArrayList", copyOnWriteArrayList.size());
result.put("synchronizedList", synchronizedList.size());
return result;
}

private static void addAll(List<Integer> list) {
list.addAll(IntStream.rangeClosed(1, 1000000).boxed().collect(Collectors.toList()));
}

public static Map testRead() {
List<Integer> copyOnWriteArrayList = new CopyOnWriteArrayList<>();
List<Integer> synchronizedList = Collections.synchronizedList(new ArrayList<>());
addAll(copyOnWriteArrayList);
addAll(synchronizedList);
StopWatch stopWatch = new StopWatch();
int loopCount = 1000000;
int count = copyOnWriteArrayList.size();
stopWatch.start("Read:copyOnWriteArrayList");
IntStream.rangeClosed(1, loopCount)
.parallel()
.forEach(__ -> copyOnWriteArrayList.get(ThreadLocalRandom.current().nextInt(count)));
stopWatch.stop();
stopWatch.start("Read:synchronizedList");
IntStream.range(0, loopCount)
.parallel()
.forEach(__ -> synchronizedList.get(ThreadLocalRandom.current().nextInt(count)));
stopWatch.stop();
log.info(stopWatch.prettyPrint());
Map result = new HashMap();
result.put("copyOnWriteArrayList", copyOnWriteArrayList.size());
result.put("synchronizedList", synchronizedList.size());
return result;
}

}

读性能差不多是写性能的一百倍。

Set

Set 接口的两个实现是 CopyOnWriteArraySet 和 ConcurrentSkipListSet,使用场景可以参考前面讲述的 CopyOnWriteArrayList 和 ConcurrentSkipListMap,它们的原理都是一样的。

Queue

Java 并发包里面 Queue 这类并发容器是最复杂的,你可以从以下两个维度来分类。一个维度是阻塞与非阻塞,所谓阻塞指的是:当队列已满时,入队操作阻塞;当队列已空时,出队操作阻塞。另一个维度是单端与双端,单端指的是只能队尾入队,队首出队;而双端指的是队首队尾皆可入队出队。Java 并发包里阻塞队列都用 Blocking 关键字标识,单端队列使用 Queue 标识,双端队列使用 Deque 标识

BlockingQueue

BlockingQueue 顾名思义,是一个阻塞队列。**BlockingQueue 基本都是基于锁实现。在 BlockingQueue 中,当队列已满时,入队操作阻塞;当队列已空时,出队操作阻塞**。

BlockingQueue 接口定义如下:

1
public interface BlockingQueue<E> extends Queue<E> {}

核心 API:

1
2
3
4
// 获取并移除队列头结点,如果必要,其会等待直到队列出现元素
E take() throws InterruptedException;
// 插入元素,如果队列已满,则等待直到队列出现空闲空间
void put(E e) throws InterruptedException;

BlockingQueue 对插入操作、移除操作、获取元素操作提供了四种不同的方法用于不同的场景中使用:

  • 抛出异常;
  • 返回特殊值(nulltrue/false,取决于具体的操作);
  • 阻塞等待此操作,直到这个操作成功;
  • 阻塞等待此操作,直到成功或者超时指定时间。

总结如下:

Throws exception Special value Blocks Times out
Insert add(e) offer(e) put(e) offer(e, time, unit)
Remove remove() poll() take() poll(time, unit)
Examine element() peek() not applicable not applicable

BlockingQueue 的各个实现类都遵循了这些规则。

BlockingQueue 不接受 null 值元素。

JDK 提供了以下阻塞队列:

  • ArrayBlockingQueue - 一个由数组结构组成的有界阻塞队列
  • LinkedBlockingQueue - 一个由链表结构组成的有界阻塞队列
  • PriorityBlockingQueue - 一个支持优先级排序的无界阻塞队列
  • SynchronousQueue - 一个不存储元素的阻塞队列
  • DelayQueue - 一个使用优先级队列实现的无界阻塞队列。
  • LinkedTransferQueue - 一个由链表结构组成的无界阻塞队列

BlockingQueue 基本都是基于锁实现。

PriorityBlockingQueue 类

PriorityBlockingQueue 类定义如下:

1
2
public class PriorityBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {}

PriorityBlockingQueue 要点

  • PriorityBlockingQueue 可以视为 PriorityQueue 的线程安全版本。
  • PriorityBlockingQueue 实现了 BlockingQueue,也是一个阻塞队列。
  • PriorityBlockingQueue 实现了 Serializable,支持序列化。
  • PriorityBlockingQueue 不接受 null 值元素。
  • PriorityBlockingQueue 的插入操作 put 方法不会 block,因为它是无界队列(take 方法在队列为空的时候会阻塞)。

PriorityBlockingQueue 原理

PriorityBlockingQueue 有两个重要成员:

1
2
private transient Object[] queue;
private final ReentrantLock lock;
  • queue 是一个 Object 数组,用于保存 PriorityBlockingQueue 的元素。
  • 而可重入锁 lock 则用于在执行插入、删除操作时,保证这个方法在当前线程释放锁之前,其他线程不能访问。

PriorityBlockingQueue 的容量虽然有初始化大小,但是不限制大小,如果当前容量已满,插入新元素时会自动扩容。

ArrayBlockingQueue 类

ArrayBlockingQueue 是由数组结构组成的有界阻塞队列

ArrayBlockingQueue 要点

ArrayBlockingQueue 类定义如下:

1
2
3
4
5
6
7
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
// 数组的大小就决定了队列的边界,所以初始化时必须指定容量
public ArrayBlockingQueue(int capacity) { //... }
public ArrayBlockingQueue(int capacity, boolean fair) { //... }
public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) { //... }
}

说明:

  • ArrayBlockingQueue 实现了 BlockingQueue,也是一个阻塞队列。
  • ArrayBlockingQueue 实现了 Serializable,支持序列化。
  • ArrayBlockingQueue 是基于数组实现的有界阻塞队列。所以初始化时必须指定容量。

ArrayBlockingQueue 原理

ArrayBlockingQueue 的重要成员如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
// 用于存放元素的数组
final Object[] items;
// 下一次读取操作的位置
int takeIndex;
// 下一次写入操作的位置
int putIndex;
// 队列中的元素数量
int count;

// 以下几个就是控制并发用的同步器
final ReentrantLock lock;
private final Condition notEmpty;
private final Condition notFull;

ArrayBlockingQueue 内部以 final 的数组保存数据,数组的大小就决定了队列的边界。

ArrayBlockingQueue 实现并发同步的原理就是,读操作和写操作都需要获取到 AQS 独占锁才能进行操作。

  • 如果队列为空,这个时候读操作的线程进入到读线程队列排队,等待写线程写入新的元素,然后唤醒读线程队列的第一个等待线程。
  • 如果队列已满,这个时候写操作的线程进入到写线程队列排队,等待读线程将队列元素移除,然后唤醒写线程队列的第一个等待线程。

对于 ArrayBlockingQueue,我们可以在构造的时候指定以下三个参数:

  • 队列容量,其限制了队列中最多允许的元素个数;
  • 指定独占锁是公平锁还是非公平锁。非公平锁的吞吐量比较高,公平锁可以保证每次都是等待最久的线程获取到锁;
  • 可以指定用一个集合来初始化,将此集合中的元素在构造方法期间就先添加到队列中。

LinkedBlockingQueue 类

LinkedBlockingQueue 是由链表结构组成的有界阻塞队列。容易被误解为无边界,但其实其行为和内部代码都是基于有界的逻辑实现的,只不过如果我们没有在创建队列时就指定容量,那么其容量限制就自动被设置为 Integer.MAX_VALUE,成为了无界队列。

LinkedBlockingQueue 要点

LinkedBlockingQueue 类定义如下:

1
2
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {}
  • LinkedBlockingQueue 实现了 BlockingQueue,也是一个阻塞队列。
  • LinkedBlockingQueue 实现了 Serializable,支持序列化。
  • LinkedBlockingQueue 是基于单链表实现的阻塞队列,可以当做无界队列也可以当做有界队列来使用。
  • LinkedBlockingQueue 中元素按照插入顺序保存(FIFO)。

LinkedBlockingQueue 原理

LinkedBlockingQueue 中的重要数据结构:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 队列容量
private final int capacity;
// 队列中的元素数量
private final AtomicInteger count = new AtomicInteger(0);
// 队头
private transient Node<E> head;
// 队尾
private transient Node<E> last;

// take, poll, peek 等读操作的方法需要获取到这个锁
private final ReentrantLock takeLock = new ReentrantLock();
// 如果读操作的时候队列是空的,那么等待 notEmpty 条件
private final Condition notEmpty = takeLock.newCondition();
// put, offer 等写操作的方法需要获取到这个锁
private final ReentrantLock putLock = new ReentrantLock();
// 如果写操作的时候队列是满的,那么等待 notFull 条件
private final Condition notFull = putLock.newCondition();

参考资料

深入剖析共识性算法 Raft

Raft 简介

Raft 是一种为了管理日志复制的分布式共识性算法。从本质上说,Raft 算法是通过一切以领导者为准的方式,实现一系列值的共识和各节点日志的一致

Raft 出现之前,Paxos 一直是分布式共识性算法的标准。Paxos 难以理解,更难以实现。Raft 的设计目标是简化 Paxos,使得算法既容易理解,也容易实现

Paxos 和 Raft 都是分布式共识性算法,这个过程如同投票选举领袖(Leader),参选者(Candidate)需要说服大多数投票者(Follower)投票给他,一旦选举出领袖,就由领袖发号施令。Paxos 和 Raft 的区别在于选举的具体过程不同。

Raft 可以解决分布式 CAP 理论中的 CP,即 一致性(C:Consistency) 和 _分区容忍性(P:Partition Tolerance)_,并不能解决 可用性(A:Availability) 的问题。

分布式共识性

分布式共识性 (distributed consensus) 是分布式系统中最基本的问题,用来保证一个分布式系统的可靠性以及容错能力。简单来说,**分布式共识性是指多个服务器的保持状态一致**。

在分布式系统中,可能出现各种意外(断电、网络拥塞、CPU/内存耗尽等等),使得服务器宕机或无法访问,最终导致无法和其他服务器保持状态一致。为了应对这种情况,就需要有一种一致性协议来进行容错,使得分布式系统中即使有部分服务器宕机或无法访问,整体依然可以对外提供服务。

以容错方式达成一致,自然不能要求所有服务器都达成一致状态,只要超过半数以上的服务器达成一致就可以了。假设有 N 台服务器, 大于等于 N/2 + 1 台服务器就算是半数以上了 。

复制状态机

复制状态机(Replicated State Machines) 是指一组服务器上的状态机产生相同状态的副本,并且在一些机器宕掉的情况下也可以继续运行。一致性算法管理着来自客户端指令的复制日志。状态机从日志中处理相同顺序的相同指令,所以产生的结果也是相同的。

复制状态机通常都是基于复制日志实现的,如上图。每一个服务器存储一个包含一系列指令的日志,并且按照日志的顺序进行执行。每一个日志都按照相同的顺序包含相同的指令,所以每一个服务器都执行相同的指令序列。因为每个状态机都是确定的,每一次执行操作都产生相同的状态和同样的序列。

保证复制日志相同就是一致性算法的工作了。在一台服务器上,一致性模块接收客户端发送来的指令然后增加到自己的日志中去。它和其他服务器上的一致性模块进行通信来保证每一个服务器上的日志最终都以相同的顺序包含相同的请求,尽管有些服务器会宕机。一旦指令被正确的复制,每一个服务器的状态机按照日志顺序处理他们,然后输出结果被返回给客户端。因此,服务器集群看起来形成一个高可靠的状态机。

实际系统中使用的一致性算法通常含有以下特性:

  • 安全性保证(绝对不会返回一个错误的结果):在非拜占庭错误情况下,包括网络延迟、分区、丢包、冗余和乱序等错误都可以保证正确。
  • 可用性:集群中只要有大多数的机器可运行并且能够相互通信、和客户端通信,就可以保证可用。因此,一个典型的包含 5 个节点的集群可以容忍两个节点的失败。服务器被停止就认为是失败。他们当有稳定的存储的时候可以从状态中恢复回来并重新加入集群。
  • 不依赖时序来保证一致性:物理时钟错误或者极端的消息延迟只有在最坏情况下才会导致可用性问题。
  • 通常情况下,一条指令可以尽可能快的在集群中大多数节点响应一轮远程过程调用时完成。小部分比较慢的节点不会影响系统整体的性能。

RAFT 应用

RAFT 可以做什么?

通过 RAFT 提供的复制状态机,可以解决分布式系统的复制、修复、节点管理等问题。Raft 极大的简化当前分布式系统的设计与实现,让开发者只关注于业务逻辑,将其抽象实现成对应的状态机即可。基于这套框架,可以构建很多分布式应用:

  • 分布式存储系统:比如分布式消息队列、分布式块系统、分布式文件系统、分布式表格系统等。代表有:Redis、Etcd、Consul
  • 高可靠元信息管理:比如各类 Master 模块的 HA

Raft 基础

Raft 将一致性问题分解成了三个子问题:

  • 选举 Leader
  • 日志复制
  • 安全性

在后续章节,会详细讲解这个子问题。现在,先了解一下 Raft 的一些核心概念。

服务器角色

在 Raft 中,任何时刻,每个服务器都处于这三个角色之一 :

  • Leader - 领导者,通常一个系统中是一主(Leader)多从(Follower)。Leader 负责处理所有的客户端请求
  • Follower - 跟随者,不会发送任何请求,只是简单的 响应来自 Leader 或者 Candidate 的请求
  • Candidate - 参选者,选举新 Leader 时的临时角色。

:bulb: 图示说明:

  • Follower 只响应来自其他服务器的请求。在一定时限内,如果 Follower 接收不到消息,就会转变成 Candidate,并发起选举。
  • Candidate 向 Follower 发起投票请求,如果获得集群中半数以上的选票,就会转变为 Leader。
  • 在一个 Term 内,Leader 始终保持不变,直到下线了。Leader 需要周期性向所有 Follower 发送心跳消息,以阻止 Follower 转变为 Candidate。

任期

Raft 把时间分割成任意长度的 任期(Term),任期用连续的整数标记。每一段任期从一次选举开始。Raft 保证了在一个给定的任期内,最多只有一个领导者

  • 如果选举成功,Leader 会管理整个集群直到任期结束。
  • 如果选举失败,那么这个任期就会因为没有 Leader 而结束。

不同服务器节点观察到的任期转换状态可能不一样

  • 服务器节点可能观察到多次的任期转换。
  • 服务器节点也可能观察不到任何一次任期转换。

任期在 Raft 算法中充当逻辑时钟的作用,使得服务器节点可以查明一些过期的信息(比如过期的 Leader)。每个服务器节点都会存储一个当前任期号,这一编号在整个时期内单调的增长。当服务器之间通信的时候会交换当前任期号。

  • 如果一个服务器的当前任期号比其他人小,那么他会更新自己的编号到较大的编号值。
  • 如果一个 Candidate 或者 Leader 发现自己的任期号过期了,那么他会立即恢复成跟随者状态。
  • 如果一个节点接收到一个包含过期的任期号的请求,那么他会直接拒绝这个请求。

RPC

Raft 算法中服务器节点之间的通信使用 **_远程过程调用(RPC)_**。

基本的一致性算法只需要两种 RPC:

  • RequestVote RPC - 请求投票 RPC,由 Candidate 在选举期间发起。
  • AppendEntries RPC - 附加条目 RPC,由 Leader 发起,用来复制日志和提供一种心跳机制。

选举 Leader

选举规则

领导者心跳消息:Raft 使用一种心跳机制来触发 Leader 选举。Leader 需要周期性的向所有 Follower 发送心跳消息,以此维持 Leader 身份。

随机的竞选超时时间:每个 Follower 都设置了一个随机的竞选超时时间,一般为 150ms ~ 300ms,如果在竞选超时时间内没有收到 Leader 的心跳消息,就会认为当前 Term 没有可用的 Leader,并发起选举来选出新的 Leader。开始一次选举过程,Follower 先要增加自己的当前 Term 号,并转换为 Candidate

Candidate 会并行的向集群中的所有服务器节点发送投票请求(RequestVote RPC,它会保持当前状态直到以下三件事情之一发生:

  • 自己成为 Leader
  • 其他的服务器成为 Leader
  • 没有任何服务器成为 Leader

自己成为 Leader

  • 当一个 Candidate 从整个集群半数以上的服务器节点获得了针对同一个 Term 的选票,那么它就赢得了这次选举并成为 Leader。每个服务器最多会对一个 Term 投出一张选票,按照先来先服务(FIFO)的原则。_要求半数以上选票的规则确保了最多只会有一个 Candidate 赢得此次选举_。
  • 一旦 Candidate 赢得选举,就立即成为 Leader。然后它会向其他的服务器发送心跳消息来建立自己的权威并且阻止新的领导人的产生。

其他的服务器成为 Leader

等待投票期间,Candidate 可能会从其他的服务器接收到声明它是 Leader 的 AppendEntries RPC

  • 如果这个 Leader 的 Term 号(包含在此次的 RPC 中)不小于 Candidate 当前的 Term,那么 Candidate 会承认 Leader 合法并回到 Follower 状态。
  • 如果此次 RPC 中的 Term 号比自己小,那么 Candidate 就会拒绝这个消息并继续保持 Candidate 状态。

没有任何服务器成为 Leader

如果有多个 Follower 同时成为 Candidate,那么选票可能会被瓜分以至于没有 Candidate 可以赢得半数以上的投票。当这种情况发生的时候,每一个 Candidate 都会竞选超时,然后通过增加当前 Term 号来开始一轮新的选举。然而,没有其他机制的话,选票可能会被无限的重复瓜分。

Raft 算法使用随机选举超时时间的方法来确保很少会发生选票瓜分的情况,就算发生也能很快的解决。为了阻止选票起初就被瓜分,竞选超时时间是一个随机的时间,在一个固定的区间(例如 150-300 毫秒)随机选择,这样可以把选举都分散开。

  • 以至于在大多数情况下,只有一个服务器会超时,然后它赢得选举,成为 Leader,并在其他服务器超时之前发送心跳包。
  • 同样的机制也被用在选票瓜分的情况下:每一个 Candidate 在开始一次选举的时候会重置一个随机的选举超时时间,然后在超时时间内等待投票的结果;这样减少了在新的选举中另外的选票瓜分的可能性。

理解了上面的选举规则后,我们通过动图来加深认识。

单 Candidate 选举

(1)下图表示一个分布式系统的最初阶段,此时只有 Follower,没有 Leader。Follower A 等待一个随机的选举超时时间之后,没收到 Leader 发来的心跳消息。因此,将 Term 由 0 增加为 1,转换为 Candidate,进入选举状态。

(2)此时,A 向所有其他节点发送投票请求。

(3)其它节点会对投票请求进行回复,如果超过半数以上的节点投票了,那么该 Candidate 就会立即变成 Term 为 1 的 Leader。

(4)Leader 会周期性地发送心跳消息给所有 Follower,Follower 接收到心跳包,会重新开始计时。

多 Candidate 选举

(1)如果有多个 Follower 成为 Candidate,并且所获得票数相同,那么就需要重新开始投票。例如下图中 Candidate B 和 Candidate D 都发起 Term 为 4 的选举,且都获得两票,因此需要重新开始投票。

(2)当重新开始投票时,由于每个节点设置的随机竞选超时时间不同,因此能下一次再次出现多个 Candidate 并获得同样票数的概率很低。

小结

Raft 算法通过:领导者心跳消息、随机选举超时时间、得到大多数选票才通过原则、任期最新者优先、先来先服务等投票原则,保证了一个任期只有一位领导,也极大地减少了选举失败的情况。

日志复制

日志格式

日志由含日志索引(log index)的日志条目(log entry)组成。每个日志条目包含它被创建时的 Term 号(下图中方框中的数字),和一个复制状态机需要执行的指令。如果一个日志条目被复制到半数以上的服务器上,就被认为可以提交(Commit)了。

  • 日志条目中的 Term 号被用来检查是否出现不一致的情况,它实际上是创建这条日志的领导者的任期编号。
  • 日志条目中的日志索引用来表明它在日志中的位置,它是一个单调递增的整数。

Raft 日志同步保证如下两点:

  • 如果不同日志中的两个日志条目有着相同的日志索引和 Term,则它们所存储的命令是相同的
    • 这个特性基于这条原则:Leader 最多在一个 Term 内、在指定的一个日志索引上创建一条日志条目,同时日志条目在日志中的位置也从来不会改变。
  • 如果不同日志中的两个日志条目有着相同的日志索引和 Term,则它们之前的所有条目都是完全一样的
    • 这个特性由 AppendEntries RPC 的一个简单的一致性检查所保证。在发送 AppendEntries RPC 时,Leader 会把新日志条目之前的日志条目的日志索引和 Term 号一起发送。如果 Follower 在它的日志中找不到包含相同日志索引和 Term 号的日志条目,它就会拒绝接收新的日志条目。

日志复制流程

  1. Leader 负责处理所有客户端的请求。
  2. Leader 把请求作为日志条目加入到它的日志中,然后并行的向其他服务器发送 AppendEntries RPC 请求,要求 Follower 复制日志条目。
  3. Follower 复制成功后,返回确认消息。
  4. 当这个日志条目被半数以上的服务器复制后,Leader 提交这个日志条目到它的复制状态机,并向客户端返回执行结果。

注意:如果 Follower 崩溃或者运行缓慢,再或者网络丢包,Leader 会不断的重复尝试发送 AppendEntries RPC 请求 (尽管已经回复了客户端),直到所有的跟随者都最终复制了所有的日志条目。

下面,通过一组动图来加深认识:

(1)来自客户端的修改都会被传入 Leader。注意该修改还未被提交,只是写入日志中。

(2)Leader 会把修改复制到所有 Follower。

(3)Leader 会等待大多数的 Follower 也进行了修改,然后才将修改提交。

(4)此时 Leader 会通知的所有 Follower 让它们也提交修改,此时所有节点的值达成一致。

日志一致性

一般情况下,Leader 和 Followers 的日志保持一致,因此日志条目一致性检查通常不会失败。然而,Leader 崩溃可能会导致日志不一致:旧的 Leader 可能没有完全复制完日志中的所有条目。

Leader 和 Follower 日志不一致的可能

Leader 和 Follower 可能存在多种日志不一致的可能。

:bulb: 图示说明:

上图阐述了 Leader 和 Follower 可能存在多种日志不一致的可能,每一个方框表示一个日志条目,里面的数字表示任期号 。

当一个 Leader 成功当选时,Follower 可能出现以下情况(a-f):

  • 存在未更新日志条目,如(a、b)。
  • 存在未提交日志条目,如(c、d)。
  • 两种情况都存在,如(e、f)。

_例如,场景 f 可能会这样发生,某服务器在 Term2 的时候是 Leader,已附加了一些日志条目到自己的日志中,但在提交之前就崩溃了;很快这个机器就被重启了,在 Term3 重新被选为 Leader,并且又增加了一些日志条目到自己的日志中;在 Term 2 和 Term 3 的日志被提交之前,这个服务器又宕机了,并且在接下来的几个任期里一直处于宕机状态_。

Leader 和 Follower 日志一致的保证

Leader 通过强制 Followers 复制它的日志来处理日志的不一致,Followers 上的不一致的日志会被 Leader 的日志覆盖

  • Leader 为了使 Followers 的日志同自己的一致,Leader 需要找到 Followers 同它的日志一致的地方,然后覆盖 Followers 在该位置之后的条目。
  • Leader 会从后往前试,每次日志条目失败后尝试前一个日志条目,直到成功找到每个 Follower 的日志一致位点,然后向后逐条覆盖 Followers 在该位置之后的条目。

安全性

前面描述了 Raft 算法是如何选举 Leader 和复制日志的。

Raft 还增加了一些限制来完善 Raft 算法,以保证安全性:保证了任意 Leader 对于给定的 Term,都拥有了之前 Term 的所有被提交的日志条目。

选举限制

拥有最新的已提交的日志条目的 Follower 才有资格成为 Leader。

Raft 使用投票的方式来阻止一个 Candidate 赢得选举除非这个 Candidate 包含了所有已经提交的日志条目。 Candidate 为了赢得选举必须联系集群中的大部分节点,这意味着每一个已经提交的日志条目在这些服务器节点中肯定存在于至少一个节点上。如果 Candidate 的日志至少和大多数的服务器节点一样新(这个新的定义会在下面讨论),那么他一定持有了所有已经提交的日志条目。

RequestVote RPC 实现了这样的限制:RequestVote RPC 中包含了 Candidate 的日志信息, Follower 会拒绝掉那些日志没有自己新的投票请求

如何判断哪个日志条目比较新?

Raft 通过比较两份日志中最后一条日志条目的日志索引和 Term 来判断哪个日志比较新。

  • 先判断 Term,哪个数值大即代表哪个日志比较新。
  • 如果 Term 相同,再比较 日志索引,哪个数值大即代表哪个日志比较新。

提交旧任期的日志条目

一个当前 Term 的日志条目被复制到了半数以上的服务器上,Leader 就认为它是可以被提交的。如果这个 Leader 在提交日志条目前就下线了,后续的 Leader 可能会覆盖掉这个日志条目。

💡 图示说明:

上图解释了为什么 Leader 无法对旧 Term 的日志条目进行提交。

  • 阶段 (a) ,S1 是 Leader,且 S1 写入日志条目为 (Term 2,日志索引 2),只有 S2 复制了这个日志条目。
  • 阶段 (b),S1 下线,S5 被选举为 Term3 的 Leader。S5 写入日志条目为 (Term 3,日志索引 2)。
  • 阶段 (c),S5 下线,S1 重新上线,并被选举为 Term4 的 Leader。此时,Term 2 的那条日志条目已经被复制到了集群中的大多数节点上,但是还没有被提交。
  • 阶段 (d),S1 再次下线,S5 重新上线,并被重新选举为 Term3 的 Leader。然后 S5 覆盖了日志索引 2 处的日志。
  • 阶段 (e),如果阶段 (d) 还未发生,即 S1 再次下线之前,S1 把自己主导的日志条目复制到了大多数节点上,那么在后续 Term 里面这些新日志条目就会被提交。这样在同一时刻就同时保证了,之前的所有旧日志条目就会被提交。

Raft 永远不会通过计算副本数目的方式去提交一个之前 Term 内的日志条目。只有 Leader 当前 Term 里的日志条目通过计算副本数目可以被提交;一旦当前 Term 的日志条目以这种方式被提交,那么由于日志匹配特性,之前的日志条目也都会被间接的提交。

当 Leader 复制之前任期里的日志时,Raft 会为所有日志保留原始的 Term,这在提交规则上产生了额外的复杂性。在其他的一致性算法中,如果一个新的领导人要重新复制之前的任期里的日志时,它必须使用当前新的任期号。Raft 使用的方法更加容易辨别出日志,因为它可以随着时间和日志的变化对日志维护着同一个任期编号。另外,和其他的算法相比,Raft 中的新领导人只需要发送更少日志条目(其他算法中必须在他们被提交之前发送更多的冗余日志条目来为他们重新编号)。

日志压缩

在实际的系统中,不能让日志无限膨胀,否则系统重启时需要花很长的时间进行恢复,从而影响可用性。Raft 采用对整个系统进行快照来解决,快照之前的日志都可以丢弃。

每个副本独立的对自己的系统状态生成快照,并且只能对已经提交的日志条目生成快照。

快照包含以下内容:

  • 日志元数据。最后一条已提交的日志条目的日志索引和 Term。这两个值在快照之后的第一条日志条目的 AppendEntries RPC 的完整性检查的时候会被用上。
  • 系统当前状态。

当 Leader 要发送某个日志条目,落后太多的 Follower 的日志条目会被丢弃,Leader 会将快照发给 Follower。或者新上线一台机器时,也会发送快照给它。

生成快照的频率要适中,频率过高会消耗大量 I/O 带宽;频率过低,一旦需要执行恢复操作,会丢失大量数据,影响可用性。推荐当日志达到某个固定的大小时生成快照。

生成一次快照可能耗时过长,影响正常日志同步。可以通过使用 copy-on-write 技术避免快照过程影响正常日志同步。

说明:本文仅阐述 Raft 算法的核心内容,不包括算法论证、评估等

参考资料

Redis 事务

Redis 仅支持“非严格”的事务。所谓“非严格”是指:Redis 事务保证“全部执行命令”;但是,Redis 事务“不支持回滚”。

关键词:事务ACIDMULTIEXECDISCARDWATCH

Redis 事务简介

什么是 ACID

ACID 是数据库事务正确执行的四个基本要素。

  • 原子性(Atomicity)
    • 事务被视为不可分割的最小单元,事务中的所有操作要么全部提交成功,要么全部失败回滚
    • 回滚可以用日志来实现,日志记录着事务所执行的修改操作,在回滚时反向执行这些修改操作即可。
  • 一致性(Consistency)
    • 数据库在事务执行前后都保持一致性状态。
    • 在一致性状态下,所有事务对一个数据的读取结果都是相同的。
  • 隔离性(Isolation)
    • 一个事务所做的修改在最终提交以前,对其它事务是不可见的。
  • 持久性(Durability)
    • 一旦事务提交,则其所做的修改将会永远保存到数据库中。即使系统发生崩溃,事务执行的结果也不能丢失。
    • 可以通过数据库备份和恢复来实现,在系统发生奔溃时,使用备份的数据库进行数据恢复。

一个支持事务(Transaction)中的数据库系统,必需要具有这四种特性,否则在事务过程(Transaction processing)当中无法保证数据的正确性。

  • 只有满足一致性,事务的执行结果才是正确的。
  • 在无并发的情况下,事务串行执行,隔离性一定能够满足。此时只要能满足原子性,就一定能满足一致性。
  • 在并发的情况下,多个事务并行执行,事务不仅要满足原子性,还需要满足隔离性,才能满足一致性。
  • 事务满足持久化是为了能应对系统崩溃的情况。

ACID

Redis 事务的特性

Redis 的事务总是支持 ACID 中的原子性、一致性和隔离性, 当服务器运行在 AOF 持久化模式下, 并且 appendfsync 选项的值为 always 时, 事务也具有持久性。

但需要注意的是:Redis 仅支持“非严格”的事务。这里的“非严格”,其实指的是 Redis 事务只能部分保证 ACID 中的原子性。

  • Redis 事务保证全部执行命令 - Redis 事务中的多个命令会被打包到事务队列中,然后按先进先出(FIFO)的顺序执行。事务在执行过程中不会被中断,当事务队列中的所有命令都被执行完毕之后,事务才会结束。
  • Redis 事务不支持回滚 - 如果命令执行失败不会回滚,而是会继续执行下去。

Redis 官方的事务特性文档给出的不支持回滚的理由是:

  • Redis 命令只会因为错误的语法而失败,或是命令用在了错误类型的键上面。
  • 因为不需要对回滚进行支持,所以 Redis 的内部可以保持简单且快速。

Redis 事务应用

MULTIEXECDISCARDWATCH 是 Redis 事务相关的命令。

事务可以一次执行多个命令, 并且有以下两个重要的保证:

  • 事务是一个单独的隔离操作:事务中的所有命令都会序列化、按顺序地执行。事务在执行的过程中,不会被其他客户端发送来的命令请求所打断。
  • 事务是一个原子操作:事务中的命令要么全部被执行,要么全部都不执行。

Redis 有天然解决这个并发竞争问题的类 CAS 乐观锁方案:每次要写之前,先判断一下当前这个 value 的时间戳是否比缓存里的 value 的时间戳要新。如果是的话,那么可以写,否则,就不能用旧的数据覆盖新的数据。

MULTI

MULTI 命令用于开启一个事务,它总是返回 OK 。

MULTI 执行之后, 客户端可以继续向服务器发送任意多条命令, 这些命令不会立即被执行, 而是被放到一个队列中, 当 EXEC 命令被调用时, 所有队列中的命令才会被执行。

以下是一个事务例子, 它原子地增加了 foobar 两个键的值:

1
2
3
4
5
6
7
8
9
> MULTI
OK
> INCR foo
QUEUED
> INCR bar
QUEUED
> EXEC
1) (integer) 1
2) (integer) 1

EXEC

EXEC 命令负责触发并执行事务中的所有命令。

  • 如果客户端在使用 MULTI 开启了一个事务之后,却因为断线而没有成功执行 EXEC ,那么事务中的所有命令都不会被执行。
  • 另一方面,如果客户端成功在开启事务之后执行 EXEC ,那么事务中的所有命令都会被执行。

MULTIEXEC 中的操作将会一次性发送给服务器,而不是一条一条发送,这种方式称为流水线,它可以减少客户端与服务器之间的网络通信次数从而提升性能。

DISCARD

当执行 DISCARD 命令时, 事务会被放弃, 事务队列会被清空, 并且客户端会从事务状态中退出。

示例:

1
2
3
4
5
6
7
8
9
10
> SET foo 1
OK
> MULTI
OK
> INCR foo
QUEUED
> DISCARD
OK
> GET foo
"1"

WATCH

WATCH 命令可以为 Redis 事务提供 check-and-set (CAS)行为。

WATCH 的键会被监视,并会发觉这些键是否被改动过了。 如果有至少一个被监视的键在 EXEC 执行之前被修改了, 那么整个事务都会被取消, EXEC 返回 nil-reply 来表示事务已经失败。

1
2
3
4
5
6
WATCH mykey
val = GET mykey
val = val + 1
MULTI
SET mykey $val
EXEC

使用上面的代码, 如果在 WATCH 执行之后, EXEC 执行之前, 有其他客户端修改了 mykey 的值, 那么当前客户端的事务就会失败。 程序需要做的, 就是不断重试这个操作, 直到没有发生碰撞为止。

这种形式的锁被称作乐观锁, 它是一种非常强大的锁机制。 并且因为大多数情况下, 不同的客户端会访问不同的键, 碰撞的情况一般都很少, 所以通常并不需要进行重试。

WATCH 使得 EXEC 命令需要有条件地执行:事务只能在所有被监视键都没有被修改的前提下执行,如果这个前提不能满足的话,事务就不会被执行。

WATCH 命令可以被调用多次。对键的监视从 WATCH 执行之后开始生效,直到调用 EXEC 为止。

用户还可以在单个 WATCH 命令中监视任意多个键,例如:

1
2
redis> WATCH key1 key2 key3
OK

取消 WATCH 的场景

EXEC 被调用时, 不管事务是否成功执行, 对所有键的监视都会被取消。另外, 当客户端断开连接时, 该客户端对键的监视也会被取消。

使用无参数的 UNWATCH 命令可以手动取消对所有键的监视。 对于一些需要改动多个键的事务, 有时候程序需要同时对多个键进行加锁, 然后检查这些键的当前值是否符合程序的要求。 当值达不到要求时, 就可以使用 UNWATCH 命令来取消目前对键的监视, 中途放弃这个事务, 并等待事务的下次尝试。

使用 WATCH 创建原子操作

WATCH 可以用于创建 Redis 没有内置的原子操作。

举个例子,以下代码实现了原创的 ZPOP 命令,它可以原子地弹出有序集合中分值(score)最小的元素:

1
2
3
4
5
WATCH zset
element = ZRANGE zset 0 0
MULTI
ZREM zset element
EXEC

Lua 脚本

为什么使用 Lua

前面提到了,Redis 仅支持“非严格”的事务

Lua 是一种轻量小巧的脚本语言,用标准 C 语言编写并以源代码形式开放, 其设计目的是为了嵌入应用程序中,从而为应用程序提供灵活的扩展和定制功能。

Redis 从 2.6 版本开始支持执行 Lua 脚本,它的功能和事务非常类似。我们可以利用 Lua 脚本来批量执行多条 Redis 命令,这些 Redis 命令会被提交到 Redis 服务器一次性执行完成,大幅减小了网络开销。

Redis 执行 Lua 是原子操作。因为 Redis 使用串行化的方式来执行 Redis 命令, 所以在任何特定时间里, 最多都只会有一个脚本能够被放进 Lua 环境里面运行, 因此, 整个 Redis 服务器只需要创建一个 Lua 环境即可。由于,Redis 执行 Lua 具有原子性,所以常被用于需要原子性执行多命令的场景。

不过,如果 Lua 脚本运行时出错并中途结束,出错之后的命令是不会被执行的。并且,出错之前执行的命令是无法被撤销的,无法实现类似关系型数据库执行失败可以回滚的那种原子性效果。因此, 严格来说的话,通过 Lua 脚本来批量执行 Redis 命令实际也是不完全满足原子性的。

如果想要让 Lua 脚本中的命令全部执行,必须保证语句语法和命令都是对的。

另外,Redis 7.0 新增了 Redis functions 特性,你可以将 Redis functions 看作是比 Lua 更强大的脚本。

Redis 脚本命令

命令 说明
EVAL EVAL 命令为客户端输入的脚本在 Lua 环境中定义一个函数, 并通过调用这个函数来执行脚本。
EVALSHA EVALSHA 命令通过直接调用 Lua 环境中已定义的函数来执行脚本。
SCRIPT_FLUSH SCRIPT_FLUSH 命令会清空服务器 lua_scripts 字典中保存的脚本, 并重置 Lua 环境。
SCRIPT_EXISTS SCRIPT_EXISTS 命令接受一个或多个 SHA1 校验和为参数, 并通过检查 lua_scripts 字典来确认校验和对应的脚本是否存在。
SCRIPT_LOAD SCRIPT_LOAD 命令接受一个 Lua 脚本为参数, 为该脚本在 Lua 环境中创建函数, 并将脚本保存到 lua_scripts 字典中。
SCRIPT_KILL SCRIPT_KILL 命令用于停止正在执行的脚本。

Redis 执行 Lua 的工作流程

为了在 Redis 服务器中执行 Lua 脚本, Redis 在服务器内嵌了一个 Lua 环境(environment), 并对这个 Lua 环境进行了一系列修改, 从而确保这个 Lua 环境可以满足 Redis 服务器的需要。

Redis 服务器创建并修改 Lua 环境的整个过程由以下步骤组成:

  1. 创建一个基础的 Lua 环境, 之后的所有修改都是针对这个环境进行的。
  2. 载入多个函数库到 Lua 环境里面, 让 Lua 脚本可以使用这些函数库来进行数据操作。
  3. 创建全局表格 redis , 这个表格包含了对 Redis 进行操作的函数, 比如用于在 Lua 脚本中执行 Redis 命令的 redis.call 函数。
  4. 使用 Redis 自制的随机函数来替换 Lua 原有的带有副作用的随机函数, 从而避免在脚本中引入副作用。
  5. 创建排序辅助函数, Lua 环境使用这个辅佐函数来对一部分 Redis 命令的结果进行排序, 从而消除这些命令的不确定性。
  6. 创建 redis.pcall 函数的错误报告辅助函数, 这个函数可以提供更详细的出错信息。
  7. 对 Lua 环境里面的全局环境进行保护, 防止用户在执行 Lua 脚本的过程中, 将额外的全局变量添加到了 Lua 环境里面。
  8. 将完成修改的 Lua 环境保存到服务器状态的 lua 属性里面, 等待执行服务器传来的 Lua 脚本。

Redis 执行 Lua 的要点

  • Redis 服务器专门使用一个伪客户端来执行 Lua 脚本中包含的 Redis 命令。
  • Redis 使用脚本字典来保存所有被 EVAL 命令执行过, 或者被 SCRIPT_LOAD 命令载入过的 Lua 脚本, 这些脚本可以用于实现 SCRIPT_EXISTS 命令, 以及实现脚本复制功能。
  • 服务器在执行脚本之前, 会为 Lua 环境设置一个超时处理钩子, 当脚本出现超时运行情况时, 客户端可以通过向服务器发送 SCRIPT_KILL 命令来让钩子停止正在执行的脚本, 或者发送 SHUTDOWN nosave 命令来让钩子关闭整个服务器。
  • 主服务器复制 EVALSCRIPT_FLUSHSCRIPT_LOAD 三个命令的方法和复制普通 Redis 命令一样 —— 只要将相同的命令传播给从服务器就可以了。
  • 主服务器在复制 EVALSHA 命令时, 必须确保所有从服务器都已经载入了 EVALSHA 命令指定的 SHA1 校验和所对应的 Lua 脚本, 如果不能确保这一点的话, 主服务器会将 EVALSHA 命令转换成等效的 EVAL 命令, 并通过传播 EVAL 命令来获得相同的脚本执行效果。

Redis + Lua 实现分布式锁

Redis 应用 Lua 的一个经典使用场景是实现分布式锁。其实现有 3 个重要的考量点:

  1. 互斥(只能有一个客户端获取锁)
  2. 不能死锁
  3. 容错(只要大部分 redis 节点创建了这把锁就可以)

对应的 Redis 指令如下:

  • setnx - setnx key val:当且仅当 key 不存在时,set 一个 key 为 val 的字符串,返回 1;若 key 存在,则什么都不做,返回 0。
  • expire - expire key timeout:为 key 设置一个超时时间,单位为 second,超过这个时间锁会自动释放,避免死锁。
  • delete - delete key:删除 key

注意:

不要将 setnxexpire 作为两个命令组合实现加锁,这样就无法保证原子性。如果客户端在 setnx 之后崩溃,那么将导致锁无法释放。正确的做法应是在 setnx 命令中指定 expire 时间。

(1)申请锁

1
SET resource_name my_random_value NX PX 30000

执行这个命令就 ok。

  • NX:表示只有 key 不存在的时候才会设置成功。(如果此时 redis 中存在这个 key,那么设置失败,返回 nil
  • PX 30000:意思是 30s 后锁自动释放。别人创建的时候如果发现已经有了就不能加锁了。

(2)释放锁

释放锁就是删除 key ,但是一般可以用 lua 脚本删除,判断 value 一样才删除:

1
2
3
4
5
6
-- 删除锁的时候,找到 key 对应的 value,跟自己传过去的 value 做比较,如果是一样的才删除。
if redis.call("get",KEYS[1]) == ARGV[1] then
return redis.call("del",KEYS[1])
else
return 0
end

参考资料

Redis 脚本

Redis 脚本使用 Lua 解释器来执行脚本。 Redis 2.6 版本通过内嵌支持 Lua 环境。

关键词:Lua

为什么使用 Lua

Lua 是一种轻量小巧的脚本语言,用标准 C 语言编写并以源代码形式开放, 其设计目的是为了嵌入应用程序中,从而为应用程序提供灵活的扩展和定制功能。

在 Redis 中,执行单一命令是原子性操作,所以不会出现并发问题。但有的业务场景下,需要执行多个命令,同时确保不出现并发问题,这就需要用到 Lua 脚本了。

Redis 执行 Lua 是原子操作。因为 Redis 使用串行化的方式来执行 Redis 命令, 所以在任何特定时间里, 最多都只会有一个脚本能够被放进 Lua 环境里面运行, 因此, 整个 Redis 服务器只需要创建一个 Lua 环境即可。

由于,Redis 执行 Lua 具有原子性,所以常被用于需要原子性执行多命令的场景。

Redis 脚本命令

命令 说明
EVAL EVAL 命令为客户端输入的脚本在 Lua 环境中定义一个函数, 并通过调用这个函数来执行脚本。
EVALSHA EVALSHA 命令通过直接调用 Lua 环境中已定义的函数来执行脚本。
SCRIPT_FLUSH SCRIPT_FLUSH 命令会清空服务器 lua_scripts 字典中保存的脚本, 并重置 Lua 环境。
SCRIPT_EXISTS SCRIPT_EXISTS 命令接受一个或多个 SHA1 校验和为参数, 并通过检查 lua_scripts 字典来确认校验和对应的脚本是否存在。
SCRIPT_LOAD SCRIPT_LOAD 命令接受一个 Lua 脚本为参数, 为该脚本在 Lua 环境中创建函数, 并将脚本保存到 lua_scripts 字典中。
SCRIPT_KILL SCRIPT_KILL 命令用于停止正在执行的脚本。

Redis 执行 Lua 的工作流程

为了在 Redis 服务器中执行 Lua 脚本, Redis 在服务器内嵌了一个 Lua 环境(environment), 并对这个 Lua 环境进行了一系列修改, 从而确保这个 Lua 环境可以满足 Redis 服务器的需要。

Redis 服务器创建并修改 Lua 环境的整个过程由以下步骤组成:

  1. 创建一个基础的 Lua 环境, 之后的所有修改都是针对这个环境进行的。
  2. 载入多个函数库到 Lua 环境里面, 让 Lua 脚本可以使用这些函数库来进行数据操作。
  3. 创建全局表格 redis , 这个表格包含了对 Redis 进行操作的函数, 比如用于在 Lua 脚本中执行 Redis 命令的 redis.call 函数。
  4. 使用 Redis 自制的随机函数来替换 Lua 原有的带有副作用的随机函数, 从而避免在脚本中引入副作用。
  5. 创建排序辅助函数, Lua 环境使用这个辅佐函数来对一部分 Redis 命令的结果进行排序, 从而消除这些命令的不确定性。
  6. 创建 redis.pcall 函数的错误报告辅助函数, 这个函数可以提供更详细的出错信息。
  7. 对 Lua 环境里面的全局环境进行保护, 防止用户在执行 Lua 脚本的过程中, 将额外的全局变量添加到了 Lua 环境里面。
  8. 将完成修改的 Lua 环境保存到服务器状态的 lua 属性里面, 等待执行服务器传来的 Lua 脚本。

Redis 执行 Lua 的要点

  • Redis 服务器专门使用一个伪客户端来执行 Lua 脚本中包含的 Redis 命令。
  • Redis 使用脚本字典来保存所有被 EVAL 命令执行过, 或者被 SCRIPT_LOAD 命令载入过的 Lua 脚本, 这些脚本可以用于实现 SCRIPT_EXISTS 命令, 以及实现脚本复制功能。
  • 服务器在执行脚本之前, 会为 Lua 环境设置一个超时处理钩子, 当脚本出现超时运行情况时, 客户端可以通过向服务器发送 SCRIPT_KILL 命令来让钩子停止正在执行的脚本, 或者发送 SHUTDOWN nosave 命令来让钩子关闭整个服务器。
  • 主服务器复制 EVALSCRIPT_FLUSHSCRIPT_LOAD 三个命令的方法和复制普通 Redis 命令一样 —— 只要将相同的命令传播给从服务器就可以了。
  • 主服务器在复制 EVALSHA 命令时, 必须确保所有从服务器都已经载入了 EVALSHA 命令指定的 SHA1 校验和所对应的 Lua 脚本, 如果不能确保这一点的话, 主服务器会将 EVALSHA 命令转换成等效的 EVAL 命令, 并通过传播 EVAL 命令来获得相同的脚本执行效果。

参考资料

Markdown 极简教程

目录

标题

Markdown 支持六个级别的标题。

1
2
3
4
5
6
7
语法:
# 一级标题
## 二级标题
### 三级标题
#### 四级标题
##### 五级标题
###### 六级标题

文本样式

:bulb: 粗体、斜体、删除线可以混合使用。

在 Markdown 中,粗体文本、斜体文本可以使用 *_ 符号标记。建议统一风格,始终只用一种符号。

语法 效果
普通文本 普通文本
*斜体文本* _斜体文本_ 斜体文本 斜体文本
**粗体文本** __粗体文本__ 粗体文本 粗体文本
~~删除文本~~ 删除文本
***粗斜体文本*** ___粗斜体文本___ 粗斜体文本 粗斜体文本

列表

无序列表

  • RED
  • YELLOW
  • BLUE

有序列表

  1. 第一步
  2. 第二步
  3. 第三步

任务列表

  • 完成任务
  • 计划任务

多级列表

  • 数据结构
    • 线性表
      • 顺序表
      • 链表
        • 单链表
        • 双链表
      • 二叉树
        • 二叉平衡树

分割线

***---___ 都可以作为分割线。




链接

普通链接

语法:

1
[钝悟的博客](https://dunwu.github.io/waterdrop/)
  • [] 中标记链接名。类似 HTML 中 <a> 元素的 title 属性。
  • () 中标记链接的 url,也支持相对路径(前提是资源可以访问)。类似 HTML 中 <a> 元素的 href 属性。

效果:

图片

Markdown 引用图片的语法:

1
![alt](url title)

alt 和 title 即对应 HTML 中 img 元素的 alt 和 title 属性(都可省略):

  • alt - 表示图片显示失败时的替换文本。

  • title - 表示鼠标悬停在图片时的显示文本(注意这里要加引号)

  • url - 即图片的 url 地址

logo

图片链接

可以将图片和链接混合使用。

logo

锚点

其实呢,每一个标题都是一个锚点,和 HTML 的锚点(#)类似,比如:回到顶部

引用

普通引用:

:question: 什么是 Markdown

Markdown是一种轻量级标记语言,创始人为约翰·格鲁伯(英语:John Gruber)。它允许人们“使用易读易写的纯文本格式编写文档,然后转换成有效的XHTML(或者HTML)文档”。[4]这种语言吸收了很多在电子邮件中已有的纯文本标记的特性。 —— 摘自 Wiki

嵌套引用:

数据结构

二叉树

平衡二叉树

满二叉树

代码高亮

标签

语法:

1
`Markdown` `Doc`

效果:

Markdown, Doc

代码块

语法一:在文本前后都使用三个反引号进行标记。【✔️️️️ 推荐】

1
2
3
这是一个文本块。
这是一个文本块。
这是一个文本块。

语法二:在连续几行的文本开头加入 1 个 Tab 或者 4 个空格。【❌ 不推荐】

这是一个文本块。
这是一个文本块。
这是一个文本块。

语法

在三个反引号后面加上编程语言的名字,另起一行开始写代码,最后一行再加上三个反引号。

1
public static void main(String[]args){} //Java
1
int main(int argc, char *argv[]) //C
1
echo "hello GitHub" #Bash
1
document.getElementById('myH1').innerHTML = 'Welcome to my Homepage' //javascipt
1
string &operator+(const string& A,const string& B) //cpp

表格

一般表格:

表头 1 表头 2
表格单元 表格单元
表格单元 表格单元

表格可以指定对齐方式:

序号 商品 价格
1 电脑 6000.0
2 鼠标 100.0
3 键盘 200.0

Emoji 表情

:bulb: 注意:部分 Markdown 引擎支持 Emoji。

合理使用 Emoji 表情,往往可以使得文章内容更加丰富生动。例如::heavy_check_mark: :x: :bulb: :bell: :heavy_exclamation_mark: :question:

更多 Emoji 表情请参考:

注脚

:bulb: 注意:部分 Markdown 引擎支持注脚。

一个具有注脚的文本。^1

数学公式

:bulb: 注意:部分 Markdown 引擎支持 Latex。

很多文档中,需要引入一些数学符号、特殊符号,其排版问题比较头疼。这种问题,可以用 Latex 来解决,大部分 Markdown 引擎都支持 Latex。

Latex 可以使用 $ 符号来标记 Latex 表达式,下面是一个数学公式示例:

$$
\Gamma(z) = \int_0^\infty t^{z-1}e^{-t}dt,.
$$

列举一些常用数学符号:

符号 语法 描述
$\leq$ $\leq$ 小于等于
$\geq$ $\geq$ 大于等于
$\neq$ $\neq$ 不等于
$\approx$ $\approx$ 约等于
$\infty$ $\infty$ 无穷
$\prod_{x}^{y}$ $\prod_{x}^{y}$ 累乘
$\sum_{i=0}^n$ $\sum_{i=0}^n$ 求和
$\int$ $\int$ 积分
$\iint$ $\iint$ 双重积分
$\log_x{y}$ $\log_x{y}$ 对数
$x^{y+1}$ $x^{y+1}$ 上标
$x_{y+1}$ $x_{y+1}$ 下标
$\frac{x}{y}$ $\frac{x}{y}$ 分数
$\sqrt[y]{x}$ $\sqrt[y]{x}$ 开方
$\sin$ $\sin$ 正弦
$\cos$ $\cos$ 余弦
$\tan$ $\tan$ 正切

更多数学符号支持请参考:

Diff

:bulb: 注意:部分 Markdown 引擎支持 Diff。

版本控制的系统中都少不了 diff 的功能,即展示一个文件内容的增加与删除。
GFM 中可以显示的展示 diff 效果。可以用 + 开头表示新增,- 开头表示删除。

1
2
+ 新增内容
- 删除内容

UML 图

💡 注意:部分 Markdown 引擎支持 mermaid

mermaid 提供了多种 UML 图。详情请参考:mermaid 文档

流程图

1
2
3
4
5
graph LR
A[Hard edge] -->|Link text| B(Round edge)
B --> C{Decision}
C -->|One| D[Result one]
C -->|Two| E[Result two]

时序图

1
2
3
4
5
6
7
8
9
10
sequenceDiagram
Alice->>Bob: Hello Bob, how are you?
alt is sick
Bob->>Alice: Not so good :(
else is well
Bob->>Alice: Feeling fresh like a daisy
end
opt Extra response
Bob->>Alice: Thanks for asking
end

甘特图

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
gantt
dateFormat YYYY-MM-DD
title Adding GANTT diagram functionality to mermaid

section A section
Completed task :done, des1, 2014-01-06,2014-01-08
Active task :active, des2, 2014-01-09, 3d
Future task : des3, after des2, 5d
Future task2 : des4, after des3, 5d

section Critical tasks
Completed task in the critical line :crit, done, 2014-01-06,24h
Implement parser and jison :crit, done, after des1, 2d
Create tests for parser :crit, active, 3d
Future task in critical line :crit, 5d
Create tests for renderer :2d
Add to mermaid :1d

section Documentation
Describe gantt syntax :active, a1, after des1, 3d
Add gantt diagram to demo page :after a1 , 20h
Add another diagram to demo page :doc1, after a1 , 48h

section Last section
Describe gantt syntax :after doc1, 3d
Add gantt diagram to demo page :20h
Add another diagram to demo page :48h

HTML

有些 Markdown 引擎支持在文档中嵌入的 html 元素。

有些 Markdown 语法所不支持的特性,可以使用 html 元素来支持。

折叠

折叠内容一

展开才能看到的内容

折叠内容二

展开才能看到的内容

居中

居中显示的文本

图片尺寸

编辑器

推荐 Markdown 编辑器

  • Typora - 个人认为是功能最强的 Markdown 编辑器。
  • Visual Studio Code - 可以通过安装插件,量身打造 Markdown 编辑器。
  • marktext - 一款简单优雅的 Markdown 编辑器。
  • StackEdit - 在线 Markdown 编辑器。
  • Editor.md - 在线 Markdown 编辑器。
  • Marxico - 一款专为印象笔记(Evernote)打造的 Markdown 编辑器。

想了解更多 Markdown 编辑器可以参考:主流 Markdown 编辑器推荐

参考资料

流量控制

在高并发场景下,为了应对瞬时海量请求的压力,保障系统的平稳运行,必须预估系统的流量阈值,通过限流规则阻断处理不过来的请求。

流量控制简介

什么是流量控制

流量控制(Flow Control),根据流量、并发线程数、响应时间等指标,把随机到来的流量调整成合适的形状,即流量塑形。避免应用被瞬时的流量高峰冲垮,从而保障应用的高可用性。

为什么需要流量控制

复杂的分布式系统架构中的应用程序往往具有数十个依赖项,每个依赖项都会不可避免地在某个时刻失败。 如果主机应用程序未与这些外部故障隔离开来,则可能会被波及。

例如,对于依赖于 30 个服务的应用程序,假设每个服务的正常运行时间为 99.99%,则可以期望:

99.9930 = 99.7% 的正常运行时间

10 亿个请求中的 0.3%= 3,000,000 个失败

即使所有依赖项都具有出色的正常运行时间,每月也会有 2 个小时以上的停机时间。

然而,现实情况一般比这种估量情况更糟糕。


当一切正常时,整体系统如下所示:

img

图片来自 Hystrix Wiki

在分布式系统架构下,这些强依赖的子服务稳定与否对系统的影响非常大。但是,依赖的子服务可能有很多不可控问题:如网络连接、资源繁忙、服务宕机等。例如:下图中有一个 QPS 为 50 的依赖服务 I 出现不可用,但是其他依赖服务是可用的。

img

图片来自 Hystrix Wiki

当流量很大的情况下,某个依赖的阻塞,会导致上游服务请求被阻塞。当这种级联故障愈演愈烈,就可能造成整个线上服务不可用的雪崩效应,如下图。这种情况若持续恶化,如果上游服务本身还被其他服务所依赖,就可能出现多米洛骨牌效应,导致多个服务都无法正常工作。

img

图片来自 Hystrix Wiki

流量控制有哪些保护机制

流量控制常见的手段就是限流、熔断、降级。

什么是降级?

降级是保障服务能够稳定运行的一种保护方式:面对突增的流量,牺牲一些吞吐量以换取系统的稳定。常见的降级实现方式有:开关降级、限流降级、熔断降级。

什么是限流?

限流一般针对下游服务,当上游流量较大时,避免被上游服务的请求撑爆。

限流就是限制系统的输入和输出流量,以达到保护系统的目的。一般来说系统的吞吐量是可以被测算的,为了保证系统的稳定运行,一旦达到的需要限制的阈值,就需要限制流量并采取一些措施以完成限制流量的目的。比如:延迟处理,拒绝处理,或者部分拒绝处理等等。

限流规则包含三个部分:时间粒度,接口粒度,最大限流值。限流规则设置是否合理直接影响到限流是否合理有效。

什么是熔断?

熔断一般针对上游服务,当下游服务超时/异常较多时,避免被下游服务拖垮。

当调用链路中某个资源出现不稳定,例如,超时异常比例升高的时候,则对这个资源的调用进行限制,并让请求快速失败,避免影响到其它的资源,最终产生雪崩的效果。

熔断尽最大的可能去完成所有的请求,容忍一些失败,熔断也能自动恢复。熔断的常见策略有:

  • 在每秒请求异常数超过多少时触发熔断降级
  • 在每秒请求异常错误率超过多少时触发熔断降级
  • 在每秒请求平均耗时超过多少时触发熔断降级

流量控制有哪些衡量指标

流量控制有以下几个角度:

  • 流量指标,例如 QPS、并发线程数等。
  • 资源的调用关系,例如资源的调用链路,资源和资源之间的关系,调用来源等。
  • 控制效果,例如排队等待、直接拒绝、Warm Up(预热)等。

限流算法

限流的本质是:在一定的时间范围内,限制某一个资源被访问的频率。如何去限制流量,就需要采用一定的策略,即限流算法。常见的限流算法有:固定窗口限流算法、滑动窗口限流算法、漏桶限流算法、令牌桶限流算法。

下面,将对这几种算法进行一一介绍。

固定窗口限流算法

固定窗口限流算法的原理

固定窗口限流算法的基本策略是:

  1. 设置一个固定时间窗口,以及这个固定时间窗口内的最大请求数;
  2. 为每个固定时间窗口设置一个计数器,用于统计请求数;
  3. 一旦请求数超过最大请求数,则请求会被拦截。

img

固定窗口限流算法的利弊

固定窗口限流算法的优点是:实现简单。

固定窗口限流算法的缺点是:存在临界问题。所谓临界问题,是指:流量分别集中在一个固定时间窗口的尾部和一个固定时间窗口的头部。举例来说,假设限流规则为每分钟不超过 100 次请求。在第一个时间窗口中,起初没有任何请求,在最后 1 s,收到 100 次请求,由于没有达到阈值,所有请求都通过;在第二个时间窗口中,第 1 秒就收到 100 次请求,而后续没有任何请求。虽然,这两个时间窗口内的流量都符合限流要求,但是在两个时间窗口临界的这 2s 内,实际上有 200 次请求,显然是超过预期吞吐量的,存在压垮系统的可能。

img

固定窗口限流算法的实现

:::details Java 版本的固定窗口限流算法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class SlidingWindowRateLimiter implements RateLimiter {

/**
* 允许的最大请求数
*/
private final long maxPermits;

/**
* 窗口期时长
*/
private long periodMillis;

/**
* 分片窗口期时长
*/
private final long shardPeriodMillis;

/**
* 窗口期截止时间
*/
private long lastPeriodMillis;

/**
* 分片窗口数
*/
private final int shardNum;

/**
* 请求总计数
*/
private final AtomicLong totalCount = new AtomicLong(0);

/**
* 分片窗口计数列表
*/
private final List<AtomicLong> countList = new LinkedList<>();

public SlidingWindowRateLimiter(long qps, int shardNum) {
this(qps, 1000, TimeUnit.MILLISECONDS, shardNum);
}

public SlidingWindowRateLimiter(long maxPermits, long period, TimeUnit timeUnit, int shardNum) {
this.maxPermits = maxPermits;
this.periodMillis = timeUnit.toMillis(period);
this.lastPeriodMillis = System.currentTimeMillis();
this.shardPeriodMillis = timeUnit.toMillis(period) / shardNum;
this.shardNum = shardNum;
for (int i = 0; i < shardNum; i++) {
countList.add(new AtomicLong(0));
}
}

@Override
public synchronized boolean tryAcquire(int permits) {
long now = System.currentTimeMillis();
if (now > lastPeriodMillis) {
for (int shardId = 0; shardId < shardNum; shardId++) {
long shardCount = countList.get(shardId).get();
totalCount.addAndGet(-shardCount);
countList.set(shardId, new AtomicLong(0));
lastPeriodMillis += shardPeriodMillis;
}
}
int shardId = (int) (now % periodMillis / shardPeriodMillis);
if (totalCount.get() + permits <= maxPermits) {
countList.get(shardId).addAndGet(permits);
totalCount.addAndGet(permits);
return true;
} else {
return false;
}
}

}

:::

滑动窗口限流算法

滑动窗口限流算法的原理

滑动窗口限流算法是对固定窗口限流算法的改进,解决了临界问题。

滑动窗口限流算法的基本策略是:

  • 将固定时间窗口分片为多个子窗口,每个子窗口的访问次数独立统计;
  • 当请求时间大于当前子窗口的最大时间时,则将当前子窗口废弃,并将计时窗口向前滑动,并将下一个子窗口置为当前窗口。
  • 要保证所有子窗口的统计数之和不能超过阈值。

滑动窗口限流算法就是针对固定窗口限流算法的更细粒度的控制,分片越多,则限流越精准。

img

滑动窗口限流算法的利弊

滑动窗口限流算法的优点是:在滑动窗口限流算法中,临界位置的突发请求都会被算到时间窗口内,因此可以解决计数器算法的临界问题。

滑动窗口限流算法的缺点是:

  • 额外的内存开销 - 滑动时间窗口限流算法的时间窗口是持续滑动的,并且除了需要一个计数器来记录时间窗口内接口请求次数之外,还需要记录在时间窗口内每个接口请求到达的时间点,所以存在额外的内存开销。
  • 限流的控制粒度受限于窗口分片粒度 - 滑动窗口限流算法,只能在选定的时间粒度上限流,对选定时间粒度内的更加细粒度的访问频率不做限制。但是,由于每个分片窗口都有额外的内存开销,所以也并不是分片数越多越好的。

滑动窗口限流算法的实现

:::details Java 版本的滑动窗口限流算法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class SlidingWindowRateLimiter implements RateLimiter {

/**
* 允许的最大请求数
*/
private final long maxPermits;

/**
* 窗口期时长
*/
private final long periodMillis;

/**
* 分片窗口期时长
*/
private final long shardPeriodMillis;

/**
* 窗口期截止时间
*/
private long lastPeriodMillis;

/**
* 分片窗口数
*/
private final int shardNum;

/**
* 请求总计数
*/
private final AtomicLong totalCount = new AtomicLong(0);

/**
* 分片窗口计数列表
*/
private final List<AtomicLong> countList = new LinkedList<>();

public SlidingWindowRateLimiter(long qps, int shardNum) {
this(qps, 1000, TimeUnit.MILLISECONDS, shardNum);
}

public SlidingWindowRateLimiter(long maxPermits, long period, TimeUnit timeUnit, int shardNum) {
this.maxPermits = maxPermits;
this.periodMillis = timeUnit.toMillis(period);
this.lastPeriodMillis = System.currentTimeMillis();
this.shardPeriodMillis = timeUnit.toMillis(period) / shardNum;
this.shardNum = shardNum;
for (int i = 0; i < shardNum; i++) {
countList.add(new AtomicLong(0));
}
}

@Override
public synchronized boolean tryAcquire(int permits) {
long now = System.currentTimeMillis();
if (now > lastPeriodMillis) {
for (int shardId = 0; shardId < shardNum; shardId++) {
long shardCount = countList.get(shardId).get();
totalCount.addAndGet(-shardCount);
countList.set(shardId, new AtomicLong(0));
lastPeriodMillis += shardPeriodMillis;
}
}
int shardId = (int) (now % periodMillis / shardPeriodMillis);
if (totalCount.get() + permits <= maxPermits) {
countList.get(shardId).addAndGet(permits);
totalCount.addAndGet(permits);
return true;
} else {
return false;
}
}

}

:::

漏桶限流算法

漏桶限流算法的原理

漏桶限流算法的基本策略是:

  • 水(请求)以任意速率由入口进入到漏桶中;
  • 水以固定的速率由出口出水(请求通过);
  • 漏桶的容量是固定的,如果水的流入速率大于流出速率,最终会导致漏桶中的水溢出(这意味着请求拒绝)。

img

漏桶限流算法的利弊

漏桶限流算法的优点是:流量速率固定——即无论流量多大,即便是突发的大流量,处理请求的速度始终是固定的。

漏桶限流算法的缺点是:不能灵活的调整流量。例如:一个集群通过增减节点的方式,弹性伸缩了其吞吐能力,漏桶限流算法无法随之调整。

漏桶策略适用于间隔性突发流量且流量不用即时处理的场景

漏桶限流算法的实现

:::details Java 版本的漏桶限流算法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
import java.util.concurrent.atomic.AtomicLong;

public class LeakyBucketRateLimiter implements RateLimiter {

/**
* QPS
*/
private final int qps;

/**
* 桶的容量
*/
private final long capacity;

/**
* 计算的起始时间
*/
private long beginTimeMillis;

/**
* 桶中当前的水量
*/
private final AtomicLong waterNum = new AtomicLong(0);

public LeakyBucketRateLimiter(int qps, int capacity) {
this.qps = qps;
this.capacity = capacity;
}

@Override
public synchronized boolean tryAcquire(int permits) {

// 如果桶中没有水,直接通过
if (waterNum.get() == 0) {
beginTimeMillis = System.currentTimeMillis();
waterNum.addAndGet(permits);
return true;
}

// 计算水量
long leakedWaterNum = ((System.currentTimeMillis() - beginTimeMillis) / 1000) * qps;
long currentWaterNum = waterNum.get() - leakedWaterNum;
waterNum.set(Math.max(0, currentWaterNum));

// 重置时间
beginTimeMillis = System.currentTimeMillis();

if (waterNum.get() + permits < capacity) {
waterNum.addAndGet(permits);
return true;
} else {
return false;
}
}

}

:::

令牌桶限流算法

令牌桶限流算法的原理

img

令牌桶算法的原理

  1. 接口限制 T 秒内最大访问次数为 N,则每隔 T/N 秒会放一个 token 到桶中
  2. 桶内最多存放 M 个 token,如果 token 到达时令牌桶已经满了,那么这个 token 就会被丢弃
  3. 接口请求会先从令牌桶中取 token,拿到 token 则处理接口请求,拿不到 token 则进行限流处理

令牌桶限流算法的利弊

因为令牌桶存放了很多令牌,那么大量的突发请求会被执行,但是它不会出现临界问题,在令牌用完之后,令牌是以一个恒定的速率添加到令牌桶中的,因此不能再次发送大量突发请求。

规定固定容量的桶,token 以固定速度往桶内填充,当桶满时 token 不会被继续放入,每过来一个请求把 token 从桶中移除,如果桶中没有 token 不能请求。

令牌桶算法适用于有突发特性的流量,且流量需要即时处理的场景

令牌桶限流算法的实现

:::details Java 实现令牌桶算法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
import java.util.concurrent.atomic.AtomicLong;

/**
* 令牌桶限流算法
*
* @author <a href="mailto:forbreak@163.com">Zhang Peng</a>
* @date 2024-01-18
*/
public class TokenBucketRateLimiter implements RateLimiter {

/**
* QPS
*/
private final long qps;

/**
* 桶的容量
*/
private final long capacity;

/**
* 上一次令牌发放时间
*/
private long endTimeMillis;

/**
* 桶中当前的令牌数量
*/
private final AtomicLong tokenNum = new AtomicLong(0);

public TokenBucketRateLimiter(long qps, long capacity) {
this.qps = qps;
this.capacity = capacity;
this.endTimeMillis = System.currentTimeMillis();
}

@Override
public synchronized boolean tryAcquire(int permits) {

long now = System.currentTimeMillis();
long gap = now - endTimeMillis;

// 计算令牌数
long newTokenNum = (gap * qps / 1000);
long currentTokenNum = tokenNum.get() + newTokenNum;
tokenNum.set(Math.min(capacity, currentTokenNum));

if (tokenNum.get() < permits) {
return false;
} else {
tokenNum.addAndGet(-permits);
endTimeMillis = now;
return true;
}
}

}

:::

扩展

Guava 的 RateLimiter 工具类就是基于令牌桶算法实现,其源码分析可以参考:RateLimiter 基于漏桶算法,但它参考了令牌桶算法

限流算法测试

:::details 限流算法测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
import cn.hutool.core.thread.ThreadUtil;
import cn.hutool.core.util.RandomUtil;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

@Slf4j
public class RateLimiterDemo {

public static void main(String[] args) {

// ============================================================================

int qps = 20;

System.out.println("======================= 固定时间窗口限流算法 =======================");
FixedWindowRateLimiter fixedWindowRateLimiter = new FixedWindowRateLimiter(qps);
testRateLimit(fixedWindowRateLimiter, qps);

System.out.println("======================= 滑动时间窗口限流算法 =======================");
SlidingWindowRateLimiter slidingWindowRateLimiter = new SlidingWindowRateLimiter(qps, 10);
testRateLimit(slidingWindowRateLimiter, qps);

System.out.println("======================= 漏桶限流算法 =======================");
LeakyBucketRateLimiter leakyBucketRateLimiter = new LeakyBucketRateLimiter(qps, 100);
testRateLimit(leakyBucketRateLimiter, qps);

System.out.println("======================= 令牌桶限流算法 =======================");
TokenBucketRateLimiter tokenBucketRateLimiter = new TokenBucketRateLimiter(qps, 100);
testRateLimit(tokenBucketRateLimiter, qps);
}

private static void testRateLimit(RateLimiter rateLimiter, int qps) {

AtomicInteger okNum = new AtomicInteger(0);
AtomicInteger limitNum = new AtomicInteger(0);
ExecutorService executorService = ThreadUtil.newFixedExecutor(10, "限流测试", true);
long beginTime = System.currentTimeMillis();

int threadNum = 4;
final CountDownLatch latch = new CountDownLatch(threadNum);
for (int i = 0; i < threadNum; i++) {
executorService.submit(() -> {
try {
batchRequest(rateLimiter, okNum, limitNum, 1000);
} catch (Exception e) {
log.error("发生异常!", e);
} finally {
latch.countDown();
}
});
}

try {
latch.await(10, TimeUnit.SECONDS);
long endTime = System.currentTimeMillis();
long gap = endTime - beginTime;
log.info("限流 QPS: {} -> 实际结果:耗时 {} ms,{} 次请求成功,{} 次请求被限流,实际 QPS: {}",
qps, gap, okNum.get(), limitNum.get(), okNum.get() * 1000 / gap);
if (okNum.get() == qps) {
log.info("限流符合预期");
}
} catch (Exception e) {
log.error("发生异常!", e);
} finally {
executorService.shutdown();
}
}

private static void batchRequest(RateLimiter rateLimiter, AtomicInteger okNum, AtomicInteger limitNum, int num)
throws InterruptedException {
for (int j = 0; j < num; j++) {
if (rateLimiter.tryAcquire(1)) {
log.info("请求成功");
okNum.getAndIncrement();
} else {
log.info("请求限流");
limitNum.getAndIncrement();
}
TimeUnit.MILLISECONDS.sleep(RandomUtil.randomInt(0, 10));
}
}

}

:::

限流框架 - Hystrix

Hystrix 是由 Netflix 开源,用于处理分布式系统的延迟和容错的一个开源组件。在分布式系统里,许多依赖不可避免的会调用失败,比如超时、异常等。Hystrix 采用断路器模式来实现服务间的彼此隔离,从而避免级联故障,以提高分布式系统整体的弹性。

“断路器”本身是一种开关装置,当某个服务单元发生故障之后,通过断路器的故障监控(类似熔断保险丝),向调用方返回一个符合预期的、可处理的备选响应(FallBack),而不是长时间的等待或者抛出调用方无法处理的异常,这样就保证了服务调用方的线程不会被长时间、不必要地占用,从而避免了故障在分布式系统中的蔓延,乃至雪崩。

Hystrix 官方已宣布不再发布新版本。但是,Hystrix 的断路器设计理念,有非常高的学习价值。

如果使用 Hystrix 对每个基础依赖服务进行过载保护,则整个系统架构将会类似下图所示,每个依赖项彼此隔离,受到延迟时发生饱和的资源的被限制访问,并包含 fallback 逻辑(用于降级处理),该逻辑决定了在依赖项中发生任何类型的故障时做出对应的处理。

img

Hystrix 原理

如下图所示,Hystrix 的工作流程大致可以分为 9 个步骤。

img

(一)构建一个 HystrixCommand 或 HystrixObservableCommand 对象

Hystrix 进行资源隔离,其实是提供了一个抽象,叫做命令模式。这也是 Hystrix 最基本的资源隔离技术

在使用 Hystrix 的过程中,会对依赖服务的调用请求封装成命令对象,Hystrix 对 命令对象抽象了两个抽象类:HystrixCommandHystrixObservableCommand

  • HystrixCommand 表示的命令对象会返回一个唯一返回值。
  • HystrixObservableCommand 表示的命令对象会返回多个返回值。
1
2
HystrixCommand command = new HystrixCommand(arg1, arg2);
HystrixObservableCommand command = new HystrixObservableCommand(arg1, arg2);

(二)执行命令

Hystrix 中共有 4 种方式执行命令,如下所示:

执行方式 说明 可用对象
execute() 阻塞式同步执行,返回依赖服务的单一返回结果(或者抛出异常) HystrixCommand
queue() 异步执行,通过 Future 返回依赖服务的单一返回结果(或者抛出异常) HystrixCommand
observe() 基于 Rxjava 的 Observable 方式,返回通过 Observable 表示的依赖服务返回结果。代调用代码先执行 (Hot Obserable) HystrixObservableCommand
toObservable() 基于 Rxjava 的 Observable 方式,返回通过 Observable 表示的依赖服务返回结果。执行代码等到真正订阅的时候才会执行 (cold observable) HystrixObservableCommand

这四种命令中,exeucte()queue()observe() 的表示其实是通过 toObservable() 实现的,其转换关系如下图所示:

img

HystrixCommand 执行方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
K value   = command.execute();
// 等价语句:
K value = command.execute().queue().get();

Future<K> fValue = command.queue();
//等价语句:
Future<K> fValue = command.toObservable().toBlocking().toFuture();

Observable<K> ohValue = command.observe(); //hot observable,立刻订阅,命令立刻执行
//等价语句:
Observable<K> ohValue = command.toObservable().subscribe(subject);

// 上述执行最终实现还是基于 toObservable()
Observable<K> ocValue = command.toObservable(); //cold observable,延后订阅,订阅发生后,执行才真正执行

(三)是否缓存

如果当前命令对象启用了请求缓存,并且请求的响应存在于缓存中,则缓存的响应会立刻以 Observable 的形式返回。

(四)是否开启断路器

如果第三步没有缓存没有命中,则判断一下当前断路器的断路状态是否打开。如果断路器状态为打开状态,则 Hystrix 将不会执行此 Command 命令,直接执行步骤 8 调用 Fallback;

如果断路器状态是关闭,则执行步骤 5 检查是否有足够的资源运行 Command 命令

(五)信号量、线程池是否拒绝

当您执行该命令时,Hystrix 会检查断路器以查看电路是否打开。

如果电路开路(或“跳闸”),则 Hystrix 将不会执行该命令,而是将流程路由到 (8) 获取回退。

如果电路闭合,则流程前进至 (5) 以检查是否有可用容量来运行命令。

如果当前要执行的 Command 命令 先关连的线程池 和队列(或者信号量)资源已经满了,Hystrix 将不会运行 Command 命令,直接执行 步骤 8 的 Fallback 降级处理;如果未满,表示有剩余的资源执行 Command 命令,则执行步骤 6

(六)construct()run()

当经过步骤 5 判断,有足够的资源执行 Command 命令时,本步骤将调用 Command 命令运行方法,基于不同类型的 Command,有如下两种两种运行方式:

运行方式 说明
HystrixCommand.run() 返回一个处理结果或者抛出一个异常
HystrixObservableCommand.construct() 返回一个 Observable 表示的结果(可能多个),或者 基于onError的错误通知

如果run() 或者construct()方法 的真实执行时间超过了 Command 设置的超时时间阈值, 则当前则执行线程(或者是独立的定时器线程)将会抛出TimeoutException。抛出超时异常 TimeoutException,后,将执行**步骤 8 **的 Fallback 降级处理。即使run()或者construct()执行没有被取消或中断,最终能够处理返回结果,但在降级处理逻辑中,将会抛弃run()construct()方法的返回结果,而返回 Fallback 降级处理结果。

注意事项
需要注意的是,Hystrix 无法强制 将正在运行的线程停止掉–Hystrix 能够做的最好的方式就是在 JVM 中抛出一个InterruptedException。如果 Hystrix 包装的工作不抛出中断异常InterruptedException, 则在 Hystrix 线程池中的线程将会继续执行,尽管调用的客户端已经接收到了TimeoutException。这种方式会使 Hystrix 的线程池处于饱和状态。大部分的 Java Http Client 开源库并不会解析 InterruptedException。所以确认 HTTP client 相关的连接和读/写相关的超时时间设置。
如果 Command 命令没有抛出任何异常,并且有返回结果,则 Hystrix 将会在做完日志记录和统计之后会将结果返回。 如果是通过run()方式运行,则返回一个Obserable对象,包含一个唯一值,并且发送一个onCompleted通知;如果是通过consturct()方式运行 ,则返回一个Observable 对象

(七)健康检查

Hystrix 会统计 Command 命令执行执行过程中的成功数失败数拒绝数超时数, 将这些信息记录到断路器 (Circuit Breaker) 中。断路器将上述统计按照时间窗的形式记录到一个定长数组中。断路器根据时间窗内的统计数据去判定请求什么时候可以被熔断,熔断后,在接下来一段恢复周期内,相同的请求过来后会直接被熔断。当再次校验,如果健康监测通过后,熔断开关将会被关闭。

(八)获取 Fallback

当以下场景出现后,Hystrix 将会尝试触发 Fallback:

  • 步骤 6 Command 执行时抛出了任何异常;
  • 步骤 4 断路器已经被打开
  • 步骤 5 执行命令的线程池、队列或者信号量资源已满
  • 命令执行的时间超过阈值

(九)返回结果

如果 Hystrix 命令对象执行成功,将会返回结果,或者以Observable形式包装的结果。根据**步骤 2 **的 command 调用方式,返回的Observable 会按照如下图说是的转换关系进行返回:

img

  • execute() — 用和 .queue() 相同的方式获取 Future,然后调用 Futureget() 以获取 Observable 的单个值。
  • queue() —将 Observable 转换为 BlockingObservable,以便可以将其转换为 Future 并返回。
  • watch() —订阅 Observable 并开始执行命令的流程; 返回一个 Observable,当订阅该 Observable 时,它会重新通知。
  • toObservable() —返回不变的 Observable; 必须订阅它才能真正开始执行命令的流程。

断路器工作原理

img

  1. 断路器时间窗内的请求数是否超过了请求数断路器生效阈值circuitBreaker.requestVolumeThreshold,如果超过了阈值,则将会触发断路,断路状态为开启
    例如,如果当前阈值设置的是20,则当时间窗内统计的请求数共计 19 个,即使 19 个全部失败了,都不会触发断路器。
  2. 并且请求错误率超过了请求错误率阈值errorThresholdPercentage
  3. 如果两个都满足,则将断路器由关闭迁移到开启
  4. 如果断路器开启,则后续的所有相同请求将会被断路掉;
  5. 直到过了沉睡时间窗sleepWindowInMilliseconds后,再发起请求时,允许其通过(此时的状态为半开起状态)。如果请求失败了,则保持断路器状态为开启状态,并更新沉睡时间窗。如果请求成功了,则将断路器状态改为关闭状态;

核心的逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Override
public void onNext(HealthCounts hc) {
// check if we are past the statisticalWindowVolumeThreshold
if (hc.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {
// we are not past the minimum volume threshold for the stat window,
// so no change to circuit status.
// if it was CLOSED, it stays CLOSED
// if it was half-open, we need to wait for a successful command execution
// if it was open, we need to wait for sleep window to elapse
} else {
if (hc.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
//we are not past the minimum error threshold for the stat window,
// so no change to circuit status.
// if it was CLOSED, it stays CLOSED
// if it was half-open, we need to wait for a successful command execution
// if it was open, we need to wait for sleep window to elapse
} else {
// our failure rate is too high, we need to set the state to OPEN
if (status.compareAndSet(Status.CLOSED, Status.OPEN)) {
circuitOpened.set(System.currentTimeMillis());
}
}
}
}

限流框架 - Sentinel

其他限流解决方案

Guava RateLimiter

Guava 是 Google 开源的 Java 类库,提供了一个工具类 RateLimiter,它基于令牌桶算法实现了本地限流器。

:::details RateLimiter 限流示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// 限流器流速:2 个请求/秒
RateLimiter limiter = RateLimiter.create(2.0);
// 执行任务的线程池
ExecutorService es = Executors.newFixedThreadPool(1);
// 记录上一次执行时间
prev = System.nanoTime();
// 测试执行 20 次
for (int i = 0; i < 20; i++) {
// 限流器限流
limiter.acquire();
// 提交任务异步执行
es.execute(() -> {
long cur = System.nanoTime();
// 打印时间间隔:毫秒
System.out.println((cur - prev) / 1000000);
prev = cur;
});
}

// 输出结果:
// ...
// 500
// 499
// 500
// 499

:::

Redis + Lua

如果想要针对分布式系统资源进行限流,则必须具备两个要素:

  1. 对于资源的访问统计,必须是所有分布式节点都可以共享访问的数据存储;并且,由于在高并发场景下,读写访问统计数据会很频繁,该数据存储必须有很高的读写性能。
  2. 访问统计、限流计算都以原子操作方式进行。

满足以上要素的一种简单解决方案是,采用 Redis + Lua 来实现,原因在于:

  • Redis 数据库的读写性能极高;
  • Redis 支持以原子操作的方式执行 Lua 脚本。

Redis + Lua 实现的固定窗口限流算法

Redis + Lua 实现的固定窗口限流算法实现思路:

  • 根据实际需要,将当前时间格式化为天(yyyyMMdd)、时(yyyyMMddHH)、分(yyyyMMddHHmm)、秒(yyyyMMddHHmmss),并作为 Redis 的 String 类型 Key。该 Key 可以视为一个固定时间窗口,其中的 value 用于统计访问量;
  • 用于代表不同粒度的时间窗口按需设置过期时间;
  • 一旦达到窗口的限流阈值时,请求被限流;否则请求通过。

:::details Redis + Lua 实现的固定窗口限流算法

下面的代码片段模拟通过一个大小为 1 分钟的固定时间窗口进行限流,阈值为 100,过期时间 60s。

限流脚本 fixed_window_rate_limit.lua 代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
-- 缓存 Key
local key = KEYS[1]
-- 访问请求数
local permits = tonumber(ARGV[1])
-- 过期时间
local seconds = tonumber(ARGV[2])
-- 限流阈值
local limit = tonumber(ARGV[3])

-- 获取统计值
local count = tonumber(redis.call('GET', key) or "0")

if count + permits > limit then
-- 触发限流
return 0
else
redis.call('INCRBY', key, permits)
redis.call('EXPIRE', key, seconds)
return count + permits
end

调用 lua 的实际限流代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.io.FileUtil;
import cn.hutool.core.io.resource.ResourceUtil;
import cn.hutool.core.util.RandomUtil;
import cn.hutool.core.util.StrUtil;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.exceptions.JedisConnectionException;

import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class RedisFixedWindowRateLimiter implements RateLimiter {

private static final String REDIS_HOST = "localhost";

private static final int REDIS_PORT = 6379;

private static final Jedis JEDIS;

public static final String SCRIPT;

static {
// Jedis 有多种构造方法,这里选用最简单的一种情况
JEDIS = new Jedis(REDIS_HOST, REDIS_PORT);

// 触发 ping 命令
try {
JEDIS.ping();
System.out.println("jedis 连接成功");
} catch (JedisConnectionException e) {
e.printStackTrace();
}

SCRIPT = FileUtil.readString(ResourceUtil.getResource("scripts/fixed_window_rate_limit.lua"),
StandardCharsets.UTF_8);
}

private final long maxPermits;
private final long periodSeconds;
private final String key;

public RedisFixedWindowRateLimiter(long qps, String key) {
this(qps * 60, 60, TimeUnit.SECONDS, key);
}

public RedisFixedWindowRateLimiter(long maxPermits, long period, TimeUnit timeUnit, String key) {
this.maxPermits = maxPermits;
this.periodSeconds = timeUnit.toSeconds(period);
this.key = key;
}

@Override
public boolean tryAcquire(int permits) {
List<String> keys = Collections.singletonList(key);
List<String> args = CollectionUtil.newLinkedList(String.valueOf(permits), String.valueOf(periodSeconds),
String.valueOf(maxPermits));
Object eval = JEDIS.eval(SCRIPT, keys, args);
long value = (long) eval;
return value != -1;
}

public static void main(String[] args) throws InterruptedException {

int qps = 20;
RateLimiter jedisFixedWindowRateLimiter = new RedisFixedWindowRateLimiter(qps, "rate:limit:20240122210000");

// 模拟在一分钟内,不断收到请求,限流是否有效
int seconds = 60;
long okNum = 0L;
long total = 0L;
long beginTime = System.currentTimeMillis();
int num = RandomUtil.randomInt(qps, 100);
for (int second = 0; second < seconds; second++) {
for (int i = 0; i < num; i++) {
total++;
if (jedisFixedWindowRateLimiter.tryAcquire(1)) {
okNum++;
System.out.println("请求成功");
} else {
System.out.println("请求限流");
}
}
TimeUnit.SECONDS.sleep(1);
}
long endTime = System.currentTimeMillis();
long time = (endTime - beginTime) / 1000;
System.out.println(StrUtil.format("请求通过数:{},总请求数:{},实际 QPS:{}", okNum, total, okNum / time));
}

}

:::

Redis + Lua 实现的令牌桶限流算法

:::details Redis + Lua 实现的令牌桶限流算法

限流脚本 token_bucket_rate_limit.lua 代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
local tokenKey = KEYS[1]
local timeKey = KEYS[2]

-- 申请令牌数
local permits = tonumber(ARGV[1])
-- QPS
local qps = tonumber(ARGV[2])
-- 桶的容量
local capacity = tonumber(ARGV[3])
-- 当前时间(单位:毫秒)
local nowMillis = tonumber(ARGV[4])
-- 填满令牌桶所需要的时间
local fillTime = capacity / qps
local ttl = math.min(capacity, math.floor(fillTime * 2))

local currentTokenNum = tonumber(redis.call("GET", tokenKey))
if currentTokenNum == nil then
currentTokenNum = capacity
end

local endTimeMillis = tonumber(redis.call("GET", timeKey))
if endTimeMillis == nil then
endTimeMillis = 0
end

local gap = nowMillis - endTimeMillis
local newTokenNum = math.max(0, gap * qps / 1000)
local currentTokenNum = math.min(capacity, currentTokenNum + newTokenNum)

if currentTokenNum < permits then
-- 请求拒绝
return -1
else
-- 请求通过
local finalTokenNum = currentTokenNum - permits
redis.call("SETEX", tokenKey, ttl, finalTokenNum)
redis.call("SETEX", timeKey, ttl, nowMillis)
return finalTokenNum
end

调用 lua 的实际限流代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
package io.github.dunwu.distributed.ratelimit;

import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.io.FileUtil;
import cn.hutool.core.io.resource.ResourceUtil;
import cn.hutool.core.util.RandomUtil;
import cn.hutool.core.util.StrUtil;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.exceptions.JedisConnectionException;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.TimeUnit;

/**
* 基于 Redis + Lua 实现的令牌桶限流算法
*
* @author <a href="mailto:forbreak@163.com">Zhang Peng</a>
* @date 2024-01-23
*/
public class RedisTokenBucketRateLimiter implements RateLimiter {

private static final String REDIS_HOST = "localhost";

private static final int REDIS_PORT = 6379;

private static final Jedis JEDIS;

public static final String SCRIPT;

static {
// Jedis 有多种构造方法,这里选用最简单的一种情况
JEDIS = new Jedis(REDIS_HOST, REDIS_PORT);

// 触发 ping 命令
try {
JEDIS.ping();
System.out.println("jedis 连接成功");
} catch (JedisConnectionException e) {
e.printStackTrace();
}

SCRIPT = FileUtil.readString(ResourceUtil.getResource("scripts/token_bucket_rate_limit.lua"),
StandardCharsets.UTF_8);
}

private final long qps;
private final long capacity;
private final String tokenKey;
private final String timeKey;

public RedisTokenBucketRateLimiter(long qps, long capacity, String tokenKey, String timeKey) {
this.qps = qps;
this.capacity = capacity;
this.tokenKey = tokenKey;
this.timeKey = timeKey;
}

@Override
public boolean tryAcquire(int permits) {
long now = System.currentTimeMillis();
List<String> keys = CollectionUtil.newLinkedList(tokenKey, timeKey);
List<String> args = CollectionUtil.newLinkedList(String.valueOf(permits), String.valueOf(qps),
String.valueOf(capacity), String.valueOf(now));
Object eval = JEDIS.eval(SCRIPT, keys, args);
long value = (long) eval;
return value != -1;
}

public static void main(String[] args) throws InterruptedException {

int qps = 20;
int bucket = 100;
RedisTokenBucketRateLimiter redisTokenBucketRateLimiter =
new RedisTokenBucketRateLimiter(qps, bucket, "token:rate:limit", "token:rate:limit:time");

// 先将令牌桶预热令牌申请完,后续才能真实反映限流 QPS
redisTokenBucketRateLimiter.tryAcquire(bucket);
TimeUnit.SECONDS.sleep(1);

// 模拟在一分钟内,不断收到请求,限流是否有效
int seconds = 60;
long okNum = 0L;
long total = 0L;
long beginTime = System.currentTimeMillis();
for (int second = 0; second < seconds; second++) {
int num = RandomUtil.randomInt(qps, 100);
for (int i = 0; i < num; i++) {
total++;
if (redisTokenBucketRateLimiter.tryAcquire(1)) {
okNum++;
System.out.println("请求成功");
} else {
System.out.println("请求限流");
}
}
TimeUnit.SECONDS.sleep(1);
}
long endTime = System.currentTimeMillis();
long time = (endTime - beginTime) / 1000;
System.out.println(StrUtil.format("请求通过数:{},总请求数:{},实际 QPS:{}", okNum, total, okNum / time));
}

}

:::

参考资料

Java 容器之 Map

Map 简介

Map 架构

Map 家族主要成员功能如下:

  • Map 是 Map 容器家族的祖先,Map 是一个用于保存键值对(key-value)的接口。Map 中不能包含重复的键;每个键最多只能映射到一个值。
  • AbstractMap 继承了 Map 的抽象类,它实现了 Map 中的核心 API。其它 Map 的实现类可以通过继承 AbstractMap 来减少重复编码。
  • SortedMap 继承了 Map 的接口。SortedMap 中的内容是排序的键值对,排序的方法是通过实现比较器(Comparator)完成的。
  • NavigableMap 继承了 SortedMap 的接口。相比于 SortedMapNavigableMap 有一系列的“导航”方法;如”获取大于/等于某对象的键值对”、“获取小于/等于某对象的键值对”等等。
  • HashMap 继承了 AbstractMap,但没实现 NavigableMap 接口。HashMap 的主要作用是储存无序的键值对,而 Hash 也体现了它的查找效率很高。HashMap 是使用最广泛的 Map
  • Hashtable 虽然没有继承 AbstractMap,但它继承了 DictionaryDictionary 也是键值对的接口),而且也实现 Map 接口。因此,Hashtable 的主要作用是储存无序的键值对。和 HashMap 相比,Hashtable 在它的主要方法中使用 synchronized 关键字修饰,来保证线程安全。但是,由于它的锁粒度太大,非常影响读写速度,所以,现代 Java 程序几乎不会使用 Hashtable ,如果需要保证线程安全,一般会用 ConcurrentHashMap 来替代。
  • TreeMap 继承了 AbstractMap,且实现了 NavigableMap 接口。TreeMap 的主要作用是储存有序的键值对,排序依据根据元素类型的 Comparator 而定。
  • WeakHashMap 继承了 AbstractMapWeakHashMap 的键是弱引用 (即 WeakReference),它的主要作用是当 GC 内存不足时,会自动将 WeakHashMap 中的 key 回收,这避免了 WeakHashMap 的内存空间无限膨胀。很明显,WeakHashMap 适用于作为缓存。

Map 接口

Map 的定义如下:

1
public interface Map<K,V> { }

Map 是一个用于保存键值对(key-value)的接口。Map 中不能包含重复的键;每个键最多只能映射到一个值。

Map 接口提供三种 Collection 视图,允许以键集值集键-值映射关系集的形式访问数据。

Map 有些实现类,可以有序的保存元素,如 TreeMap;另一些实现类则不保证顺序,如 HashMap 类。

Map 的实现类应该提供 2 个“标准的”构造方法:

  • void(无参数)构造方法,用于创建空 Map;
  • 带有单个 Map 类型参数的构造方法,用于创建一个与其参数具有相同键-值映射关系的新 Map。

实际上,后一个构造方法允许用户复制任意 Map,生成所需类的一个等价 Map。尽管无法强制执行此建议(因为接口不能包含构造方法),但是 JDK 中所有通用的 Map 实现都遵从它。

Map.Entry 接口

Map.Entry 一般用于通过迭代器(Iterator)访问问 Map

Map.Entry 是 Map 中内部的一个接口,Map.Entry 代表了 键值对 实体,Map 通过 entrySet() 获取 Map.Entry 集合,从而通过该集合实现对键值对的操作。

AbstractMap 抽象类

AbstractMap 的定义如下:

1
public abstract class AbstractMap<K,V> implements Map<K,V> {}

AbstractMap 抽象类提供了 Map 接口的核心实现,以最大限度地减少实现 Map 接口所需的工作。

要实现不可修改的 Map,编程人员只需扩展此类并提供 entrySet() 方法的实现即可,该方法将返回 Map 的映射关系 Set 视图。通常,返回的 set 将依次在 AbstractSet 上实现。此 Set 不支持 add()remove() 方法,其迭代器也不支持 remove() 方法。

要实现可修改的 Map,编程人员必须另外重写此类的 put 方法(否则将抛出 UnsupportedOperationException),entrySet().iterator() 返回的迭代器也必须另外实现其 remove() 方法。

SortedMap 接口

SortedMap 的定义如下:

1
public interface SortedMap<K,V> extends Map<K,V> { }

SortedMap 继承了 Map ,它是一个有序的 Map

SortedMap 的排序方式有两种:自然排序或者用户指定比较器插入有序 SortedMap 的所有元素都必须实现 Comparable 接口(或者被指定的比较器所接受)

另外,所有 SortedMap 实现类都应该提供 4 个“标准”构造方法:

  1. void(无参数)构造方法,它创建一个空的有序 Map,按照键的自然顺序进行排序。
  2. 带有一个 Comparator 类型参数的构造方法,它创建一个空的有序 Map,根据指定的比较器进行排序。
  3. 带有一个 Map 类型参数的构造方法,它创建一个新的有序 Map,其键-值映射关系与参数相同,按照键的自然顺序进行排序。
  4. 带有一个 SortedMap 类型参数的构造方法,它创建一个新的有序 Map,其键-值映射关系和排序方法与输入的有序 Map 相同。无法保证强制实施此建议,因为接口不能包含构造方法。

NavigableMap 的定义如下:

1
public interface NavigableMap<K,V> extends SortedMap<K,V> { }

NavigableMap 继承了 SortedMap ,它提供了丰富的查找方法。

NavigableMap 分别提供了获取“键”、“键-值对”、“键集”、“键-值对集”的相关方法。

NavigableMap 提供的功能可以分为 4 类:

  • 获取键-值对
    • lowerEntryfloorEntryceilingEntryhigherEntry 方法,它们分别返回与小于、小于等于、大于等于、大于给定键的键关联的 Map.Entry 对象。
    • firstEntrypollFirstEntrylastEntrypollLastEntry 方法,它们返回和/或移除最小和最大的映射关系(如果存在),否则返回 null。
  • 获取键。这个和第 1 类比较类似。
    • lowerKeyfloorKeyceilingKeyhigherKey 方法,它们分别返回与小于、小于等于、大于等于、大于给定键的键。
  • 获取键的集合
    • navigableKeySetdescendingKeySet 分别获取正序/反序的键集。
  • 获取键-值对的子集

Dictionary 抽象类

Dictionary 的定义如下:

1
public abstract class Dictionary<K,V> {}

Dictionary 是 JDK 1.0 定义的操作键值对的抽象类,它包括了操作键值对的基本方法。

HashMap 类

HashMap 的命名,也可以看出:**HashMap 以散列方式存储键值对**。HashMap 是非线程安全的。

HashMap 允许使用空值和空键,但 null 作为键只能有一个,null 作为值可以有多个。

HashMap 类大致等同于 Hashtable,除了它是不同步的并且允许为空值。)这个类不保序;特别是,它的元素顺序可能会随着时间的推移变化。

JDK1.8 之前 HashMap 由 数组+链表 组成的,数组是 HashMap 的主体,链表则是主要为了解决哈希冲突而存在的(“拉链法”解决冲突)。 JDK1.8 以后的 HashMap 在解决哈希冲突时有了较大的变化,当链表长度大于等于阈值(默认为 8)(将链表转换成红黑树前会判断,如果当前数组的长度小于 64,那么会选择先进行数组扩容,而不是转换为红黑树)时,将链表转化为红黑树,以减少搜索时间。

HashMap 默认的初始化大小为 16。之后每次扩充,容量变为原来的 2 倍。并且, HashMap 总是使用 2 的幂作为哈希表的大小。

JDK8 之前 HashMap 数据结构

之前 HashMap 底层是 数组和链表 结合在一起使用也就是 链表散列

HashMap 通过 key 的 hashCode 经过扰动函数处理过后得到 hash 值,然后通过 (n - 1) & hash 判断当前元素存放的位置(这里的 n 指的是数组的长度),如果当前位置存在元素的话,就判断该元素与要存入的元素的 hash 值以及 key 是否相同,如果相同的话,直接覆盖,不相同就通过拉链法解决冲突。

所谓扰动函数指的就是 HashMap 的 hash 方法。使用 hash 方法也就是扰动函数是为了防止一些实现比较差的 hashCode() 方法 换句话说使用扰动函数之后可以减少碰撞。

JDK7 的 HashMap 的 hash 方法:

1
2
3
4
static int hash(int h) {
h ^= (h >>> 20) ^ (h >>> 12);
return h ^ (h >>> 7) ^ (h >>> 4);
}

JDK8 之后 HashMap 数据结构

HashMap 的主要字段定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class HashMap<K,V> extends AbstractMap<K,V>
implements Map<K,V>, Cloneable, Serializable {

// 该表在初次使用时初始化,并根据需要调整大小。分配时,长度总是2的幂。
transient Node<K,V>[] table;
// 保存缓存的 entrySet()。请注意,AbstractMap 字段用于 keySet() 和 values()。
transient Set<Map.Entry<K,V>> entrySet;
// map 中的键值对数
transient int size;
// 这个HashMap被结构修改的次数结构修改是那些改变HashMap中的映射数量或者修改其内部结构(例如,重新散列)的修改。
transient int modCount;
// 下一个调整大小的值(容量*负载因子)。
int threshold;
// 散列表的负载因子
final float loadFactor;
}

HashMap 有两个影响其性能的参数:初始容量和负载因子

  • size - 初始容量。默认为 16,每次容量不够自动扩容。容量是哈希表中桶的数量,初始容量就是哈希表创建时的容量。
  • loadFactor - 负载因子。自动扩容之前被允许的最大饱和量,默认 0.75。负载因子是散列表在其容量自动扩容之前被允许的最大饱和量。当哈希表中的 entry 数量超过负载因子和当前容量的乘积时,散列表就会被重新映射(即重建内部数据结构),一般散列表大约是存储桶数量的两倍。

通常,默认负载因子(0.75)在时间和空间成本之间提供了良好的平衡。较高的值会减少空间开销,但会增加查找成本(反映在大部分 HashMap 类的操作中,包括 getput)。在设置初始容量时,应考虑映射中的条目数量及其负载因子,以尽量减少重新运行操作的次数。如果初始容量大于最大入口数除以负载因子,则不会发生重新刷新操作。

如果许多映射要存储在 HashMap 实例中,使用足够大的容量创建映射将允许映射存储的效率高于根据需要执行自动重新散列以增长表。请注意,使用多个具有相同 hashCode() 的密钥是降低任何散列表性能的一个可靠方法。为了改善影响,当键是 Comparable 时,该类可以使用键之间的比较顺序来帮助断开关系。

JDK8 的 HashMap 的 hash 方法:

1
2
3
4
5
6
7
static final int hash(Object key) {
int h;
// key.hashCode():返回散列值也就是hashcode
// ^:按位异或
// >>>:无符号右移,忽略符号位,空位都以0补齐
return (key == null) ? 0 : (h = key.hashCode()) ^ (h >>> 16);
}

getput 的过程中,计算下标时,先对 hashCode 进行 hash 操作,然后再通过 hash 值进一步计算下标,如下图所示:

在对 hashCode() 计算 hash 时具体实现是这样的:

1
2
3
4
static final int hash(Object key) {
int h;
return (key == null) ? 0 : (h = key.hashCode()) ^ (h >>> 16);
}

可以看到这个方法大概的作用就是:高 16bit 不变,低 16bit 和高 16bit 做了一个异或。

在设计 hash 方法时,因为目前的 table 长度 n 为 2 的幂,而计算下标的时候,是这样实现的(使用 & 位操作,而非 % 求余):

1
(n - 1) & hash

设计者认为这方法很容易发生碰撞。为什么这么说呢?不妨思考一下,在 n - 1 为 15(0x1111) 时,其实散列真正生效的只是低 4bit 的有效位,当然容易碰撞了。

因此,设计者想了一个顾全大局的方法(综合考虑了速度、作用、质量),就是把高 16bit 和低 16bit 异或了一下。设计者还解释到因为现在大多数的 hashCode 的分布已经很不错了,就算是发生了碰撞也用 O(logn)的 tree 去做了。仅仅异或一下,既减少了系统的开销,也不会造成的因为高位没有参与下标的计算(table 长度比较小时),从而引起的碰撞。

如果还是产生了频繁的碰撞,会发生什么问题呢?作者注释说,他们使用树来处理频繁的碰撞(we use trees to handle large sets of collisions in bins),在 JEP-180 中,描述了这个问题:

Improve the performance of java.util.HashMap under high hash-collision conditions by using balanced trees rather than linked lists to store map entries. Implement the same improvement in the LinkedHashMap class.

之前已经提过,在获取 HashMap 的元素时,基本分两步:

  1. 首先根据 hashCode() 做 hash,然后确定 bucket 的 index;

  2. 如果 bucket 的节点的 key 不是我们需要的,则通过 keys.equals()在链中找。

在 JDK8 之前的实现中是用链表解决冲突的,在产生碰撞的情况下,进行 get 时,两步的时间复杂度是 O(1)+O(n)。因此,当碰撞很厉害的时候 n 很大,O(n)的速度显然是影响速度的。

因此在 JDK8 中,利用红黑树替换链表,这样复杂度就变成了 O(1)+O(logn)了,这样在 n 很大的时候,能够比较理想的解决这个问题,在 JDK8:HashMap 的性能提升一文中有性能测试的结果。

HashMap 构造方法

1
2
3
4
public HashMap(); // 默认负载因子0.75
public HashMap(int initialCapacity); // 默认负载因子0.75;以 initialCapacity 初始化容量
public HashMap(int initialCapacity, float loadFactor); // 以 initialCapacity 初始化容量;以 loadFactor 初始化负载因子
public HashMap(Map<? extends K, ? extends V> m) // 默认负载因子0.75

put 方法的实现

put 方法大致的思路为:

  • 对 key 的 hashCode() 做 hash 计算,然后根据 hash 值再计算 Node 的存储位置;
  • 如果没有哈希碰撞,直接放到桶里;如果有哈希碰撞,以链表的形式存在桶后。
  • 如果哈希碰撞导致链表过长(大于等于 TREEIFY_THRESHOLD,数值为 8),就把链表转换成红黑树;
  • 如果节点已经存在就替换旧值
  • 桶数量超过容量*负载因子(即 load factor * current capacity),HashMap 调用 resize 自动扩容一倍

具体代码的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
public V put(K key, V value) {
return putVal(hash(key), key, value, false, true);
}

// hashcode 无符号位移 16 位
static final int hash(Object key) {
int h;
return (key == null) ? 0 : (h = key.hashCode()) ^ (h >>> 16);
}

final V putVal(int hash, K key, V value, boolean onlyIfAbsent,
boolean evict) {
Node<K,V>[] tab; Node<K,V> p; int n, i;
// tab 为空则创建
if ((tab = table) == null || (n = tab.length) == 0)
n = (tab = resize()).length;
// 计算 index,并对 null 做处理
if ((p = tab[i = (n - 1) & hash]) == null)
tab[i] = newNode(hash, key, value, null);
else {
Node<K,V> e; K k;
// 节点存在
if (p.hash == hash &&
((k = p.key) == key || (key != null && key.equals(k))))
e = p;
// 该链为树
else if (p instanceof TreeNode)
e = ((TreeNode<K,V>)p).putTreeVal(this, tab, hash, key, value);
// 该链为链表
else {
for (int binCount = 0; ; ++binCount) {
if ((e = p.next) == null) {
p.next = newNode(hash, key, value, null);
if (binCount >= TREEIFY_THRESHOLD - 1) // -1 for 1st
treeifyBin(tab, hash);
break;
}
if (e.hash == hash &&
((k = e.key) == key || (key != null && key.equals(k))))
break;
p = e;
}
}
// 写入
if (e != null) { // existing mapping for key
V oldValue = e.value;
if (!onlyIfAbsent || oldValue == null)
e.value = value;
afterNodeAccess(e);
return oldValue;
}
}
++modCount;
if (++size > threshold)
resize();
afterNodeInsertion(evict);
return null;
}

为什么计算 hash 使用 hashcode 无符号位移 16 位。

假设要添加两个对象 a 和 b,如果数组长度是 16,这时对象 a 和 b 通过公式 (n - 1) & hash 运算,也就是 (16-1)&a.hashCode 和 (16-1)&b.hashCode,15 的二进制为 0000000000000000000000000001111,假设对象 A 的 hashCode 为 1000010001110001000001111000000,对象 B 的 hashCode 为 0111011100111000101000010100000,你会发现上述与运算结果都是 0。这样的哈希结果就太让人失望了,很明显不是一个好的哈希算法。

但如果我们将 hashCode 值右移 16 位(h >>> 16 代表无符号右移 16 位),也就是取 int 类型的一半,刚好可以将该二进制数对半切开,并且使用位异或运算(如果两个数对应的位置相反,则结果为 1,反之为 0),这样的话,就能避免上面的情况发生。这就是 hash() 方法的具体实现方式。简而言之,就是尽量打乱 hashCode 真正参与运算的低 16 位。

get 方法的实现

在理解了 put 之后,get 就很简单了。大致思路如下:

  • 对 key 的 hashCode() 做 hash 计算,然后根据 hash 值再计算桶的 index

  • 如果桶中的第一个节点命中,直接返回;

  • 如果有冲突,则通过 key.equals(k) 去查找对应的 entry

    • 若为树,则在红黑树中通过 key.equals(k) 查找,O(logn);

    • 若为链表,则在链表中通过 key.equals(k) 查找,O(n)。

具体代码的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public V get(Object key) {
Node<K,V> e;
return (e = getNode(hash(key), key)) == null ? null : e.value;
}

final Node<K,V> getNode(int hash, Object key) {
Node<K,V>[] tab; Node<K,V> first, e; int n; K k;
if ((tab = table) != null && (n = tab.length) > 0 &&
(first = tab[(n - 1) & hash]) != null) {
// 直接命中
if (first.hash == hash && // always check first node
((k = first.key) == key || (key != null && key.equals(k))))
return first;
// 未命中
if ((e = first.next) != null) {
// 在树中 get
if (first instanceof TreeNode)
return ((TreeNode<K,V>)first).getTreeNode(hash, key);
// 在链表中 get
do {
if (e.hash == hash &&
((k = e.key) == key || (key != null && key.equals(k))))
return e;
} while ((e = e.next) != null);
}
}
return null;
}

resize 的实现

put 时,如果发现目前的 bucket 占用程度已经超过了 Load Factor 所希望的比例,那么就会发生 resize。在 resize 的过程,简单的说就是把 bucket 扩充为 2 倍,之后重新计算 index,把节点再放到新的 bucket 中。

当超过限制的时候会 resize,然而又因为我们使用的是 2 次幂的扩展(指长度扩为原来 2 倍),所以,元素的位置要么是在原位置,要么是在原位置再移动 2 次幂的位置。

怎么理解呢?例如我们从 16 扩展为 32 时,具体的变化如下所示:

因此元素在重新计算 hash 之后,因为 n 变为 2 倍,那么 n-1 的 mask 范围在高位多 1bit(红色),因此新的 index 就会发生这样的变化:

因此,我们在扩充 HashMap 的时候,不需要重新计算 hash,只需要看看原来的 hash 值新增的那个 bit 是 1 还是 0 就好了,是 0 的话索引没变,是 1 的话索引变成“原索引+oldCap”。可以看看下图为 16 扩充为 32 的 resize 示意图:

这个设计确实非常的巧妙,既省去了重新计算 hash 值的时间,而且同时,由于新增的 1bit 是 0 还是 1 可以认为是随机的,因此 resize 的过程,均匀的把之前的冲突的节点分散到新的 bucket 了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
final Node<K,V>[] resize() {
Node<K,V>[] oldTab = table;
int oldCap = (oldTab == null) ? 0 : oldTab.length;
int oldThr = threshold;
int newCap, newThr = 0;
if (oldCap > 0) {
// 超过最大值就不再扩充了,就只好随你碰撞去吧
if (oldCap >= MAXIMUM_CAPACITY) {
threshold = Integer.MAX_VALUE;
return oldTab;
}
// 没超过最大值,就扩充为原来的 2 倍
else if ((newCap = oldCap << 1) < MAXIMUM_CAPACITY &&
oldCap >= DEFAULT_INITIAL_CAPACITY)
newThr = oldThr << 1; // double threshold
}
else if (oldThr > 0) // initial capacity was placed in threshold
newCap = oldThr;
else { // zero initial threshold signifies using defaults
newCap = DEFAULT_INITIAL_CAPACITY;
newThr = (int)(DEFAULT_LOAD_FACTOR * DEFAULT_INITIAL_CAPACITY);
}

// 计算新的 resize 上限
if (newThr == 0) {
float ft = (float)newCap * loadFactor;
newThr = (newCap < MAXIMUM_CAPACITY && ft < (float)MAXIMUM_CAPACITY ?
(int)ft : Integer.MAX_VALUE);
}
threshold = newThr;
@SuppressWarnings({"rawtypes","unchecked"})
Node<K,V>[] newTab = (Node<K,V>[])new Node[newCap];
table = newTab;
if (oldTab != null) {
// 把每个 bucket 都移动到新的 buckets 中
for (int j = 0; j < oldCap; ++j) {
Node<K,V> e;
if ((e = oldTab[j]) != null) {
oldTab[j] = null;
if (e.next == null)
newTab[e.hash & (newCap - 1)] = e;
else if (e instanceof TreeNode)
((TreeNode<K,V>)e).split(this, newTab, j, oldCap);
else { // preserve order
Node<K,V> loHead = null, loTail = null;
Node<K,V> hiHead = null, hiTail = null;
Node<K,V> next;
do {
next = e.next;
// 原索引
if ((e.hash & oldCap) == 0) {
if (loTail == null)
loHead = e;
else
loTail.next = e;
loTail = e;
}
// 原索引+oldCap
else {
if (hiTail == null)
hiHead = e;
else
hiTail.next = e;
hiTail = e;
}
} while ((e = next) != null);
// 原索引放到bucket里
if (loTail != null) {
loTail.next = null;
newTab[j] = loHead;
}
// 原索引+oldCap放到bucket里
if (hiTail != null) {
hiTail.next = null;
newTab[j + oldCap] = hiHead;
}
}
}
}
}
return newTab;
}

LinkedHashMap 类

LinkedHashMap 要点

LinkedHashMap 通过维护一个保存所有条目(Entry)的双向链表,保证了元素迭代的顺序(即插入顺序)

关注点 结论
是否允许键值对为 null Key 和 Value 都允许 null
是否允许重复数据 Key 重复会覆盖、Value 允许重复
是否有序 按照元素插入顺序存储
是否线程安全 非线程安全

LinkedHashMap 数据结构

LinkedHashMap 通过维护一对 LinkedHashMap.Entry<K,V> 类型的头尾指针,以双链表形式,保存所有数据

学习过数据结构的双链表,就能理解其元素存储以及访问必然是有序的。

1
2
3
4
5
6
7
8
9
10
11
public class LinkedHashMap<K,V>
extends HashMap<K,V>
implements Map<K,V> {

// 双链表的头指针
transient LinkedHashMap.Entry<K,V> head;
// 双链表的尾指针
transient LinkedHashMap.Entry<K,V> tail;
// 迭代排序方法:true 表示访问顺序;false 表示插入顺序
final boolean accessOrder;
}

LinkedHashMap 继承了 HashMapput 方法,本身没有实现 put 方法。

TreeMap 类

TreeMap 要点

TreeMap 基于红黑树实现。

TreeMap 是有序的。它的排序规则是:根据 map 中的 key 的自然语义顺序或提供的比较器(Comparator)的自定义比较顺序。

TreeMap 不是线程安全的。

TreeMap 原理

put 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
public V put(K key, V value) {
Entry<K,V> t = root;
// 如果根节点为 null,插入第一个节点
if (t == null) {
compare(key, key); // type (and possibly null) check

root = new Entry<>(key, value, null);
size = 1;
modCount++;
return null;
}
int cmp;
Entry<K,V> parent;
// split comparator and comparable paths
Comparator<? super K> cpr = comparator;
// 每个节点的左孩子节点的值小于它;右孩子节点的值大于它
// 如果有比较器,使用比较器进行比较
if (cpr != null) {
do {
parent = t;
cmp = cpr.compare(key, t.key);
if (cmp < 0)
t = t.left;
else if (cmp > 0)
t = t.right;
else
return t.setValue(value);
} while (t != null);
}
// 没有比较器,使用 key 的自然顺序进行比较
else {
if (key == null)
throw new NullPointerException();
@SuppressWarnings("unchecked")
Comparable<? super K> k = (Comparable<? super K>) key;
do {
parent = t;
cmp = k.compareTo(t.key);
if (cmp < 0)
t = t.left;
else if (cmp > 0)
t = t.right;
else
return t.setValue(value);
} while (t != null);
}
// 通过上面的遍历未找到 key 值,则新插入节点
Entry<K,V> e = new Entry<>(key, value, parent);
if (cmp < 0)
parent.left = e;
else
parent.right = e;
// 插入后,为了维持红黑树的平衡需要调整
fixAfterInsertion(e);
size++;
modCount++;
return null;
}

get 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public V get(Object key) {
Entry<K,V> p = getEntry(key);
return (p==null ? null : p.value);
}

final Entry<K,V> getEntry(Object key) {
// Offload comparator-based version for sake of performance
if (comparator != null)
return getEntryUsingComparator(key);
if (key == null)
throw new NullPointerException();
@SuppressWarnings("unchecked")
Comparable<? super K> k = (Comparable<? super K>) key;
Entry<K,V> p = root;
// 按照二叉树搜索的方式进行搜索,搜到返回
while (p != null) {
int cmp = k.compareTo(p.key);
if (cmp < 0)
p = p.left;
else if (cmp > 0)
p = p.right;
else
return p;
}
return null;
}

remove 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
public V remove(Object key) {
Entry<K,V> p = getEntry(key);
if (p == null)
return null;

V oldValue = p.value;
deleteEntry(p);
return oldValue;
}
private void deleteEntry(Entry<K,V> p) {
modCount++;
size--;

// 如果当前节点有左右孩子节点,使用后继节点替换要删除的节点
// If strictly internal, copy successor's element to p and then make p
// point to successor.
if (p.left != null && p.right != null) {
Entry<K,V> s = successor(p);
p.key = s.key;
p.value = s.value;
p = s;
} // p has 2 children

// Start fixup at replacement node, if it exists.
Entry<K,V> replacement = (p.left != null ? p.left : p.right);

if (replacement != null) { // 要删除的节点有一个孩子节点
// Link replacement to parent
replacement.parent = p.parent;
if (p.parent == null)
root = replacement;
else if (p == p.parent.left)
p.parent.left = replacement;
else
p.parent.right = replacement;

// Null out links so they are OK to use by fixAfterDeletion.
p.left = p.right = p.parent = null;

// Fix replacement
if (p.color == BLACK)
fixAfterDeletion(replacement);
} else if (p.parent == null) { // return if we are the only node.
root = null;
} else { // No children. Use self as phantom replacement and unlink.
if (p.color == BLACK)
fixAfterDeletion(p);

if (p.parent != null) {
if (p == p.parent.left)
p.parent.left = null;
else if (p == p.parent.right)
p.parent.right = null;
p.parent = null;
}
}
}

TreeMap 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class TreeMapDemo {

private static final String[] chars = "A B C D E F G H I J K L M N O P Q R S T U V W X Y Z".split(" ");

public static void main(String[] args) {
TreeMap<Integer, String> treeMap = new TreeMap<>();
for (int i = 0; i < chars.length; i++) {
treeMap.put(i, chars[i]);
}
System.out.println(treeMap);
Integer low = treeMap.firstKey();
Integer high = treeMap.lastKey();
System.out.println(low);
System.out.println(high);
Iterator<Integer> it = treeMap.keySet().iterator();
for (int i = 0; i <= 6; i++) {
if (i == 3) { low = it.next(); }
if (i == 6) { high = it.next(); } else { it.next(); }
}
System.out.println(low);
System.out.println(high);
System.out.println(treeMap.subMap(low, high));
System.out.println(treeMap.headMap(high));
System.out.println(treeMap.tailMap(low));
}
}

WeakHashMap

WeakHashMap 的定义如下:

1
2
3
public class WeakHashMap<K,V>
extends AbstractMap<K,V>
implements Map<K,V> {}

WeakHashMap 继承了 AbstractMap,实现了 Map 接口。

和 HashMap 一样,WeakHashMap 也是一个散列表,它存储的内容也是键值对(key-value)映射,而且键和值都可以是 null。

不过 WeakHashMap 的键是弱键。在 WeakHashMap 中,当某个键不再被其它对象引用,会被从 WeakHashMap 中被自动移除。更精确地说,对于一个给定的键,其映射的存在并不阻止垃圾回收器对该键的丢弃,这就使该键成为可终止的,被终止,然后被回收。某个键被终止时,它对应的键值对也就从映射中有效地移除了。

这个弱键的原理呢?大致上就是,通过 WeakReference 和 ReferenceQueue 实现的。

WeakHashMap 的 key 是弱键,即是 WeakReference 类型的;ReferenceQueue 是一个队列,它会保存被 GC 回收的弱键。实现步骤是:

  1. 新建 WeakHashMap,将键值对添加到 WeakHashMap 中。实际上,WeakHashMap 是通过数组 table 保存 Entry(键值对);每一个 Entry 实际上是一个单向链表,即 Entry 是键值对链表。
  2. 当某弱键不再被其它对象引用,并被 GC 回收时。在 GC 回收该弱键时,这个弱键也同时会被添加到 ReferenceQueue(queue)队列中。
  3. 当下一次我们需要操作 WeakHashMap 时,会先同步 table 和 queue。table 中保存了全部的键值对,而 queue 中保存被 GC 回收的键值对;同步它们,就是删除 table 中被 GC 回收的键值对。

这就是弱键如何被自动从 WeakHashMap 中删除的步骤了。

和 HashMap 一样,WeakHashMap 是不同步的。可以使用 Collections.synchronizedMap 方法来构造同步的 WeakHashMap。

总结

Map 简介

img

HashMap

img

其他 Map

img

参考资料

Java 容器之 Set

Set 简介

Set 家族成员简介:

  • Set 继承了 Collection 的接口。实际上 Set 就是 Collection,只是行为略有不同:Set 集合不允许有重复元素。
  • SortedSet 继承了 Set 的接口。SortedSet 中的内容是排序的唯一值,排序的方法是通过比较器(Comparator)。
  • NavigableSet 继承了 SortedSet 的接口。它提供了丰富的查找方法:如”获取大于/等于某值的元素”、“获取小于/等于某值的元素”等等。
  • AbstractSet 是一个抽象类,它继承于 AbstractCollectionAbstractCollection 实现了 Set 中的绝大部分方法,为实现 Set 的实例类提供了便利。
  • HashSet 类依赖于 HashMap,它实际上是通过 HashMap 实现的。HashSet 中的元素是无序的、散列的。
  • TreeSet 类依赖于 TreeMap,它实际上是通过 TreeMap 实现的。TreeSet 中的元素是有序的,它是按自然排序或者用户指定比较器排序的 Set。
  • LinkedHashSet 是按插入顺序排序的 Set。
  • EnumSet 是只能存放 Emum 枚举类型的 Set。

Set 接口

Set 继承了 Collection 的接口。实际上,Set 就是 Collection,二者提供的方法完全相同。

Set 接口定义如下:

1
public interface Set<E> extends Collection<E> {}

SortedSet 接口

继承了 Set 的接口。SortedSet 中的内容是排序的唯一值,排序的方法是通过比较器(Comparator)。

SortedSet 接口定义如下:

1
public interface SortedSet<E> extends Set<E> {}

SortedSet 接口新扩展的方法:

  • comparator - 返回 Comparator
  • subSet - 返回指定区间的子集
  • headSet - 返回小于指定元素的子集
  • tailSet - 返回大于指定元素的子集
  • first - 返回第一个元素
  • last - 返回最后一个元素
  • spliterator

NavigableSet 继承了 SortedSet。它提供了丰富的查找方法。

NavigableSet 接口定义如下:

1
public interface NavigableSet<E> extends SortedSet<E> {}

NavigableSet 接口新扩展的方法:

  • lower - 返回小于指定值的元素中最接近的元素
  • higher - 返回大于指定值的元素中最接近的元素
  • floor - 返回小于或等于指定值的元素中最接近的元素
  • ceiling - 返回大于或等于指定值的元素中最接近的元素
  • pollFirst - 检索并移除第一个(最小的)元素
  • pollLast - 检索并移除最后一个(最大的)元素
  • descendingSet - 返回反序排列的 Set
  • descendingIterator - 返回反序排列的 Set 的迭代器
  • subSet - 返回指定区间的子集
  • headSet - 返回小于指定元素的子集
  • tailSet - 返回大于指定元素的子集

AbstractSet 抽象类

AbstractSet 类提供 Set 接口的核心实现,以最大限度地减少实现 Set 接口所需的工作。

AbstractSet 抽象类定义如下:

1
public abstract class AbstractSet<E> extends AbstractCollection<E> implements Set<E> {}

事实上,主要的实现已经在 AbstractCollection 中完成。

HashSet 类

HashSet 类依赖于 HashMap,它实际上是通过 HashMap 实现的。HashSet 中的元素是无序的、散列的。

HashSet 类定义如下:

1
2
3
public class HashSet<E>
extends AbstractSet<E>
implements Set<E>, Cloneable, java.io.Serializable {}

HashSet 要点

  • HashSet 通过继承 AbstractSet 实现了 Set 接口中的骨干方法。
  • HashSet 实现了 Cloneable,所以支持克隆。
  • HashSet 实现了 Serializable,所以支持序列化。
  • HashSet 中存储的元素是无序的。
  • HashSet 允许 null 值的元素。
  • HashSet 不是线程安全的。

HashSet 原理

HashSet 是基于 HashMap 实现的。

1
2
3
4
5
6
// HashSet 的核心,通过维护一个 HashMap 实体来实现 HashSet 方法
private transient HashMap<E,Object> map;

// PRESENT 是用于关联 map 中当前操作元素的一个虚拟值
private static final Object PRESENT = new Object();
}
  • HashSet 中维护了一个 HashMap 对象 map,HashSet 的重要方法,如 addremoveiteratorclearsize 等都是围绕 map 实现的。
    • HashSet 类中通过定义 writeObject()readObject() 方法确定了其序列化和反序列化的机制。
  • PRESENT 是用于关联 map 中当前操作元素的一个虚拟值。

TreeSet 类

TreeSet 类依赖于 TreeMap,它实际上是通过 TreeMap 实现的。TreeSet 中的元素是有序的,它是按自然排序或者用户指定比较器排序的 Set。

TreeSet 类定义如下:

1
2
public class TreeSet<E> extends AbstractSet<E>
implements NavigableSet<E>, Cloneable, java.io.Serializable {}

TreeSet 要点

  • TreeSet 通过继承 AbstractSet 实现了 NavigableSet 接口中的骨干方法。
  • TreeSet 实现了 Cloneable,所以支持克隆。
  • TreeSet 实现了 Serializable,所以支持序列化。
  • TreeSet 中存储的元素是有序的。排序规则是自然顺序或比较器(Comparator)中提供的顺序规则。
  • TreeSet 不是线程安全的。

TreeSet 源码

TreeSet 是基于 TreeMap 实现的。

1
2
3
4
5
// TreeSet 的核心,通过维护一个 NavigableMap 实体来实现 TreeSet 方法
private transient NavigableMap<E,Object> m;

// PRESENT 是用于关联 map 中当前操作元素的一个虚拟值
private static final Object PRESENT = new Object();
  • TreeSet 中维护了一个 NavigableMap 对象 map(实际上是一个 TreeMap 实例),TreeSet 的重要方法,如 addremoveiteratorclearsize 等都是围绕 map 实现的。
  • PRESENT 是用于关联 map 中当前操作元素的一个虚拟值。TreeSet 中的元素都被当成 TreeMap 的 key 存储,而 value 都填的是 PRESENT

LinkedHashSet 类

LinkedHashSet 是按插入顺序排序的 Set。

LinkedHashSet 类定义如下:

1
2
3
public class LinkedHashSet<E>
extends HashSet<E>
implements Set<E>, Cloneable, java.io.Serializable {}

LinkedHashSet 要点

  • LinkedHashSet 通过继承 HashSet 实现了 Set 接口中的骨干方法。
  • LinkedHashSet 实现了 Cloneable,所以支持克隆。
  • LinkedHashSet 实现了 Serializable,所以支持序列化。
  • LinkedHashSet 中存储的元素是按照插入顺序保存的。
  • LinkedHashSet 不是线程安全的。

LinkedHashSet 原理

LinkedHashSet 有三个构造方法,无一例外,都是调用父类 HashSet 的构造方法。

1
2
3
4
5
6
7
8
9
public LinkedHashSet(int initialCapacity, float loadFactor) {
super(initialCapacity, loadFactor, true);
}
public LinkedHashSet(int initialCapacity) {
super(initialCapacity, .75f, true);
}
public LinkedHashSet() {
super(16, .75f, true);
}

需要强调的是:LinkedHashSet 构造方法实际上调用的是父类 HashSet 的非 public 构造方法。

1
2
3
HashSet(int initialCapacity, float loadFactor, boolean dummy) {
map = new LinkedHashMap<>(initialCapacity, loadFactor);
}

不同于 HashSet public 构造方法中初始化的 HashMap 实例,这个构造方法中,初始化了 LinkedHashMap 实例。

也就是说,实际上,LinkedHashSet 维护了一个双链表。由双链表的特性可以知道,它是按照元素的插入顺序保存的。所以,这就是 LinkedHashSet 中存储的元素是按照插入顺序保存的原理。

EnumSet 类

EnumSet 类定义如下:

1
2
public abstract class EnumSet<E extends Enum<E>> extends AbstractSet<E>
implements Cloneable, java.io.Serializable {}

EnumSet 要点

  • EnumSet 继承了 AbstractSet,所以有 Set 接口中的骨干方法。
  • EnumSet 实现了 Cloneable,所以支持克隆。
  • EnumSet 实现了 Serializable,所以支持序列化。
  • EnumSet 通过 <E extends Enum<E>> 限定了存储元素必须是枚举值。
  • EnumSet 没有构造方法,只能通过类中的 static 方法来创建 EnumSet 对象。
  • EnumSet 是有序的。以枚举值在 EnumSet 类中的定义顺序来决定集合元素的顺序。
  • EnumSet 不是线程安全的。

HashSet vs. LinkedHashSet vs. TreeSet

HashSetLinkedHashSetTreeSet 都是 Set 接口的实现类,都能保证元素唯一,并且都不是线程安全的。

HashSetLinkedHashSetTreeSet 的主要区别在于底层数据结构不同。HashSet 的底层数据结构是哈希表(基于 HashMap 实现)。LinkedHashSet 的底层数据结构是链表和哈希表,元素的插入和取出顺序满足 FIFO。TreeSet 底层数据结构是红黑树,元素是有序的,排序的方式有自然排序和定制排序。

底层数据结构不同又导致这三者的应用场景不同。HashSet 用于不需要保证元素插入和取出顺序的场景,LinkedHashSet 用于保证元素的插入和取出顺序满足 FIFO 的场景,TreeSet 用于支持对元素自定义排序规则的场景。

参考资料