Dunwu Blog

大道至简,知易行难

Arthas 快速入门

Arthas 是 Alibaba 开源的 Java 诊断工具 。

简介

Arthas可以解决的问题:

  1. 这个类从哪个 jar 包加载的?为什么会报各种类相关的 Exception?
  2. 我改的代码为什么没有执行到?难道是我没 commit?分支搞错了?
  3. 遇到问题无法在线上 debug,难道只能通过加日志再重新发布吗?
  4. 线上遇到某个用户的数据处理有问题,但线上同样无法 debug,线下无法重现!
  5. 是否有一个全局视角来查看系统的运行状况?
  6. 有什么办法可以监控到 JVM 的实时运行状态?

Arthas支持 JDK 6+,支持 Linux/Mac/Windows,采用命令行交互模式,同时提供丰富的 Tab 自动补全功能,进一步方便进行问题的定位和诊断。

安装

使用arthas-boot(推荐)

下载arthas-boot.jar,然后用java -jar的方式启动:

1
2
wget https://alibaba.github.io/arthas/arthas-boot.jar
java -jar arthas-boot.jar

打印帮助信息:

1
java -jar arthas-boot.jar -h
  • 如果下载速度比较慢,可以使用 aliyun 的镜像:

    1
    java -jar arthas-boot.jar --repo-mirror aliyun --use-http
  • 如果从 github 下载有问题,可以使用 gitee 镜像

    1
    wget https://arthas.gitee.io/arthas-boot.jar

使用as.sh

Arthas 支持在 Linux/Unix/Mac 等平台上一键安装,请复制以下内容,并粘贴到命令行中,敲 回车 执行即可:

1
curl -L https://alibaba.github.io/arthas/install.sh | sh

上述命令会下载启动脚本文件 as.sh 到当前目录,你可以放在任何地方或将其加入到 $PATH 中。

直接在 shell 下面执行./as.sh,就会进入交互界面。

也可以执行./as.sh -h来获取更多参数信息。

  • 如果从 github 下载有问题,可以使用 gitee 镜像

    1
    curl -L https://arthas.gitee.io/install.sh | sh

全量安装

最新版本,点击下载:下载地址

解压后,在文件夹里有arthas-boot.jar,直接用java -jar的方式启动:

1
java -jar arthas-boot.jar

打印帮助信息:

1
java -jar arthas-boot.jar -h

基础使用

启动 Demo

1
2
wget https://alibaba.github.io/arthas/arthas-demo.jar
java -jar arthas-demo.jar

arthas-demo是一个简单的程序,每隔一秒生成一个随机数,再执行质因式分解,并打印出分解结果。

arthas-demo源代码:查看

启动 arthas

在命令行下面执行(使用和目标进程一致的用户启动,否则可能 attach 失败):

1
2
wget https://alibaba.github.io/arthas/arthas-boot.jar
java -jar arthas-boot.jar
  • 执行该程序的用户需要和目标进程具有相同的权限。比如以admin用户来执行:sudo su admin && java -jar arthas-boot.jarsudo -u admin -EH java -jar arthas-boot.jar
  • 如果 attach 不上目标进程,可以查看~/logs/arthas/ 目录下的日志。
  • 如果下载速度比较慢,可以使用 aliyun 的镜像:java -jar arthas-boot.jar --repo-mirror aliyun --use-http
  • java -jar arthas-boot.jar -h 打印更多参数信息。

选择应用 java 进程:

1
2
3
$ $ java -jar arthas-boot.jar
* [1]: 35542
[2]: 71560 arthas-demo.jar

Demo 进程是第 2 个,则输入 2,再输入回车/enter。Arthas 会 attach 到目标进程上,并输出日志:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
[INFO] Try to attach process 71560
[INFO] Attach process 71560 success.
[INFO] arthas-client connect 127.0.0.1 3658
,---. ,------. ,--------.,--. ,--. ,---. ,---.
/ O \ | .--. ''--. .--'| '--' | / O \ ' .-'
| .-. || '--'.' | | | .--. || .-. |`. `-.
| | | || |\ \ | | | | | || | | |.-' |
`--' `--'`--' '--' `--' `--' `--'`--' `--'`-----'

wiki: https://alibaba.github.io/arthas
version: 3.0.5.20181127201536
pid: 71560
time: 2018-11-28 19:16:24

$

查看 dashboard

输入dashboard,按回车/enter,会展示当前进程的信息,按ctrl+c可以中断执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
$ dashboard
ID NAME GROUP PRIORI STATE %CPU TIME INTERRU DAEMON
17 pool-2-thread-1 system 5 WAITIN 67 0:0 false false
27 Timer-for-arthas-dashb system 10 RUNNAB 32 0:0 false true
11 AsyncAppender-Worker-a system 9 WAITIN 0 0:0 false true
9 Attach Listener system 9 RUNNAB 0 0:0 false true
3 Finalizer system 8 WAITIN 0 0:0 false true
2 Reference Handler system 10 WAITIN 0 0:0 false true
4 Signal Dispatcher system 9 RUNNAB 0 0:0 false true
26 as-command-execute-dae system 10 TIMED_ 0 0:0 false true
13 job-timeout system 9 TIMED_ 0 0:0 false true
1 main main 5 TIMED_ 0 0:0 false false
14 nioEventLoopGroup-2-1 system 10 RUNNAB 0 0:0 false false
18 nioEventLoopGroup-2-2 system 10 RUNNAB 0 0:0 false false
23 nioEventLoopGroup-2-3 system 10 RUNNAB 0 0:0 false false
15 nioEventLoopGroup-3-1 system 10 RUNNAB 0 0:0 false false
Memory used total max usage GC
heap 32M 155M 1820M 1.77% gc.ps_scavenge.count 4
ps_eden_space 14M 65M 672M 2.21% gc.ps_scavenge.time(m 166
ps_survivor_space 4M 5M 5M s)
ps_old_gen 12M 85M 1365M 0.91% gc.ps_marksweep.count 0
nonheap 20M 23M -1 gc.ps_marksweep.time( 0
code_cache 3M 5M 240M 1.32% ms)
Runtime
os.name Mac OS X
os.version 10.13.4
java.version 1.8.0_162
java.home /Library/Java/JavaVir
tualMachines/jdk1.8.0
_162.jdk/Contents/Hom
e/jre

通过 thread 命令来获取到arthas-demo进程的 Main Class

thread 1会打印线程 ID 1 的栈,通常是 main 函数的线程。

1
2
$ thread 1 | grep 'main('
at demo.MathGame.main(MathGame.java:17)

通过 jad 来反编译 Main Class

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
$ jad demo.MathGame

ClassLoader:
+-sun.misc.Launcher$AppClassLoader@3d4eac69
+-sun.misc.Launcher$ExtClassLoader@66350f69

Location:
/tmp/arthas-demo.jar

/*
* Decompiled with CFR 0_132.
*/
package demo;

import java.io.PrintStream;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;

public class MathGame {
private static Random random = new Random();
private int illegalArgumentCount = 0;

public static void main(String[] args) throws InterruptedException {
MathGame game = new MathGame();
do {
game.run();
TimeUnit.SECONDS.sleep(1L);
} while (true);
}

public void run() throws InterruptedException {
try {
int number = random.nextInt();
List<Integer> primeFactors = this.primeFactors(number);
MathGame.print(number, primeFactors);
}
catch (Exception e) {
System.out.println(String.format("illegalArgumentCount:%3d, ", this.illegalArgumentCount) + e.getMessage());
}
}

public static void print(int number, List<Integer> primeFactors) {
StringBuffer sb = new StringBuffer("" + number + "=");
Iterator<Integer> iterator = primeFactors.iterator();
while (iterator.hasNext()) {
int factor = iterator.next();
sb.append(factor).append('*');
}
if (sb.charAt(sb.length() - 1) == '*') {
sb.deleteCharAt(sb.length() - 1);
}
System.out.println(sb);
}

public List<Integer> primeFactors(int number) {
if (number < 2) {
++this.illegalArgumentCount;
throw new IllegalArgumentException("number is: " + number + ", need >= 2");
}
ArrayList<Integer> result = new ArrayList<Integer>();
int i = 2;
while (i <= number) {
if (number % i == 0) {
result.add(i);
number /= i;
i = 2;
continue;
}
++i;
}
return result;
}
}

Affect(row-cnt:1) cost in 970 ms.

watch

通过watch命令来查看demo.MathGame#primeFactors函数的返回值:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
$ watch demo.MathGame primeFactors returnObj
Press Ctrl+C to abort.
Affect(class-cnt:1 , method-cnt:1) cost in 107 ms.
ts=2018-11-28 19:22:30; [cost=1.715367ms] result=null
ts=2018-11-28 19:22:31; [cost=0.185203ms] result=null
ts=2018-11-28 19:22:32; [cost=19.012416ms] result=@ArrayList[
@Integer[5],
@Integer[47],
@Integer[2675531],
]
ts=2018-11-28 19:22:33; [cost=0.311395ms] result=@ArrayList[
@Integer[2],
@Integer[5],
@Integer[317],
@Integer[503],
@Integer[887],
]
ts=2018-11-28 19:22:34; [cost=10.136007ms] result=@ArrayList[
@Integer[2],
@Integer[2],
@Integer[3],
@Integer[3],
@Integer[31],
@Integer[717593],
]
ts=2018-11-28 19:22:35; [cost=29.969732ms] result=@ArrayList[
@Integer[5],
@Integer[29],
@Integer[7651739],
]

更多的功能可以查看进阶使用

退出 arthas

如果只是退出当前的连接,可以用quit或者exit命令。Attach 到目标进程上的 arthas 还会继续运行,端口会保持开放,下次连接时可以直接连接上。

如果想完全退出 arthas,可以执行shutdown命令。

进阶使用

基础命令

  • help——查看命令帮助信息
  • cat——打印文件内容,和 linux 里的 cat 命令类似
  • pwd——返回当前的工作目录,和 linux 命令类似
  • cls——清空当前屏幕区域
  • session——查看当前会话的信息
  • reset——重置增强类,将被 Arthas 增强过的类全部还原,Arthas 服务端关闭时会重置所有增强过的类
  • version——输出当前目标 Java 进程所加载的 Arthas 版本号
  • history——打印命令历史
  • quit——退出当前 Arthas 客户端,其他 Arthas 客户端不受影响
  • shutdown——关闭 Arthas 服务端,所有 Arthas 客户端全部退出
  • keymap——Arthas 快捷键列表及自定义快捷键

jvm 相关

  • dashboard——当前系统的实时数据面板
  • thread——查看当前 JVM 的线程堆栈信息
  • jvm——查看当前 JVM 的信息
  • sysprop——查看和修改 JVM 的系统属性
  • sysenv——查看 JVM 的环境变量
  • vmoption——查看和修改 JVM 里诊断相关的 option
  • logger——查看和修改 logger
  • getstatic——查看类的静态属性
  • ognl——执行 ognl 表达式
  • mbean——查看 Mbean 的信息
  • heapdump——dump java heap, 类似 jmap 命令的 heap dump 功能

class/classloader 相关

  • sc——查看 JVM 已加载的类信息
  • sm——查看已加载类的方法信息
  • jad——反编译指定已加载类的源码
  • mc——内存编绎器,内存编绎.java文件为.class文件
  • redefine——加载外部的.class文件,redefine 到 JVM 里
  • dump——dump 已加载类的 byte code 到特定目录
  • classloader——查看 classloader 的继承树,urls,类加载信息,使用 classloader 去 getResource

monitor/watch/trace 相关

请注意,这些命令,都通过字节码增强技术来实现的,会在指定类的方法中插入一些切面来实现数据统计和观测,因此在线上、预发使用时,请尽量明确需要观测的类、方法以及条件,诊断结束要执行 shutdown 或将增强过的类执行 reset 命令。

  • monitor——方法执行监控
  • watch——方法执行数据观测
  • trace——方法内部调用路径,并输出方法路径上的每个节点上耗时
  • stack——输出当前方法被调用的调用路径
  • tt——方法执行数据的时空隧道,记录下指定方法每次调用的入参和返回信息,并能对这些不同的时间下调用进行观测

options

  • options——查看或设置 Arthas 全局开关

管道

Arthas 支持使用管道对上述命令的结果进行进一步的处理,如sm java.lang.String * | grep 'index'

  • grep——搜索满足条件的  结果
  • plaintext——将  命令的结果去除 ANSI 颜色
  • wc——按行统计输出结果

后台异步任务

当线上出现偶发的问题,比如需要 watch 某个条件,而这个条件一天可能才会出现一次时,异步后台任务就派上用场了,详情请参考这里

  • 使用 > 将结果重写向到日志文件,使用 & 指定命令是后台运行,session 断开不影响任务执行(生命周期默认为 1 天)
  • jobs——列出所有 job
  • kill——强制终止任务
  • fg——将暂停的任务拉到前台执行
  • bg——将暂停的任务放到后台执行

Web Console

通过 websocket 连接 Arthas。

用户数据回报

3.1.4版本后,增加了用户数据回报功能,方便统一做安全或者历史数据统计。

在启动时,指定stat-url,就会回报执行的每一行命令,比如: ./as.sh --stat-url 'http://192.168.10.11:8080/api/stat'

在 tunnel server 里有一个示例的回报代码,用户可以自己在服务器上实现。

StatController.java

其他特性

参考资料

Maven 快速入门

Maven 简介

Maven 是什么

Maven 是一个项目管理工具。它负责管理项目开发过程中的几乎所有的东西。

  • 版本 - maven 有自己的版本定义和规则。
  • 构建 - maven 支持许多种的应用程序类型,对于每一种支持的应用程序类型都定义好了一组构建规则和工具集。
  • 输出物管理 - maven 可以管理项目构建的产物,并将其加入到用户库中。这个功能可以用于项目组和其他部门之间的交付行为。
  • 依赖关系 - maven 对依赖关系的特性进行细致的分析和划分,避免开发过程中的依赖混乱和相互污染行为
  • 文档和构建结果 - maven 的 site 命令支持各种文档信息的发布,包括构建过程的各种输出,javadoc,产品文档等。
  • 项目关系 - 一个大型的项目通常有几个小项目或者模块组成,用 maven 可以很方便地管理。
  • 移植性管理 - maven 可以针对不同的开发场景,输出不同种类的输出结果。

Maven 的生命周期

maven 把项目的构建划分为不同的生命周期(lifecycle)。粗略一点的话,它这个过程(phase)包括:编译、测试、打包、集成测试、验证、部署。maven 中所有的执行动作(goal)都需要指明自己在这个过程中的执行位置,然后 maven 执行的时候,就依照过程的发展依次调用这些 goal 进行各种处理。

这个也是 maven 的一个基本调度机制。一般来说,位置稍后的过程都会依赖于之前的过程。当然,maven 同样提供了配置文件,可以依照用户要求,跳过某些阶段。

Maven 的标准工程结构

Maven 的标准工程结构如下:

1
2
3
4
5
6
7
8
9
|-- pom.xml(maven的核心配置文件)
|-- src
|-- main
|-- java(java源代码目录)
|-- resources(资源文件目录)
|-- test
|-- java(单元测试代码目录)
|-- target(输出目录,所有的输出物都存放在这个目录下)
|-- classes(编译后的class文件存放处)

Maven 的”约定优于配置”

所谓的”约定优于配置”,在 maven 中并不是完全不可以修改的,他们只是一些配置的默认值而已。但是除非必要,并不需要去修改那些约定内容。maven 默认的文件存放结构如下:

每一个阶段的任务都知道怎么正确完成自己的工作,比如 compile 任务就知道从 src/main/java 下编译所有的 java 文件,并把它的输出 class 文件存放到 target/classes 中。

对 maven 来说,采用”约定优于配置”的策略可以减少修改配置的工作量,也可以降低学习成本,更重要的是,给项目引入了统一的规范。

Maven 的版本规范

maven 使用如下几个要素来唯一定位某一个输出物:

  • groupId - 团体、组织的标识符。团体标识的约定是,它以创建这个项目的组织名称的逆向域名(reverse domain name)开头。一般对应着 JAVA 的包的结构。例如 org.apache
  • artifactId - 单独项目的唯一标识符。比如我们的 tomcat, commons 等。不要在 artifactId 中包含点号(.)。
  • version - 一个项目的特定版本。
  • packaging - 项目的类型,默认是 jar,描述了项目打包后的输出。类型为 jar 的项目产生一个 JAR 文件,类型为 war 的项目产生一个 web 应用。

例如:想在 maven 工程中引入 4.12 版本的 junit 包,添加如下依赖即可。

1
2
3
4
5
6
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>compile</scope>
</dependency>

maven 有自己的版本规范,一般是如下定义 <major version><minor version><incremental version>-<qualifier> ,比如 1.2.3-beta-01。要说明的是,maven 自己判断版本的算法是 major,minor,incremental 部分用数字比 较,qualifier 部分用字符串比较,所以要小心 alpha-2 和 alpha-15 的比较关系,最好用 alpha-02 的格式。

maven 在版本管理时候可以使用几个特殊的字符串 SNAPSHOT,LATEST,RELEASE。比如”1.0-SNAPSHOT”。各个部分的含义和处理逻辑如下说明:

  • SNAPSHOT - 这个版本一般用于开发过程中,表示不稳定的版本。
  • LATEST - 指某个特定构件的最新发布,这个发布可能是一个发布版,也可能是一个 snapshot 版,具体看哪个时间最后。
  • RELEASE - 指最后一个发布版。

Maven 安装

Linux 环境安装可以使用我写一键安装脚本:https://github.com/dunwu/linux-tutorial/tree/master/codes/linux/ops/service/maven

环境准备

Maven 依赖于 Java,所以本地必须安装 JDK。

打开控制台,执行 java -version 确认本地已安装 JDK。

1
2
3
4
$ java -version
java version "1.8.0_191"
Java(TM) SE Runtime Environment (build 1.8.0_191-b12)
Java HotSpot(TM) 64-Bit Server VM (build 25.191-b12, mixed mode)

下载解压

进入 **官网下载地址**,选择合适版本,下载并解压到本地。解压命令如下:

1
2
3
# 以下解压命令分别针对 zip 包和 tar 包
unzip apache-maven-3.6.3-bin.zip
tar xzvf apache-maven-3.6.3-bin.tar.gz

环境变量

添加环境变量 MAVEN_HOME,值为 Maven 的安装路径。

配置 Unix 系统环境变量

输入 vi /etc/profile ,添加环境变量如下:

1
2
3
# MAVEN 的根路径
export MAVEN_HOME=/opt/maven/apache-maven-3.5.2
export PATH=$MAVEN_HOME/bin:$PATH

执行 source /etc/profile ,立即生效。

配置 Windows 系统环境变量

右键 “计算机”,选择 “属性”,之后点击 “高级系统设置”,点击”环境变量”,来设置环境变量,有以下系统变量需要配置:

img

img

检测安装成功

检验是否安装成功,执行 mvn -v 命令,如果输出类似下面的 maven 版本信息,说明配置成功。

1
2
3
4
5
6
$ mvn -v
Apache Maven 3.5.4 (1edded0938998edf8bf061f1ceb3cfdeccf443fe; 2018-06-18T02:33:14+08:00)
Maven home: /opt/maven/apache-maven-3.5.4
Java version: 1.8.0_191, vendor: Oracle Corporation, runtime: /mnt/disk1/jdk1.8.0_191/jre
Default locale: zh_CN, platform encoding: UTF-8
OS name: "linux", version: "3.10.0-327.el7.x86_64", arch: "amd64", family: "unix"

Maven 配置文件

setting.xml 文件是 Maven 的默认配置文件,其默认路径为:<Maven 安装目录>/conf/settings.xml

如果需要修改 Maven 配置,直接修改 setting.xml 并保持即可。

例如:想要修改本地仓库位置可以按如下配置,这样,所有通过 Maven 下载打包的 jar 包都会存储在 D:\maven\repo 路径下。

1
2
3
4
5
<settings xmlns="http://maven.apache.org/SETTINGS/1.1.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/SETTINGS/1.1.0 http://maven.apache.org/xsd/settings-1.1.0.xsd">
<localRepository>D:\maven\repo<localRepository/>
<!-- 略 -->
</settings>

快速入门

创建 Maven 工程

初始化工程

执行指令:

1
mvn archetype:generate -DgroupId=com.mycompany.app -DartifactId=my-app -DarchetypeArtifactId=maven-archetype-quickstart -DarchetypeVersion=1.4 -DinteractiveMode=false

会在当前路径新建一个名为 my-app 的 Maven 工程,其目录结构如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
my-app
|-- pom.xml
`-- src
|-- main
| `-- java
| `-- com
| `-- mycompany
| `-- app
| `-- App.java
`-- test
`-- java
`-- com
`-- mycompany
`-- app
`-- AppTest.java

其中, src/main/java 目录包含 java 源码, src/test/java 目录包含 java 测试源码,而 pom.xml 文件是 maven 工程的配置文件。

POM 配置

pom.xml 是 maven 工程的配置文件,它描述了 maven 工程的构建方式,其配置信息是很复杂的,这里给一个最简单的配置示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.mycompany.app</groupId>
<artifactId>my-app</artifactId>
<version>1.0-SNAPSHOT</version>

<properties>
<maven.compiler.source>1.7</maven.compiler.source>
<maven.compiler.target>1.7</maven.compiler.target>
</properties>

<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>

构建项目

执行以下命令,即可构建项目:

1
mvn clean package -Dmaven.test.skip=true -B -U

构建成功后,会输出类似下面的信息:

1
2
3
4
5
6
7
...
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 2.953 s
[INFO] Finished at: 2019-11-24T13:05:10+01:00
[INFO] ------------------------------------------------------------------------

这时,在当前路径下会产生一个 target 目录,其中包含了构建的输出物,如:jar 包、class 文件等。

这时,我们可以执行以下命令启动 jar 包:

1
java -cp target/my-app-1.0-SNAPSHOT.jar com.mycompany.app.App

在 Intellij 中创建 Maven 工程

(1)创建 Maven 工程

依次点击 File -> New -> Project 打开创建工程对话框,选择 Maven 工程。

img

(2)输入项目信息

img

(3)点击 Intellij 侧边栏中的 Maven 工具界面,有几个可以直接使用的 maven 命令,可以帮助你进行构建。

img

在 Eclipse 中创建 Maven 工程

(1)Maven 插件

在 Eclipse 中创建 Maven 工程,需要安装 Maven 插件。

一般较新版本的 Eclipse 都会带有 Maven 插件,如果你的 Eclipse 中已经有 Maven 插件,可以跳过这一步骤。

点击 Help -> Eclipse Marketplace,搜索 maven 关键字,选择安装红框对应的 Maven 插件。

img

(2)Maven 环境配置

点击 Window -> Preferences

如下图所示,配置 settings.xml 文件的位置

img

(3)创建 Maven 工程

File -> New -> Maven Project -> Next,在接下来的窗口中会看到一大堆的项目模板,选择合适的模板。

接下来设置项目的参数,如下:

img

groupId是项目组织唯一的标识符,实际对应 JAVA 的包的结构,是 main 目录里 java 的目录结构。

artifactId就是项目的唯一的标识符,实际对应项目的名称,就是项目根目录的名称。

点击 Finish,Eclipse 会创建一个 Maven 工程。

(4)使用 Maven 进行构建

Eclipse 中构建方式:

在 Elipse 项目上右击 -> Run As 就能看到很多 Maven 操作。这些操作和 maven 命令是等效的。例如 Maven clean,等同于 mvn clean 命令。

img

你也可以点击 Maven build,输入组合命令,并保存下来。如下图:

img

Maven 命令构建方式:

当然,你也可以直接使用 maven 命令进行构建。

进入工程所在目录,输入 maven 命令就可以了。

img

使用说明

如何添加依赖

在 Maven 工程中添加依赖 jar 包,很简单,只要在 POM 文件中引入对应的<dependency>标签即可。

参考下例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

<modelVersion>4.0.0</modelVersion>
<groupId>com.zp.maven</groupId>
<artifactId>MavenDemo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>MavenDemo</name>
<url>http://maven.apache.org</url>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<junit.version>3.8.1</junit.version>
</properties>

<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.12</version>
<scope>compile</scope>
</dependency>
</dependencies>
</project>

<dependency> 标签最常用的四个属性标签:

  • <groupId> - 项目组织唯一的标识符,实际对应 JAVA 的包的结构。
  • <artifactId> - 项目唯一的标识符,实际对应项目的名称,就是项目根目录的名称。
  • <version> - jar 包的版本号。可以直接填版本数字,也可以在 properties 标签中设置属性值。
  • <scope> - jar 包的作用范围。可以填写 compile、runtime、test、system 和 provided。用来在编译、测试等场景下选择对应的 classpath。

如何寻找 jar 包

可以在 http://mvnrepository.com/ 站点搜寻你想要的 jar 包版本

例如,想要使用 log4j,可以找到需要的版本号,然后拷贝对应的 maven 标签信息,将其添加到 pom .xml 文件中。

如何使用 Maven 插件(Plugin)

要添加 Maven 插件,可以在 pom.xml 文件中添加 <plugin> 标签。

1
2
3
4
5
6
7
8
9
10
11
12
13
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.3</version>
<configuration>
<source>1.7</source>
<target>1.7</target>
</configuration>
</plugin>
</plugins>
</build>

<configuration> 标签用来配置插件的一些使用参数。

如何一次编译多个工程

假设要创建一个父 maven 工程,它有两个子工程:my-app 和 my-webapp:

1
2
3
4
5
6
7
8
9
10
11
+- pom.xml
+- my-app
| +- pom.xml
| +- src
| +- main
| +- java
+- my-webapp
| +- pom.xml
| +- src
| +- main
| +- webapp

app 工程的 pom.xml 如下,重点在于在 modules 中引入两个子 module:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.mycompany.app</groupId>
<artifactId>app</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>pom</packaging>

<modules>
<module>my-app</module>
<module>my-webapp</module>
</modules>
</project>

选择编译 XXX 时,会依次对它的所有 Module 执行相同操作。

常用 Maven 插件

更多详情请参考:https://maven.apache.org/plugins/

maven-antrun-plugin

maven-antrun-plugin 能让用户在 Maven 项目中运行 Ant 任务。用户可以直接在该插件的配置以 Ant 的方式编写 Target, 然后交给该插件的 run 目标去执行。在一些由 Ant 往 Maven 迁移的项目中,该插件尤其有用。此外当你发现需要编写一些自定义程度很高的任务,同时又觉 得 Maven 不够灵活时,也可以以 Ant 的方式实现之。maven-antrun-plugin 的 run 目标通常与生命周期绑定运行。

maven-archetype-plugin

Archtype 指项目的骨架,Maven 初学者最开始执行的 Maven 命令可能就是mvn archetype:generate,这实际上就是让 maven-archetype-plugin 生成一个很简单的项目骨架,帮助开发者快速上手。可能也有人看到一些文档写了mvn archetype:create, 但实际上 create 目标已经被弃用了,取而代之的是 generate 目标,该目标使用交互式的方式提示用户输入必要的信息以创建项目,体验更好。 maven-archetype-plugin 还有一些其他目标帮助用户自己定义项目原型,例如你由一个产品需要交付给很多客户进行二次开发,你就可以为 他们提供一个 Archtype,帮助他们快速上手。

maven-assembly-plugin

maven-assembly-plugin 的用途是将项目打包,该包可能包含了项目的可执行文件、源代码、readme、平台脚本等等。 maven-assembly-plugin 支持各种主流的格式如 zip、tar.gz、jar 和 war 等,具体打包哪些文件是高度可控的,例如用户可以 按文件级别的粒度、文件集级别的粒度、模块级别的粒度、以及依赖级别的粒度控制打包,此外,包含和排除配置也是支持的。maven-assembly- plugin 要求用户使用一个名为assembly.xml的元数据文件来表述打包,它的 single 目标可以直接在命令行调用,也可以被绑定至生命周期。

maven-dependency-plugin

maven-dependency-plugin 最大的用途是帮助分析项目依赖,dependency:list能够列出项目最终解析到的依赖列表,dependency:tree能进一步的描绘项目依赖树,dependency:analyze可以告诉你项目依赖潜在的问题,如果你有直接使用到的却未声明的依赖,该目标就会发出警告。maven-dependency-plugin 还有很多目标帮助你操作依赖文件,例如dependency:copy-dependencies能将项目依赖从本地 Maven 仓库复制到某个特定的文件夹下面。

maven-enforcer-plugin

在一个稍大一点的组织或团队中,你无法保证所有成员都熟悉 Maven,那他们做一些比较愚蠢的事情就会变得很正常,例如给项目引入了外部的 SNAPSHOT 依赖而导致构建不稳定,使用了一个与大家不一致的 Maven 版本而经常抱怨构建出现诡异问题。maven-enforcer- plugin 能够帮助你避免之类问题,它允许你创建一系列规则强制大家遵守,包括设定 Java 版本、设定 Maven 版本、禁止某些依赖、禁止 SNAPSHOT 依赖。只要在一个父 POM 配置规则,然后让大家继承,当规则遭到破坏的时候,Maven 就会报错。除了标准的规则之外,你还可以扩展该插 件,编写自己的规则。maven-enforcer-plugin 的 enforce 目标负责检查规则,它默认绑定到生命周期的 validate 阶段。

maven-help-plugin

maven-help-plugin 是一个小巧的辅助工具,最简单的help:system可以打印所有可用的环境变量和 Java 系统属性。help:effective-pomhelp:effective-settings最 为有用,它们分别打印项目的有效 POM 和有效 settings,有效 POM 是指合并了所有父 POM(包括 Super POM)后的 XML,当你不确定 POM 的某些信息从何而来时,就可以查看有效 POM。有效 settings 同理,特别是当你发现自己配置的 settings.xml 没有生效时,就可以用help:effective-settings来验证。此外,maven-help-plugin 的 describe 目标可以帮助你描述任何一个 Maven 插件的信息,还有 all-profiles 目标和 active-profiles 目标帮助查看项目的 Profile。

maven-release-plugin

maven-release-plugin 的用途是帮助自动化项目版本发布,它依赖于 POM 中的 SCM 信息。release:prepare用来准备版本发布,具体的工作包括检查是否有未提交代码、检查是否有 SNAPSHOT 依赖、升级项目的 SNAPSHOT 版本至 RELEASE 版本、为项目打标签等等。release:perform则 是签出标签中的 RELEASE 源码,构建并发布。版本发布是非常琐碎的工作,它涉及了各种检查,而且由于该工作仅仅是偶尔需要,因此手动操作很容易遗漏一 些细节,maven-release-plugin 让该工作变得非常快速简便,不易出错。maven-release-plugin 的各种目标通常直接在 命令行调用,因为版本发布显然不是日常构建生命周期的一部分。

maven-resources-plugin

为了使项目结构更为清晰,Maven 区别对待 Java 代码文件和资源文件,maven-compiler-plugin 用来编译 Java 代码,maven-resources-plugin 则用来处理资源文件。默认的主资源文件目录是src/main/resources,很多用户会需要添加额外的资源文件目录,这个时候就可以通过配置 maven-resources-plugin 来实现。此外,资源文件过滤也是 Maven 的一大特性,你可以在资源文件中使用*${propertyName}*形式的 Maven 属性,然后配置 maven-resources-plugin 开启对资源文件的过滤,之后就可以针对不同环境通过命令行或者 Profile 传入属性的值,以实现更为灵活的构建。

maven-surefire-plugin

可能是由于历史的原因,Maven 2.3 中用于执行测试的插件不是 maven-test-plugin,而是 maven-surefire-plugin。其实大部分时间内,只要你的测试 类遵循通用的命令约定(以 Test 结尾、以 TestCase 结尾、或者以 Test 开头),就几乎不用知晓该插件的存在。然而在当你想要跳过测试、排除某些 测试类、或者使用一些 TestNG 特性的时候,了解 maven-surefire-plugin 的一些配置选项就很有用了。例如 mvn test -Dtest=FooTest 这样一条命令的效果是仅运行 FooTest 测试类,这是通过控制 maven-surefire-plugin 的 test 参数实现的。

build-helper-maven-plugin

Maven 默认只允许指定一个主 Java 代码目录和一个测试 Java 代码目录,虽然这其实是个应当尽量遵守的约定,但偶尔你还是会希望能够指定多个 源码目录(例如为了应对遗留项目),build-helper-maven-plugin 的 add-source 目标就是服务于这个目的,通常它被绑定到 默认生命周期的 generate-sources 阶段以添加额外的源码目录。需要强调的是,这种做法还是不推荐的,因为它破坏了 Maven 的约定,而且可能会遇到其他严格遵守约定的插件工具无法正确识别额外的源码目录。

build-helper-maven-plugin 的另一个非常有用的目标是 attach-artifact,使用该目标你可以以 classifier 的形式选取部分项目文件生成附属构件,并同时 install 到本地仓库,也可以 deploy 到远程仓库。

exec-maven-plugin

exec-maven-plugin 很好理解,顾名思义,它能让你运行任何本地的系统程序,在某些特定情况下,运行一个 Maven 外部的程序可能就是最简单的问题解决方案,这就是exec:exec的 用途,当然,该插件还允许你配置相关的程序运行参数。除了 exec 目标之外,exec-maven-plugin 还提供了一个 java 目标,该目标要求你 提供一个 mainClass 参数,然后它能够利用当前项目的依赖作为 classpath,在同一个 JVM 中运行该 mainClass。有时候,为了简单的 演示一个命令行 Java 程序,你可以在 POM 中配置好 exec-maven-plugin 的相关运行参数,然后直接在命令运行mvn exec:java 以查看运行效果。

jetty-maven-plugin

在进行 Web 开发的时候,打开浏览器对应用进行手动的测试几乎是无法避免的,这种测试方法通常就是将项目打包成 war 文件,然后部署到 Web 容器 中,再启动容器进行验证,这显然十分耗时。为了帮助开发者节省时间,jetty-maven-plugin 应运而生,它完全兼容 Maven 项目的目录结构,能够周期性地检查源文件,一旦发现变更后自动更新到内置的 Jetty Web 容器中。做一些基本配置后(例如 Web 应用的 contextPath 和自动扫描变更的时间间隔),你只要执行 mvn jetty:run ,然后在 IDE 中修改代码,代码经 IDE 自动编译后产生变更,再由 jetty-maven-plugin 侦测到后更新至 Jetty 容器,这时你就可以直接 测试 Web 页面了。需要注意的是,jetty-maven-plugin 并不是宿主于 Apache 或 Codehaus 的官方插件,因此使用的时候需要额外 的配置settings.xml的 pluginGroups 元素,将 org.mortbay.jetty 这个 pluginGroup 加入。

versions-maven-plugin

很多 Maven 用户遇到过这样一个问题,当项目包含大量模块的时候,为他们集体更新版本就变成一件烦人的事情,到底有没有自动化工具能帮助完成这件 事情呢?(当然你可以使用 sed 之类的文本操作工具,不过不在本文讨论范围)答案是肯定的,versions-maven- plugin 提供了很多目标帮助你管理 Maven 项目的各种版本信息。例如最常用的,命令 mvn versions:set -DnewVersion=1.1-SNAPSHOT 就能帮助你把所有模块的版本更新到 1.1-SNAPSHOT。该插件还提供了其他一些很有用的目标,display-dependency- updates 能告诉你项目依赖有哪些可用的更新;类似的 display-plugin-updates 能告诉你可用的插件更新;然后 use- latest-versions 能自动帮你将所有依赖升级到最新版本。最后,如果你对所做的更改满意,则可以使用 mvn versions:commit 提交,不满意的话也可以使用 mvn versions:revert 进行撤销。

Maven 命令

常用 maven 命令清单:

生命周期 阶段描述
mvn validate 验证项目是否正确,以及所有为了完整构建必要的信息是否可用
mvn generate-sources 生成所有需要包含在编译过程中的源代码
mvn process-sources 处理源代码,比如过滤一些值
mvn generate-resources 生成所有需要包含在打包过程中的资源文件
mvn process-resources 复制并处理资源文件至目标目录,准备打包
mvn compile 编译项目的源代码
mvn process-classes 后处理编译生成的文件,例如对 Java 类进行字节码增强(bytecode enhancement)
mvn generate-test-sources 生成所有包含在测试编译过程中的测试源码
mvn process-test-sources 处理测试源码,比如过滤一些值
mvn generate-test-resources 生成测试需要的资源文件
mvn process-test-resources 复制并处理测试资源文件至测试目标目录
mvn test-compile 编译测试源码至测试目标目录
mvn test 使用合适的单元测试框架运行测试。这些测试应该不需要代码被打包或发布
mvn prepare-package 在真正的打包之前,执行一些准备打包必要的操作。这通常会产生一个包的展开的处理过的版本(将会在 Maven 2.1+中实现)
mvn package 将编译好的代码打包成可分发的格式,如 JAR,WAR,或者 EAR
mvn pre-integration-test 执行一些在集成测试运行之前需要的动作。如建立集成测试需要的环境
mvn integration-test 如果有必要的话,处理包并发布至集成测试可以运行的环境
mvn post-integration-test 执行一些在集成测试运行之后需要的动作。如清理集成测试环境。
mvn verify 执行所有检查,验证包是有效的,符合质量规范
mvn install 安装包至本地仓库,以备本地的其它项目作为依赖使用
mvn deploy 复制最终的包至远程仓库,共享给其它开发人员和项目(通常和一次正式的发布相关)

示例:最常用的 maven 构建命令

1
mvn clean install -Dmaven.test.skip=true -B -U

清理本地输出物,并构建 maven 项目,最后将输出物归档在本地仓库。

:bulb: 想了解更多 maven 命令行细节可以参考官方文档:

参考资料

网络技术之 VPN

img

简介

虚拟专用网络(VPN)的功能是:在公用网络上建立专用网络,进行加密通讯。在企业网络中有广泛应用。VPN 网关通过对数据包的加密和数据包目标地址的转换实现远程访问。VPN 可通过服务器、硬件、软件等多种方式实现。

VPN 属于远程访问技术,简单地说就是利用公用网络架设专用网络。例如某公司员工出差到外地,他想访问企业内网的服务器资源,这种访问就属于远程访问。
在传统的企业网络配置中,要进行远程访问,传统的方法是租用 DDN(数字数据网)专线或帧中继,这样的通讯方案必然导致高昂的网络通讯和维护费用。对于移动用户(移动办公人员)与远端个人用户而言,一般会通过拨号线路(Internet)进入企业的局域网,但这样必然带来安全上的隐患。
让外地员工访问到内网资源,利用 VPN 的解决方法就是在内网中架设一台 VPN 服务器。外地员工在当地连上互联网后,通过互联网连接 VPN 服务器,然后通过 VPN 服务器进入企业内网。为了保证数据安全,VPN 服务器和客户机之间的通讯数据都进行了加密处理。有了数据加密,就可以认为数据是在一条专用的数据链路上进行安全传输,就如同专门架设了一个专用网络一样,但实际上 VPN 使用的是互联网上的公用链路,因此 VPN 称为虚拟专用网络,其实质上就是利用加密技术在公网上封装出一个数据通讯隧道。有了 VPN 技术,用户无论是在外地出差还是在家中办公,只要能上互联网就能利用 VPN 访问内网资源,这就是 VPN 在企业中应用得如此广泛的原因。

VPN 的作用

隐藏 IP 和位置

img

VPN 可以隐藏使用者的 IP 地址和位置。

使用 VPN 的最常见原因之一是屏蔽您的真实 IP 地址。

您的 IP 地址是由 ISP 分配的唯一数字地址。 您在线上所做的所有事情都链接到您的 IP 地址,因此可以用来将您与在线活动进行匹配。 大多数网站记录其访问者的 IP 地址。

广告商还可以使用您的 IP 地址,根据您的身份和浏览历史为您提供有针对性的广告。

连接到 VPN 服务器时,您将使用该 VPN 服务器的 IP 地址。 您访问的任何网站都会看到 VPN 服务器的 IP 地址,而不是您自己的。

您将能够绕过 IP 地址阻止并浏览网站,而不会将您的活动作为一个个人追溯到您。

通信加密

img

使用 VPN 时,可以对信息进行加密,使得密码,电子邮件,照片,银行数据和其他敏感信息不会被拦截。

如果在公共场所使用公共 WiFi 连接网络时,敏感数据有被盗的风险。黑客可以利用开放和未加密的网络来窃取重要数据,例如您的密码,电子邮件,照片,银行数据和其他敏感信息。

VPN 可以加密信息,使黑客更难以拦截和窃取数据。

翻墙

img

轻松解除对 Facebook 和 Twitter,Skype,YouTube 和 Gmail 等网站和服务的阻止。 即使您被告知您所在的国家/地区不可用它,或者您所在的学校或办公室网络限制访问,也可以获取所需的东西。

某些服务(例如 Netflix 或 BBC iPlayer)会根据您访问的国家/地区限制访问内容。使用 VPN 可以绕过这些地理限制并解锁“隐藏”内容的唯一可靠方法。

避免被监听

img

使用 VPN 可以向政府、ISP、黑客隐藏通信信息。

您的 Internet 服务提供商(ISP)可以看到您访问的所有网站,并且几乎可以肯定会记录该信息。

在某些国家/地区,ISP 需要长时间收集和存储用户数据,并且政府能够访问,存储和搜索该信息。

在美国,英国,澳大利亚和欧洲大部分地区就是这种情况,仅举几例。

由于 VPN 会加密从设备到 VPN 服务器的互联网流量,因此您的 ISP 或任何其他第三方将无法监视您的在线活动。

要了解有关监视技术和全球大规模监视问题的更多信息,请访问 EFF 和 Privacy International。 您还可以在此处找到全球监视披露的更新列表。

工作原理

VPN 会在您的设备和私人服务器之间建立私人和加密的互联网连接。 这意味着您的数据无法被 ISP 或任何其他第三方读取或理解。 然后,私有服务器将您的流量发送到您要访问的网站或服务上。

img

VPN 的基本处理过程如下:

  1. 要保护主机发送明文信息到其他 VPN 设备。
  2. VPN 设备根据网络管理员设置的规则,确定是对数据进行加密还是直接传输。
  3. 对需要加密的数据,VPN 设备将其整个数据包(包括要传输的数据、源 IP 地址和目的 lP 地址)进行加密并附上数据签名,加上新的数据报头(包括目的地 VPN 设备需要的安全信息和一些初始化参数)重新封装。
  4. 将封装后的数据包通过隧道在公共网络上传输。
  5. 数据包到达目的 VPN 设备后,将其解封,核对数字签名无误后,对数据包解密。

VPN 协议

img

  • OpenVPN

  • IKEv2 / IPSec

  • SSTP

  • PPTP

  • Wireguard

VPN 服务

你可以选择付费 VPN 或自行搭建 VPN。

VPN 服务商:

开源 VPN:

参考资料

深入剖析共识性算法 Paxos

Paxos 是一种基于消息传递且具有容错性的共识性(consensus)算法。

Paxos 算法解决的问题正是分布式一致性问题。在一个节点数为 2N+1 的分布式集群中,只要半数以上的节点(N + 1)还正常工作,整个系统仍可以正常工作。

Paxos 背景

Paxos 是 Leslie Lamport 于 1990 年提出的一种基于消息传递且具有高度容错特性的共识(consensus)算法

为描述 Paxos 算法,Lamport 虚拟了一个叫做 Paxos 的希腊城邦,这个岛按照议会民主制的政治模式制订法律,但是没有人愿意将自己的全部时间和精力放在这种事情上。所以无论是议员,议长或者传递纸条的服务员都不能承诺别人需要时一定会出现,也无法承诺批准决议或者传递消息的时间。

Paxos 算法包含 2 个部分:

  • Basic Paxos 算法:描述的多节点之间如何就某个值达成共识。
  • Multi Paxos 思想:描述的是执行多个 Basic Paxos 实例,就一系列值达成共识。

Paxos 算法解决的问题正是分布式共识性问题,即一个分布式系统中的各个进程如何就某个值(决议)达成一致。

Paxos 算法运行在允许宕机故障的异步系统中,不要求可靠的消息传递,可容忍消息丢失、延迟、乱序以及重复。它利用大多数 (Majority) 机制保证了 2N+1 的容错能力,即 2N+1 个节点的系统最多允许 N 个节点同时出现故障。

Basic Paxos 算法

角色

Paxos 将分布式系统中的节点分 Proposer、Acceptor、Learner 三种角色。

img

  • 提议者(Proposer):发出提案(Proposal),用于投票表决。Proposal 信息包括提案编号 (Proposal ID) 和提议的值 (Value)。在绝大多数场景中,集群中收到客户端请求的节点,才是提议者。这样做的好处是,对业务代码没有入侵性,也就是说,我们不需要在业务代码中实现算法逻辑。
  • 接受者(Acceptor):对每个 Proposal 进行投票,若 Proposal 获得多数 Acceptor 的接受,则称该 Proposal 被批准。一般来说,集群中的所有节点都在扮演接受者的角色,参与共识协商,并接受和存储数据。
  • 学习者(Learner):不参与接受,从 Proposers/Acceptors 学习、记录最新达成共识的提案(Value)。一般来说,学习者是数据备份节点,比如主从架构中的从节点,被动地接受数据,容灾备份。

在多副本状态机中,每个副本都同时具有 Proposer、Acceptor、Learner 三种角色。

这三种角色,在本质上代表的是三种功能:

  • 提议者代表的是接入和协调功能,收到客户端请求后,发起二阶段提交,进行共识协商;
  • 接受者代表投票协商和存储数据,对提议的值进行投票,并接受达成共识的值,存储保存;
  • 学习者代表存储数据,不参与共识协商,只接受达成共识的值,存储保存。

算法

Paxos 算法有 3 个阶段,其中,前 2 个阶段负责协商并达成共识:

  1. 准备(Prepare)阶段:Proposer 向 Acceptors 发出 Prepare 请求,Acceptors 针对收到的 Prepare 请求进行 Promise 承诺。
  2. 接受(Accept)阶段:Proposer 收到多数 Acceptors 承诺的 Promise 后,向 Acceptors 发出 Propose 请求,Acceptors 针对收到的 Propose 请求进行 Accept 处理。
  3. 学习(Learn)阶段:Proposer 在收到多数 Acceptors 的 Accept 之后,标志着本次 Accept 成功,决议形成,将形成的决议发送给所有 Learners。

看到这里,了解过分布式事务的读者,想必会觉得眼熟:这不就是两阶段提交嘛!没错,这里采用的正式两阶段提交的思想。两阶段提交是达成共识的常用方式。

Paxos 算法流程中的每条消息描述如下:

  • Prepare: Proposer 生成全局唯一且递增的 Proposal ID (可使用时间戳加 Server ID),向所有 Acceptors 发送 Prepare 请求,这里无需携带提案内容,只携带 Proposal ID 即可。

  • Promise: Acceptors 收到 Prepare 请求后,做出“两个承诺,一个应答”。

    • 两个承诺:

      • 不再接受 Proposal ID 小于等于当前请求的 Prepare 请求。
      • 不再接受 Proposal ID 小于当前请求的 Propose 请求。
    • 一个应答:

      • 不违背以前作出的承诺下,回复已经 Accept 过的提案中 Proposal ID 最大的那个提案的 Value 和 Proposal ID,没有则返回空值。
  • Propose: Proposer 收到多数 Acceptors 的 Promise 应答后,从应答中选择 Proposal ID 最大的提案的 Value,作为本次要发起的提案。如果所有应答的提案 Value 均为空值,则可以自己随意决定提案 Value。然后携带当前 Proposal ID,向所有 Acceptors 发送 Propose 请求。

  • Accept: Acceptor 收到 Propose 请求后,在不违背自己之前作出的承诺下,接受并持久化当前 Proposal ID 和提案 Value。

  • Learn: Proposer 收到多数 Acceptors 的 Accept 后,决议形成,将形成的决议发送给所有 Learners。

Prepare 阶段

在准备请求中是不需要指定提议的值的,只需要携带提案编号就可以了。

下图的示例中,首先客户端 1、2 作为提议者,分别向所有接受者发送包含提案编号的准备请求:

接着,当节点 A、B 收到提案编号为 1 的准备请求,节点 C 收到提案编号为 5 的准备请求后,将进行这样的处理:

  • 由于之前没有通过任何提案,所以节点 A、B 将返回一个 “尚无提案” 的响应。也就是说节点 A 和 B 在告诉提议者,我之前没有通过任何提案呢,并承诺以后不再响应提案编号小于等于 1 的准备请求,不会通过编号小于 1 的提案。
  • 节点 C 也是如此,它将返回一个 “尚无提案”的响应,并承诺以后不再响应提案编号小于等于 5 的准备请求,不会通过编号小于 5 的提案。

另外,当节点 A、B 收到提案编号为 5 的准备请求,和节点 C 收到提案编号为 1 的准备请求的时候,将进行这样的处理过程:

  • 当节点 A、B 收到提案编号为 5 的准备请求的时候,因为提案编号 5 大于它们之前响应的准备请求的提案编号 1,而且两个节点都没有通过任何提案,所以它将返回一个 “尚无提案”的响应,并承诺以后不再响应提案编号小于等于 5 的准备请求,不会通过编号小于 5 的提案。

  • 当节点 C 收到提案编号为 1 的准备请求的时候,由于提案编号 1 小于它之前响应的准备请求的提案编号 5,所以丢弃该准备请求,不做响应。

Accept 阶段

首先客户端 1、2 在收到大多数节点的准备响应之后,会分别发送接受请求:

  • 当客户端 1 收到大多数的接受者(节点 A、B)的准备响应后,根据响应中提案编号最大的提案的值,设置接受请求中的值。因为该值在来自节点 A、B 的准备响应中都为空(也就是图 5 中的“尚无提案”),所以就把自己的提议值 3 作为提案的值,发送接受请求[1, 3]。

  • 当客户端 2 收到大多数的接受者的准备响应后(节点 A、B 和节点 C),根据响应中提案编号最大的提案的值,来设置接受请求中的值。因为该值在来自节点 A、B、C 的准备响应中都为空(也就是图 5 和图 6 中的“尚无提案”),所以就把自己的提议值 7 作为提案的值,发送接受请求[5, 7]。

当三个节点收到 2 个客户端的接受请求时,会进行这样的处理:

  • 当节点 A、B、C 收到接受请求[1, 3]的时候,由于提案的提案编号 1 小于三个节点承诺能通过的提案的最小提案编号 5,所以提案[1, 3]将被拒绝。
  • 当节点 A、B、C 收到接受请求[5, 7]的时候,由于提案的提案编号 5 不小于三个节点承诺能通过的提案的最小提案编号 5,所以就通过提案[5, 7],也就是接受了值 7,三个节点就 X 值为 7 达成了共识。

小结

Basic Paxos 是通过二阶段提交的方式来达成共识的。

除了共识,Basic Paxos 还实现了容错,在少于一半的节点出现故障时,集群也能工作。它不像分布式事务算法那样,必须要所有节点都同意后才提交操作,因为“所有节点都同意”这个原则,在出现节点故障的时候会导致整个集群不可用。也就是说,“大多数节点都同意”的原则,赋予了 Basic Paxos 容错的能力,让它能够容忍少于一半的节点的故障。

本质上而言,提案编号的大小代表着优先级,你可以这么理解,根据提案编号的大小,接受者保证三个承诺,具体来说:

  • 如果准备请求的提案编号,小于等于接受者已经响应的准备请求的提案编号,那么接受者将承诺不响应这个准备请求;
  • 如果接受请求中的提案的提案编号,小于接受者已经响应的准备请求的提案编号,那么接受者将承诺不通过这个提案;
  • 如果接受者之前有通过提案,那么接受者将承诺,会在准备请求的响应中,包含已经通过的最大编号的提案信息。

Multi Paxos 思想

兰伯特提到的 Multi-Paxos 是一种思想,不是算法。而 Multi-Paxos 算法是一个统称,它是指基于 Multi-Paxos 思想,通过多个 Basic Paxos 实例实现一系列值的共识的算法(比如 Chubby 的 Multi-Paxos 实现、Raft 算法等)。

Basic Paxos 的问题

Basic Paxos 有以下问题,导致它不能应用于实际:

  • Basic Paxos 算法只能对一个值形成决议
  • Basic Paxos 算法会消耗大量网络带宽。Basic Paxos 中,决议的形成至少需要两次网络通信,在高并发情况下可能需要更多的网络通信,极端情况下甚至可能形成活锁。如果想连续确定多个值,Basic Paxos 搞不定了。

Multi Paxos 的改进

Multi Paxos 正是为解决以上问题而提出。Multi Paxos 基于 Basic Paxos 做了两点改进:

  • 针对每一个要确定的值,运行一次 Paxos 算法实例(Instance),形成决议。每一个 Paxos 实例使用唯一的 Instance ID 标识。
  • 在所有 Proposer 中选举一个 Leader,由 Leader 唯一地提交 Proposal 给 Acceptor 进行表决。这样没有 Proposer 竞争,解决了活锁问题。在系统中仅有一个 Leader 进行 Value 提交的情况下,Prepare 阶段就可以跳过,从而将两阶段变为一阶段,提高效率。

Multi Paxos 首先需要选举 Leader,Leader 的确定也是一次决议的形成,所以可执行一次 Basic Paxos 实例来选举出一个 Leader。选出 Leader 之后只能由 Leader 提交 Proposal,在 Leader 宕机之后服务临时不可用,需要重新选举 Leader 继续服务。在系统中仅有一个 Leader 进行 Proposal 提交的情况下,Prepare 阶段可以跳过。

Multi Paxos 通过改变 Prepare 阶段的作用范围至后面 Leader 提交的所有实例,从而使得 Leader 的连续提交只需要执行一次 Prepare 阶段,后续只需要执行 Accept 阶段,将两阶段变为一阶段,提高了效率。为了区分连续提交的多个实例,每个实例使用一个 Instance ID 标识,Instance ID 由 Leader 本地递增生成即可。

Multi Paxos 允许有多个自认为是 Leader 的节点并发提交 Proposal 而不影响其安全性,这样的场景即退化为 Basic Paxos。

Chubby 和 Boxwood 均使用 Multi Paxos。ZooKeeper 使用的 Zab 也是 Multi Paxos 的变形。

参考资料

Java 并发和容器

同步容器

同步容器简介

在 Java 中,同步容器主要包括 2 类:

  • VectorStackHashtable
    • Vector - Vector 实现了 List 接口。Vector 实际上就是一个数组,和 ArrayList 类似。但是 Vector 中的方法都是 synchronized 方法,即进行了同步措施。
    • Stack - Stack 也是一个同步容器,它的方法也用 synchronized 进行了同步,它实际上是继承于 Vector 类。
    • Hashtable- Hashtable 实现了 Map 接口,它和 HashMap 很相似,但是 Hashtable 进行了同步处理,而 HashMap 没有。
  • Collections 类中提供的静态工厂方法创建的类(由 Collections.synchronizedXXX 等方法)

同步容器的问题

同步容器的同步原理就是在其 getsetsize 等主要方法上用 synchronized 修饰。 synchronized 可以保证在同一个时刻,只有一个线程可以执行某个方法或者某个代码块

想详细了解 synchronized 用法和原理可以参考:Java 并发核心机制

性能问题

synchronized 的互斥同步会产生阻塞和唤醒线程的开销。显然,这种方式比没有使用 synchronized 的容器性能要差很多。

注:尤其是在 Java 1.6 没有对 synchronized 进行优化前,阻塞开销很高。

安全问题

同步容器真的绝对安全吗?

其实也未必。在做复合操作(非原子操作)时,仍然需要加锁来保护。常见复合操作如下:

  • 迭代:反复访问元素,直到遍历完全部元素;
  • 跳转:根据指定顺序寻找当前元素的下一个(下 n 个)元素;
  • 条件运算:例如若没有则添加等;

❌ 不安全的示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public class VectorDemo {

static Vector<Integer> vector = new Vector<>();

public static void main(String[] args) {
while (true) {
vector.clear();

for (int i = 0; i < 10; i++) {
vector.add(i);
}

Thread thread1 = new Thread() {
@Override
public void run() {
for (int i = 0; i < vector.size(); i++) {
vector.remove(i);
}
}
};

Thread thread2 = new Thread() {
@Override
public void run() {
for (int i = 0; i < vector.size(); i++) {
vector.get(i);
}
}
};

thread1.start();
thread2.start();

while (Thread.activeCount() > 10) {
System.out.println("同时存在 10 个以上线程,退出");
return;
}
}
}

}

以上程序执行时可能会出现数组越界错误。

Vector 是线程安全的,那为什么还会报这个错?

这是因为,对于 Vector,虽然能保证每一个时刻只能有一个线程访问它,但是不排除这种可能:

当某个线程在某个时刻执行这句时:

1
2
for(int i=0;i<vector.size();i++)
vector.get(i);

假若此时 vector 的 size 方法返回的是 10,i 的值为 9

然后另外一个线程执行了这句:

1
2
for(int i=0;i<vector.size();i++)
vector.remove(i);

将下标为 9 的元素删除了。

那么通过 get 方法访问下标为 9 的元素肯定就会出问题了。

✔️️️ 安全示例

因此为了保证线程安全,必须在方法调用端做额外的同步措施,如下面所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class VectorDemo2 {

static Vector<Integer> vector = new Vector<Integer>();

public static void main(String[] args) {
while (true) {
for (int i = 0; i < 10; i++) {
vector.add(i);
}

Thread thread1 = new Thread() {
@Override
public void run() {
synchronized (VectorDemo2.class) { //进行额外的同步
for (int i = 0; i < vector.size(); i++) {
vector.remove(i);
}
}
}
};

Thread thread2 = new Thread() {
@Override
public void run() {
synchronized (VectorDemo2.class) {
for (int i = 0; i < vector.size(); i++) {
vector.get(i);
}
}
}
};

thread1.start();
thread2.start();

while (Thread.activeCount() > 10) {
System.out.println("同时存在 10 个以上线程,退出");
return;
}
}
}

}

ConcurrentModificationException 异常

在对 Vector 等容器并发地进行迭代修改时,会报 ConcurrentModificationException 异常,关于这个异常将会在后续文章中讲述。

但是在并发容器中不会出现这个问题。

并发容器简介

同步容器将所有对容器状态的访问都串行化,以保证线程安全性,这种策略会严重降低并发性。

Java 1.5 后提供了多种并发容器,使用并发容器来替代同步容器,可以极大地提高伸缩性并降低风险

J.U.C 包中提供了几个非常有用的并发容器作为线程安全的容器:

并发容器 对应的普通容器 描述
ConcurrentHashMap HashMap Java 1.8 之前采用分段锁机制细化锁粒度,降低阻塞,从而提高并发性;Java 1.8 之后基于 CAS 实现。
ConcurrentSkipListMap SortedMap 基于跳表实现的
CopyOnWriteArrayList ArrayList
CopyOnWriteArraySet Set 基于 CopyOnWriteArrayList 实现。
ConcurrentSkipListSet SortedSet 基于 ConcurrentSkipListMap 实现。
ConcurrentLinkedQueue Queue 线程安全的无界队列。底层采用单链表。支持 FIFO。
ConcurrentLinkedDeque Deque 线程安全的无界双端队列。底层采用双向链表。支持 FIFO 和 FILO。
ArrayBlockingQueue Queue 数组实现的阻塞队列。
LinkedBlockingQueue Queue 链表实现的阻塞队列。
LinkedBlockingDeque Deque 双向链表实现的双端阻塞队列。

J.U.C 包中提供的并发容器命名一般分为三类:

  • Concurrent
    • 这类型的锁竞争相对于 CopyOnWrite 要高一些,但写操作代价要小一些。
    • 此外,Concurrent 往往提供了较低的遍历一致性,即:当利用迭代器遍历时,如果容器发生修改,迭代器仍然可以继续进行遍历。代价就是,在获取容器大小 size() ,容器是否为空等方法,不一定完全精确,但这是为了获取并发吞吐量的设计取舍,可以理解。与之相比,如果是使用同步容器,就会出现 fail-fast 问题,即:检测到容器在遍历过程中发生了修改,则抛出 ConcurrentModificationException,不再继续遍历。
  • CopyOnWrite - 一个线程写,多个线程读。读操作时不加锁,写操作时通过在副本上加锁保证并发安全,空间开销较大。
  • Blocking - 内部实现一般是基于锁,提供阻塞队列的能力。

:x: 错误示例,产生 ConcurrentModificationException 异常:

1
2
3
public void removeKeys(Map<String, Object> map, final String... keys) {
map.keySet().removeIf(key -> ArrayUtil.contains(keys, key));
}

:x: 错误示例,产生 ConcurrentModificationException 异常:

1
2
3
4
5
6
public static <K, V> Map<K, V> removeKeys(Map<String, Object> map, final String... keys) {
for (K key : keys) {
map.remove(key);
}
return map;
}

并发场景下的 Map

如果对数据有强一致要求,则需使用 Hashtable;在大部分场景通常都是弱一致性的情况下,使用 ConcurrentHashMap 即可;如果数据量在千万级别,且存在大量增删改操作,则可以考虑使用 ConcurrentSkipListMap

并发场景下的 List

读多写少用 CopyOnWriteArrayList

写多读少用 ConcurrentLinkedQueue ,但由于是无界的,要有容量限制,避免无限膨胀,导致内存溢出。

Map

Map 接口的两个实现是 ConcurrentHashMap 和 ConcurrentSkipListMap,它们从应用的角度来看,主要区别在于ConcurrentHashMap 的 key 是无序的,而 ConcurrentSkipListMap 的 key 是有序的。所以如果你需要保证 key 的顺序,就只能使用 ConcurrentSkipListMap。

使用 ConcurrentHashMap 和 ConcurrentSkipListMap 需要注意的地方是,它们的 key 和 value 都不能为空,否则会抛出NullPointerException这个运行时异常。

ConcurrentHashMap

ConcurrentHashMap 是线程安全的 HashMap ,用于替代 Hashtable

ConcurrentHashMap 的特性

ConcurrentHashMap 实现了 ConcurrentMap 接口,而 ConcurrentMap 接口扩展了 Map 接口。

1
2
3
4
public class ConcurrentHashMap<K,V> extends AbstractMap<K,V>
implements ConcurrentMap<K,V>, Serializable {
// ...
}

ConcurrentHashMap 的实现包含了 HashMap 所有的基本特性,如:数据结构、读写策略等。

ConcurrentHashMap 没有实现对 Map 加锁以提供独占访问。因此无法通过在客户端加锁的方式来创建新的原子操作。但是,一些常见的复合操作,如:“若没有则添加”、“若相等则移除”、“若相等则替换”,都已经实现为原子操作,并且是围绕 ConcurrentMap 的扩展接口而实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public interface ConcurrentMap<K, V> extends Map<K, V> {

// 仅当 K 没有相应的映射值才插入
V putIfAbsent(K key, V value);

// 仅当 K 被映射到 V 时才移除
boolean remove(Object key, Object value);

// 仅当 K 被映射到 oldValue 时才替换为 newValue
boolean replace(K key, V oldValue, V newValue);

// 仅当 K 被映射到某个值时才替换为 newValue
V replace(K key, V value);
}

不同于 HashtableConcurrentHashMap 提供的迭代器不会抛出 ConcurrentModificationException,因此不需要在迭代过程中对容器加锁。

:bell: 注意:一些需要对整个 Map 进行计算的方法,如 sizeisEmpty ,由于返回的结果在计算时可能已经过期,所以并非实时的精确值。这是一种策略上的权衡,在并发环境下,这类方法由于总在不断变化,所以获取其实时精确值的意义不大。ConcurrentHashMap 弱化这类方法,以换取更重要操作(如:getputcontainesKeyremove 等)的性能。

ConcurrentHashMap 的用法

示例:不会出现 ConcurrentModificationException

ConcurrentHashMap 的基本操作与 HashMap 的用法基本一样。不同于 HashMapHashtableConcurrentHashMap 提供的迭代器不会抛出 ConcurrentModificationException,因此不需要在迭代过程中对容器加锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class ConcurrentHashMapDemo {

public static void main(String[] args) throws InterruptedException {

// HashMap 在并发迭代访问时会抛出 ConcurrentModificationException 异常
// Map<Integer, Character> map = new HashMap<>();
Map<Integer, Character> map = new ConcurrentHashMap<>();

Thread wthread = new Thread(() -> {
System.out.println("写操作线程开始执行");
for (int i = 0; i < 26; i++) {
map.put(i, (char) ('a' + i));
}
});
Thread rthread = new Thread(() -> {
System.out.println("读操作线程开始执行");
for (Integer key : map.keySet()) {
System.out.println(key + " - " + map.get(key));
}
});
wthread.start();
rthread.start();
Thread.sleep(1000);
}
}

ConcurrentHashMap 的原理

ConcurrentHashMap 一直在演进,尤其在 Java 1.7 和 Java 1.8,其数据结构和并发机制有很大的差异。

  • Java 1.7
    • 数据结构:数组+单链表
    • 并发机制:采用分段锁机制细化锁粒度,降低阻塞,从而提高并发性。
  • Java 1.8
    • 数据结构:数组+单链表+红黑树
    • 并发机制:取消分段锁,之后基于 CAS + synchronized 实现。
Java 1.7 的实现

分段锁,是将内部进行分段(Segment),里面是 HashEntry 数组,和 HashMap 类似,哈希相同的条目也是以链表形式存放。
HashEntry 内部使用 volatilevalue 字段来保证可见性,也利用了不可变对象的机制,以改进利用 Unsafe 提供的底层能力,比如 volatile access,去直接完成部分操作,以最优化性能,毕竟 Unsafe 中的很多操作都是 JVM intrinsic 优化过的。

img

在进行并发写操作时,ConcurrentHashMap 会获取可重入锁(ReentrantLock),以保证数据一致性。所以,在并发修改期间,相应 Segment 是被锁定的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class ConcurrentHashMap<K, V> extends AbstractMap<K, V>
implements ConcurrentMap<K, V>, Serializable {

// 将整个hashmap分成几个小的map,每个segment都是一个锁;与hashtable相比,这么设计的目的是对于put, remove等操作,可以减少并发冲突,对
// 不属于同一个片段的节点可以并发操作,大大提高了性能
final Segment<K,V>[] segments;

// 本质上Segment类就是一个小的hashmap,里面table数组存储了各个节点的数据,继承了ReentrantLock, 可以作为互拆锁使用
static final class Segment<K,V> extends ReentrantLock implements Serializable {
transient volatile HashEntry<K,V>[] table;
transient int count;
}

// 基本节点,存储Key, Value值
static final class HashEntry<K,V> {
final int hash;
final K key;
volatile V value;
volatile HashEntry<K,V> next;
}
}
Java 1.8 的实现
  • 数据结构改进:与 HashMap 一样,将原先 数组+单链表 的数据结构,变更为 数组+单链表+红黑树 的结构。当出现哈希冲突时,数据会存入数组指定桶的单链表,当链表长度达到 8,则将其转换为红黑树结构,这样其查询的时间复杂度可以降低到 $$O(logN)$$,以改进性能。
  • 并发机制改进:
    • 取消 segments 字段,直接采用 transient volatile HashEntry<K,V>[] table 保存数据,采用 table 数组元素作为锁,从而实现了对每一行数据进行加锁,进一步减少并发冲突的概率
    • 使用 CAS + sychronized 操作,在特定场景进行无锁并发操作。使用 Unsafe、LongAdder 之类底层手段,进行极端情况的优化。现代 JDK 中,synchronized 已经被不断优化,可以不再过分担心性能差异,另外,相比于 ReentrantLock,它可以减少内存消耗,这是个非常大的优势。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
// 如果table为空,初始化;否则,根据hash值计算得到数组索引i,如果tab[i]为空,直接新建节点Node即可。注:tab[i]实质为链表或者红黑树的首节点。
if (tab == null || (n = tab.length) == 0)
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
// 如果tab[i]不为空并且hash值为MOVED,说明该链表正在进行transfer操作,返回扩容完成后的table。
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
// 针对首个节点进行加锁操作,而不是segment,进一步减少线程冲突
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
// 如果在链表中找到值为key的节点e,直接设置e.val = value即可。
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
// 如果没有找到值为key的节点,直接新建Node并加入链表即可。
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
// 如果首节点为TreeBin类型,说明为红黑树结构,执行putTreeVal操作。
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
// 如果节点数>=8,那么转换链表结构为红黑树结构。
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
// 计数增加1,有可能触发transfer操作(扩容)。
addCount(1L, binCount);
return null;
}

ConcurrentHashMap 的实战

示例摘自:《Java 业务开发常见错误 100 例》

ConcurrentHashMap 错误示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
//线程个数
private static int THREAD_COUNT = 10;
//总元素数量
private static int ITEM_COUNT = 1000;

public static void main(String[] args) throws InterruptedException {
ConcurrentHashMap<String, Long> concurrentHashMap = getData(ITEM_COUNT - 100);
//初始900个元素
System.out.println("init size:" + concurrentHashMap.size());
ForkJoinPool forkJoinPool = new ForkJoinPool(THREAD_COUNT);
//使用线程池并发处理逻辑
forkJoinPool.execute(() -> IntStream.rangeClosed(1, 10).parallel().forEach(i -> {
//查询还需要补充多少个元素
int gap = ITEM_COUNT - concurrentHashMap.size();
System.out.println("gap size:" + gap);
//补充元素
concurrentHashMap.putAll(getData(gap));
}));
//等待所有任务完成
forkJoinPool.shutdown();
forkJoinPool.awaitTermination(1, TimeUnit.HOURS);
//最后元素个数会是1000吗?
System.out.println("finish size:" + concurrentHashMap.size());
}

private static ConcurrentHashMap<String, Long> getData(int count) {
return LongStream.rangeClosed(1, count)
.boxed()
.collect(
Collectors.toConcurrentMap(
i -> UUID.randomUUID().toString(),
i -> i,
(o1, o2) -> o1,
ConcurrentHashMap::new));
}

初始大小 900 符合预期,还需要填充 100 个元素。

预期结果为 1000 个元素,实际大于 1000 个元素。

【分析】

ConcurrentHashMap 对外提供的方法或能力的限制:

  • 使用了 ConcurrentHashMap,不代表对它的多个操作之间的状态是一致的,是没有其他线程在操作它的,如果需要确保需要手动加锁。
  • 诸如 size、isEmpty 和 containsValue 等聚合方法,在并发情况下可能会反映 ConcurrentHashMap 的中间状态。因此在并发情况下,这些方法的返回值只能用作参考,而不能用于流程控制。显然,利用 size 方法计算差异值,是一个流程控制。
  • 诸如 putAll 这样的聚合方法也不能确保原子性,在 putAll 的过程中去获取数据可能会获取到部分数据。
ConcurrentHashMap 错误示例修正 1.0 版

通过 synchronized 加锁,当然可以保证数据一致性,但是牺牲了 ConcurrentHashMap 的性能,没哟真正发挥出 ConcurrentHashMap 的特性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38

//线程个数
private static int THREAD_COUNT = 10;
//总元素数量
private static int ITEM_COUNT = 1000;

public static void main(String[] args) throws InterruptedException {
ConcurrentHashMap<String, Long> concurrentHashMap = getData(ITEM_COUNT - 100);
//初始900个元素
System.out.println("init size:" + concurrentHashMap.size());
ForkJoinPool forkJoinPool = new ForkJoinPool(THREAD_COUNT);
//使用线程池并发处理逻辑
forkJoinPool.execute(() -> IntStream.rangeClosed(1, 10).parallel().forEach(i -> {
//查询还需要补充多少个元素
synchronized (concurrentHashMap) {
int gap = ITEM_COUNT - concurrentHashMap.size();
System.out.println("gap size:" + gap);
//补充元素
concurrentHashMap.putAll(getData(gap));
}
}));
//等待所有任务完成
forkJoinPool.shutdown();
forkJoinPool.awaitTermination(1, TimeUnit.HOURS);
//最后元素个数会是1000吗?
System.out.println("finish size:" + concurrentHashMap.size());
}

private static ConcurrentHashMap<String, Long> getData(int count) {
return LongStream.rangeClosed(1, count)
.boxed()
.collect(
Collectors.toConcurrentMap(
i -> UUID.randomUUID().toString(),
i -> i,
(o1, o2) -> o1,
ConcurrentHashMap::new));
}
ConcurrentHashMap 错误示例修正 2.0 版
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63

//循环次数
private static int LOOP_COUNT = 10000000;
//线程个数
private static int THREAD_COUNT = 10;
//总元素数量
private static int ITEM_COUNT = 1000;

public static void main(String[] args) throws InterruptedException {
StopWatch stopWatch = new StopWatch();
stopWatch.start("normaluse");
Map<String, Long> normaluse = normaluse();
stopWatch.stop();
Assert.isTrue(normaluse.size() == ITEM_COUNT, "normaluse size error");
Assert.isTrue(normaluse.values().stream()
.mapToLong(aLong -> aLong).reduce(0, Long::sum) == LOOP_COUNT
, "normaluse count error");
stopWatch.start("gooduse");
Map<String, Long> gooduse = gooduse();
stopWatch.stop();
Assert.isTrue(gooduse.size() == ITEM_COUNT, "gooduse size error");
Assert.isTrue(gooduse.values().stream()
.mapToLong(l -> l)
.reduce(0, Long::sum) == LOOP_COUNT
, "gooduse count error");
System.out.println(stopWatch.prettyPrint());
}

private static Map<String, Long> normaluse() throws InterruptedException {
ConcurrentHashMap<String, Long> freqs = new ConcurrentHashMap<>(ITEM_COUNT);
ForkJoinPool forkJoinPool = new ForkJoinPool(THREAD_COUNT);
forkJoinPool.execute(() -> IntStream.rangeClosed(1, LOOP_COUNT).parallel().forEach(i -> {
String key = "item" + ThreadLocalRandom.current().nextInt(ITEM_COUNT);
synchronized (freqs) {
if (freqs.containsKey(key)) {
freqs.put(key, freqs.get(key) + 1);
} else {
freqs.put(key, 1L);
}
}
}
));
forkJoinPool.shutdown();
forkJoinPool.awaitTermination(1, TimeUnit.HOURS);
return freqs;
}

private static Map<String, Long> gooduse() throws InterruptedException {
ConcurrentHashMap<String, LongAdder> freqs = new ConcurrentHashMap<>(ITEM_COUNT);
ForkJoinPool forkJoinPool = new ForkJoinPool(THREAD_COUNT);
forkJoinPool.execute(() -> IntStream.rangeClosed(1, LOOP_COUNT).parallel().forEach(i -> {
String key = "item" + ThreadLocalRandom.current().nextInt(ITEM_COUNT);
freqs.computeIfAbsent(key, k -> new LongAdder()).increment();
}
));
forkJoinPool.shutdown();
forkJoinPool.awaitTermination(1, TimeUnit.HOURS);
return freqs.entrySet().stream()
.collect(Collectors.toMap(
e -> e.getKey(),
e -> e.getValue().longValue())
);
}

List

CopyOnWriteArrayList

CopyOnWriteArrayList 是线程安全的 ArrayListCopyOnWrite 字面意思为写的时候会将共享变量新复制一份出来。复制的好处在于读操作是无锁的(也就是无阻塞)。

CopyOnWriteArrayList 仅适用于写操作非常少的场景,而且能够容忍读写的短暂不一致。如果读写比例均衡或者有大量写操作的话,使用 CopyOnWriteArrayList 的性能会非常糟糕。

CopyOnWriteArrayList 原理

CopyOnWriteArrayList 内部维护了一个数组,成员变量 array 就指向这个内部数组,所有的读操作都是基于 array 进行的,如下图所示,迭代器 Iterator 遍历的就是 array 数组。

img

  • lock - 执行写时复制操作,需要使用可重入锁加锁
  • array - 对象数组,用于存放元素
1
2
3
4
5
/** The lock protecting all mutators */
final transient ReentrantLock lock = new ReentrantLock();

/** The array, accessed only via getArray/setArray. */
private transient volatile Object[] array;

img

(1)读操作

CopyOnWriteAarrayList 中,读操作不同步,因为它们在内部数组的快照上工作,所以多个迭代器可以同时遍历而不会相互阻塞(图 1,2,4)。

CopyOnWriteArrayList 的读操作是不用加锁的,性能很高。

1
2
3
4
5
6
public E get(int index) {
return get(getArray(), index);
}
private E get(Object[] a, int index) {
return (E) a[index];
}

(2)写操作

所有的写操作都是同步的。他们在备份数组(图 3)的副本上工作。写操作完成后,后备阵列将被替换为复制的阵列,并释放锁定。支持数组变得易变,所以替换数组的调用是原子(图 5)。

写操作后创建的迭代器将能够看到修改的结构(图 6,7)。

写时复制集合返回的迭代器不会抛出 ConcurrentModificationException,因为它们在数组的快照上工作,并且无论后续的修改(2,4)如何,都会像迭代器创建时那样完全返回元素。

添加操作 - 添加的逻辑很简单,先将原容器 copy 一份,然后在新副本上执行写操作,之后再切换引用。当然此过程是要加锁的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public boolean add(E e) {
//ReentrantLock加锁,保证线程安全
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
//拷贝原容器,长度为原容器长度加一
Object[] newElements = Arrays.copyOf(elements, len + 1);
//在新副本上执行添加操作
newElements[len] = e;
//将原容器引用指向新副本
setArray(newElements);
return true;
} finally {
//解锁
lock.unlock();
}
}

删除操作 - 删除操作同理,将除要删除元素之外的其他元素拷贝到新副本中,然后切换引用,将原容器引用指向新副本。同属写操作,需要加锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public E remove(int index) {
//加锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
E oldValue = get(elements, index);
int numMoved = len - index - 1;
if (numMoved == 0)
//如果要删除的是列表末端数据,拷贝前len-1个数据到新副本上,再切换引用
setArray(Arrays.copyOf(elements, len - 1));
else {
//否则,将除要删除元素之外的其他元素拷贝到新副本中,并切换引用
Object[] newElements = new Object[len - 1];
System.arraycopy(elements, 0, newElements, 0, index);
System.arraycopy(elements, index + 1, newElements, index,
numMoved);
setArray(newElements);
}
return oldValue;
} finally {
//解锁
lock.unlock();
}
}

CopyOnWriteArrayList 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
public class CopyOnWriteArrayListDemo {

static class ReadTask implements Runnable {

List<String> list;

ReadTask(List<String> list) {
this.list = list;
}

public void run() {
for (String str : list) {
System.out.println(str);
}
}
}

static class WriteTask implements Runnable {

List<String> list;
int index;

WriteTask(List<String> list, int index) {
this.list = list;
this.index = index;
}

public void run() {
list.remove(index);
list.add(index, "write_" + index);
}
}

public void run() {
final int NUM = 10;
// ArrayList 在并发迭代访问时会抛出 ConcurrentModificationException 异常
// List<String> list = new ArrayList<>();
CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
for (int i = 0; i < NUM; i++) {
list.add("main_" + i);
}
ExecutorService executorService = Executors.newFixedThreadPool(NUM);
for (int i = 0; i < NUM; i++) {
executorService.execute(new ReadTask(list));
executorService.execute(new WriteTask(list, i));
}
executorService.shutdown();
}

public static void main(String[] args) {
new CopyOnWriteArrayListDemo().run();
}
}

CopyOnWriteArrayList 实战

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
@Slf4j
public class WrongCopyOnWriteList {

public static void main(String[] args) {
testRead();
testWrite();
}

public static Map testWrite() {
List<Integer> copyOnWriteArrayList = new CopyOnWriteArrayList<>();
List<Integer> synchronizedList = Collections.synchronizedList(new ArrayList<>());
StopWatch stopWatch = new StopWatch();
int loopCount = 100000;
stopWatch.start("Write:copyOnWriteArrayList");
IntStream.rangeClosed(1, loopCount)
.parallel()
.forEach(__ -> copyOnWriteArrayList.add(ThreadLocalRandom.current().nextInt(loopCount)));
stopWatch.stop();
stopWatch.start("Write:synchronizedList");
IntStream.rangeClosed(1, loopCount)
.parallel()
.forEach(__ -> synchronizedList.add(ThreadLocalRandom.current().nextInt(loopCount)));
stopWatch.stop();
log.info(stopWatch.prettyPrint());
Map result = new HashMap();
result.put("copyOnWriteArrayList", copyOnWriteArrayList.size());
result.put("synchronizedList", synchronizedList.size());
return result;
}

private static void addAll(List<Integer> list) {
list.addAll(IntStream.rangeClosed(1, 1000000).boxed().collect(Collectors.toList()));
}

public static Map testRead() {
List<Integer> copyOnWriteArrayList = new CopyOnWriteArrayList<>();
List<Integer> synchronizedList = Collections.synchronizedList(new ArrayList<>());
addAll(copyOnWriteArrayList);
addAll(synchronizedList);
StopWatch stopWatch = new StopWatch();
int loopCount = 1000000;
int count = copyOnWriteArrayList.size();
stopWatch.start("Read:copyOnWriteArrayList");
IntStream.rangeClosed(1, loopCount)
.parallel()
.forEach(__ -> copyOnWriteArrayList.get(ThreadLocalRandom.current().nextInt(count)));
stopWatch.stop();
stopWatch.start("Read:synchronizedList");
IntStream.range(0, loopCount)
.parallel()
.forEach(__ -> synchronizedList.get(ThreadLocalRandom.current().nextInt(count)));
stopWatch.stop();
log.info(stopWatch.prettyPrint());
Map result = new HashMap();
result.put("copyOnWriteArrayList", copyOnWriteArrayList.size());
result.put("synchronizedList", synchronizedList.size());
return result;
}

}

读性能差不多是写性能的一百倍。

Set

Set 接口的两个实现是 CopyOnWriteArraySet 和 ConcurrentSkipListSet,使用场景可以参考前面讲述的 CopyOnWriteArrayList 和 ConcurrentSkipListMap,它们的原理都是一样的。

Queue

Java 并发包里面 Queue 这类并发容器是最复杂的,你可以从以下两个维度来分类。一个维度是阻塞与非阻塞,所谓阻塞指的是:当队列已满时,入队操作阻塞;当队列已空时,出队操作阻塞。另一个维度是单端与双端,单端指的是只能队尾入队,队首出队;而双端指的是队首队尾皆可入队出队。Java 并发包里阻塞队列都用 Blocking 关键字标识,单端队列使用 Queue 标识,双端队列使用 Deque 标识

BlockingQueue

BlockingQueue 顾名思义,是一个阻塞队列。**BlockingQueue 基本都是基于锁实现。在 BlockingQueue 中,当队列已满时,入队操作阻塞;当队列已空时,出队操作阻塞**。

BlockingQueue 接口定义如下:

1
public interface BlockingQueue<E> extends Queue<E> {}

核心 API:

1
2
3
4
// 获取并移除队列头结点,如果必要,其会等待直到队列出现元素
E take() throws InterruptedException;
// 插入元素,如果队列已满,则等待直到队列出现空闲空间
void put(E e) throws InterruptedException;

BlockingQueue 对插入操作、移除操作、获取元素操作提供了四种不同的方法用于不同的场景中使用:

  • 抛出异常;
  • 返回特殊值(nulltrue/false,取决于具体的操作);
  • 阻塞等待此操作,直到这个操作成功;
  • 阻塞等待此操作,直到成功或者超时指定时间。

总结如下:

Throws exception Special value Blocks Times out
Insert add(e) offer(e) put(e) offer(e, time, unit)
Remove remove() poll() take() poll(time, unit)
Examine element() peek() not applicable not applicable

BlockingQueue 的各个实现类都遵循了这些规则。

BlockingQueue 不接受 null 值元素。

JDK 提供了以下阻塞队列:

  • ArrayBlockingQueue - 一个由数组结构组成的有界阻塞队列
  • LinkedBlockingQueue - 一个由链表结构组成的有界阻塞队列
  • PriorityBlockingQueue - 一个支持优先级排序的无界阻塞队列
  • SynchronousQueue - 一个不存储元素的阻塞队列
  • DelayQueue - 一个使用优先级队列实现的无界阻塞队列。
  • LinkedTransferQueue - 一个由链表结构组成的无界阻塞队列

BlockingQueue 基本都是基于锁实现。

PriorityBlockingQueue 类

PriorityBlockingQueue 类定义如下:

1
2
public class PriorityBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {}

PriorityBlockingQueue 要点

  • PriorityBlockingQueue 可以视为 PriorityQueue 的线程安全版本。
  • PriorityBlockingQueue 实现了 BlockingQueue,也是一个阻塞队列。
  • PriorityBlockingQueue 实现了 Serializable,支持序列化。
  • PriorityBlockingQueue 不接受 null 值元素。
  • PriorityBlockingQueue 的插入操作 put 方法不会 block,因为它是无界队列(take 方法在队列为空的时候会阻塞)。

PriorityBlockingQueue 原理

PriorityBlockingQueue 有两个重要成员:

1
2
private transient Object[] queue;
private final ReentrantLock lock;
  • queue 是一个 Object 数组,用于保存 PriorityBlockingQueue 的元素。
  • 而可重入锁 lock 则用于在执行插入、删除操作时,保证这个方法在当前线程释放锁之前,其他线程不能访问。

PriorityBlockingQueue 的容量虽然有初始化大小,但是不限制大小,如果当前容量已满,插入新元素时会自动扩容。

ArrayBlockingQueue 类

ArrayBlockingQueue 是由数组结构组成的有界阻塞队列

ArrayBlockingQueue 要点

ArrayBlockingQueue 类定义如下:

1
2
3
4
5
6
7
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
// 数组的大小就决定了队列的边界,所以初始化时必须指定容量
public ArrayBlockingQueue(int capacity) { //... }
public ArrayBlockingQueue(int capacity, boolean fair) { //... }
public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) { //... }
}

说明:

  • ArrayBlockingQueue 实现了 BlockingQueue,也是一个阻塞队列。
  • ArrayBlockingQueue 实现了 Serializable,支持序列化。
  • ArrayBlockingQueue 是基于数组实现的有界阻塞队列。所以初始化时必须指定容量。

ArrayBlockingQueue 原理

ArrayBlockingQueue 的重要成员如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
// 用于存放元素的数组
final Object[] items;
// 下一次读取操作的位置
int takeIndex;
// 下一次写入操作的位置
int putIndex;
// 队列中的元素数量
int count;

// 以下几个就是控制并发用的同步器
final ReentrantLock lock;
private final Condition notEmpty;
private final Condition notFull;

ArrayBlockingQueue 内部以 final 的数组保存数据,数组的大小就决定了队列的边界。

ArrayBlockingQueue 实现并发同步的原理就是,读操作和写操作都需要获取到 AQS 独占锁才能进行操作。

  • 如果队列为空,这个时候读操作的线程进入到读线程队列排队,等待写线程写入新的元素,然后唤醒读线程队列的第一个等待线程。
  • 如果队列已满,这个时候写操作的线程进入到写线程队列排队,等待读线程将队列元素移除,然后唤醒写线程队列的第一个等待线程。

对于 ArrayBlockingQueue,我们可以在构造的时候指定以下三个参数:

  • 队列容量,其限制了队列中最多允许的元素个数;
  • 指定独占锁是公平锁还是非公平锁。非公平锁的吞吐量比较高,公平锁可以保证每次都是等待最久的线程获取到锁;
  • 可以指定用一个集合来初始化,将此集合中的元素在构造方法期间就先添加到队列中。

LinkedBlockingQueue 类

LinkedBlockingQueue 是由链表结构组成的有界阻塞队列。容易被误解为无边界,但其实其行为和内部代码都是基于有界的逻辑实现的,只不过如果我们没有在创建队列时就指定容量,那么其容量限制就自动被设置为 Integer.MAX_VALUE,成为了无界队列。

LinkedBlockingQueue 要点

LinkedBlockingQueue 类定义如下:

1
2
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {}
  • LinkedBlockingQueue 实现了 BlockingQueue,也是一个阻塞队列。
  • LinkedBlockingQueue 实现了 Serializable,支持序列化。
  • LinkedBlockingQueue 是基于单链表实现的阻塞队列,可以当做无界队列也可以当做有界队列来使用。
  • LinkedBlockingQueue 中元素按照插入顺序保存(FIFO)。

LinkedBlockingQueue 原理

LinkedBlockingQueue 中的重要数据结构:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 队列容量
private final int capacity;
// 队列中的元素数量
private final AtomicInteger count = new AtomicInteger(0);
// 队头
private transient Node<E> head;
// 队尾
private transient Node<E> last;

// take, poll, peek 等读操作的方法需要获取到这个锁
private final ReentrantLock takeLock = new ReentrantLock();
// 如果读操作的时候队列是空的,那么等待 notEmpty 条件
private final Condition notEmpty = takeLock.newCondition();
// put, offer 等写操作的方法需要获取到这个锁
private final ReentrantLock putLock = new ReentrantLock();
// 如果写操作的时候队列是满的,那么等待 notFull 条件
private final Condition notFull = putLock.newCondition();

这里用了两对 LockCondition,简单介绍如下:

  • takeLocknotEmpty 搭配:如果要获取(take)一个元素,需要获取 takeLock 锁,但是获取了锁还不够,如果队列此时为空,还需要队列不为空(notEmpty)这个条件(Condition)。
  • putLock 需要和 notFull 搭配:如果要插入(put)一个元素,需要获取 putLock 锁,但是获取了锁还不够,如果队列此时已满,还需要队列不是满的(notFull)这个条件(Condition)。

SynchronousQueue 类

SynchronousQueue 是不存储元素的阻塞队列。每个删除操作都要等待插入操作,反之每个插入操作也都要等待删除动作。那么这个队列的容量是多少呢?是 1 吗?其实不是的,其内部容量是 0。

SynchronousQueue 定义如下:

1
2
public class SynchronousQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {}

SynchronousQueue 这个类,在线程池的实现类 ScheduledThreadPoolExecutor 中得到了应用。

SynchronousQueue 的队列其实是虚的,即队列容量为 0。数据必须从某个写线程交给某个读线程,而不是写到某个队列中等待被消费。

SynchronousQueue 中不能使用 peek 方法(在这里这个方法直接返回 null),peek 方法的语义是只读取不移除,显然,这个方法的语义是不符合 SynchronousQueue 的特征的。

SynchronousQueue 也不能被迭代,因为根本就没有元素可以拿来迭代的。

虽然 SynchronousQueue 间接地实现了 Collection 接口,但是如果你将其当做 Collection 来用的话,那么集合是空的。

当然,SynchronousQueue 也不允许传递 null 值的(并发包中的容器类好像都不支持插入 null 值,因为 null 值往往用作其他用途,比如用于方法的返回值代表操作失败)。

ConcurrentLinkedDeque 类

Deque 的侧重点是支持对队列头尾都进行插入和删除,所以提供了特定的方法,如:

  • 尾部插入时需要的 addLast(e)offerLast(e)
  • 尾部删除所需要的 removeLast()pollLast()

Queue 的并发应用

Queue 被广泛使用在生产者 - 消费者场景。而在并发场景,利用 BlockingQueue 的阻塞机制,可以减少很多并发协调工作。

这么多并发 Queue 的实现,如何选择呢?

  • 考虑应用场景中对队列边界的要求。ArrayBlockingQueue 是有明确的容量限制的,而 LinkedBlockingQueue 则取决于我们是否在创建时指定,SynchronousQueue 则干脆不能缓存任何元素。
  • 从空间利用角度,数组结构的 ArrayBlockingQueue 要比 LinkedBlockingQueue 紧凑,因为其不需要创建所谓节点,但是其初始分配阶段就需要一段连续的空间,所以初始内存需求更大。
  • 通用场景中,LinkedBlockingQueue 的吞吐量一般优于 ArrayBlockingQueue,因为它实现了更加细粒度的锁操作。
  • ArrayBlockingQueue 实现比较简单,性能更好预测,属于表现稳定的“选手”。
  • 可能令人意外的是,很多时候 SynchronousQueue 的性能表现,往往大大超过其他实现,尤其是在队列元素较小的场景。

参考资料

深入剖析共识性算法 Raft

img

Raft 简介

Raft 是一种为了管理日志复制的分布式共识性算法。从本质上说,Raft 算法是通过一切以领导者为准的方式,实现一系列值的共识和各节点日志的一致

Raft 出现之前,Paxos 一直是分布式共识性算法的标准。Paxos 难以理解,更难以实现。Raft 的设计目标是简化 Paxos,使得算法既容易理解,也容易实现

Paxos 和 Raft 都是分布式共识性算法,这个过程如同投票选举领袖(Leader),参选者(Candidate)需要说服大多数投票者(Follower)投票给他,一旦选举出领袖,就由领袖发号施令。Paxos 和 Raft 的区别在于选举的具体过程不同。

Raft 可以解决分布式 CAP 理论中的 CP,即 一致性(C:Consistency) 和 _分区容忍性(P:Partition Tolerance)_,并不能解决 可用性(A:Availability) 的问题。

分布式共识性

分布式共识性 (distributed consensus) 是分布式系统中最基本的问题,用来保证一个分布式系统的可靠性以及容错能力。简单来说,**分布式共识性是指多个服务器的保持状态一致**。

在分布式系统中,可能出现各种意外(断电、网络拥塞、CPU/内存耗尽等等),使得服务器宕机或无法访问,最终导致无法和其他服务器保持状态一致。为了应对这种情况,就需要有一种一致性协议来进行容错,使得分布式系统中即使有部分服务器宕机或无法访问,整体依然可以对外提供服务。

以容错方式达成一致,自然不能要求所有服务器都达成一致状态,只要超过半数以上的服务器达成一致就可以了。假设有 N 台服务器, 大于等于 N/2 + 1 台服务器就算是半数以上了 。

复制状态机

复制状态机(Replicated State Machines) 是指一组服务器上的状态机产生相同状态的副本,并且在一些机器宕掉的情况下也可以继续运行。一致性算法管理着来自客户端指令的复制日志。状态机从日志中处理相同顺序的相同指令,所以产生的结果也是相同的。

img

复制状态机通常都是基于复制日志实现的,如上图。每一个服务器存储一个包含一系列指令的日志,并且按照日志的顺序进行执行。每一个日志都按照相同的顺序包含相同的指令,所以每一个服务器都执行相同的指令序列。因为每个状态机都是确定的,每一次执行操作都产生相同的状态和同样的序列。

保证复制日志相同就是一致性算法的工作了。在一台服务器上,一致性模块接收客户端发送来的指令然后增加到自己的日志中去。它和其他服务器上的一致性模块进行通信来保证每一个服务器上的日志最终都以相同的顺序包含相同的请求,尽管有些服务器会宕机。一旦指令被正确的复制,每一个服务器的状态机按照日志顺序处理他们,然后输出结果被返回给客户端。因此,服务器集群看起来形成一个高可靠的状态机。

实际系统中使用的一致性算法通常含有以下特性:

  • 安全性保证(绝对不会返回一个错误的结果):在非拜占庭错误情况下,包括网络延迟、分区、丢包、冗余和乱序等错误都可以保证正确。
  • 可用性:集群中只要有大多数的机器可运行并且能够相互通信、和客户端通信,就可以保证可用。因此,一个典型的包含 5 个节点的集群可以容忍两个节点的失败。服务器被停止就认为是失败。他们当有稳定的存储的时候可以从状态中恢复回来并重新加入集群。
  • 不依赖时序来保证一致性:物理时钟错误或者极端的消息延迟只有在最坏情况下才会导致可用性问题。
  • 通常情况下,一条指令可以尽可能快的在集群中大多数节点响应一轮远程过程调用时完成。小部分比较慢的节点不会影响系统整体的性能。

RAFT 应用

RAFT 可以做什么?

通过 RAFT 提供的复制状态机,可以解决分布式系统的复制、修复、节点管理等问题。Raft 极大的简化当前分布式系统的设计与实现,让开发者只关注于业务逻辑,将其抽象实现成对应的状态机即可。基于这套框架,可以构建很多分布式应用:

  • 分布式存储系统:比如分布式消息队列、分布式块系统、分布式文件系统、分布式表格系统等。代表有:Redis、Etcd、Consul
  • 高可靠元信息管理:比如各类 Master 模块的 HA

Raft 基础

Raft 将一致性问题分解成了三个子问题:

  • 选举 Leader
  • 日志复制
  • 安全性

在后续章节,会详细讲解这个子问题。现在,先了解一下 Raft 的一些核心概念。

服务器角色

在 Raft 中,任何时刻,每个服务器都处于这三个角色之一 :

  • Leader - 领导者,通常一个系统中是一主(Leader)多从(Follower)。Leader 负责处理所有的客户端请求
  • Follower - 跟随者,不会发送任何请求,只是简单的 响应来自 Leader 或者 Candidate 的请求
  • Candidate - 参选者,选举新 Leader 时的临时角色。

img

:bulb: 图示说明:

  • Follower 只响应来自其他服务器的请求。在一定时限内,如果 Follower 接收不到消息,就会转变成 Candidate,并发起选举。
  • Candidate 向 Follower 发起投票请求,如果获得集群中半数以上的选票,就会转变为 Leader。
  • 在一个 Term 内,Leader 始终保持不变,直到下线了。Leader 需要周期性向所有 Follower 发送心跳消息,以阻止 Follower 转变为 Candidate。

任期

img

Raft 把时间分割成任意长度的 任期(Term),任期用连续的整数标记。每一段任期从一次选举开始。Raft 保证了在一个给定的任期内,最多只有一个领导者

  • 如果选举成功,Leader 会管理整个集群直到任期结束。
  • 如果选举失败,那么这个任期就会因为没有 Leader 而结束。

不同服务器节点观察到的任期转换状态可能不一样

  • 服务器节点可能观察到多次的任期转换。
  • 服务器节点也可能观察不到任何一次任期转换。

任期在 Raft 算法中充当逻辑时钟的作用,使得服务器节点可以查明一些过期的信息(比如过期的 Leader)。每个服务器节点都会存储一个当前任期号,这一编号在整个时期内单调的增长。当服务器之间通信的时候会交换当前任期号。

  • 如果一个服务器的当前任期号比其他人小,那么他会更新自己的编号到较大的编号值。
  • 如果一个 Candidate 或者 Leader 发现自己的任期号过期了,那么他会立即恢复成跟随者状态。
  • 如果一个节点接收到一个包含过期的任期号的请求,那么他会直接拒绝这个请求。

RPC

Raft 算法中服务器节点之间的通信使用 **_远程过程调用(RPC)_**。

基本的一致性算法只需要两种 RPC:

  • RequestVote RPC - 请求投票 RPC,由 Candidate 在选举期间发起。
  • AppendEntries RPC - 附加条目 RPC,由 Leader 发起,用来复制日志和提供一种心跳机制。

选举 Leader

选举规则

领导者心跳消息:Raft 使用一种心跳机制来触发 Leader 选举。Leader 需要周期性的向所有 Follower 发送心跳消息,以此维持 Leader 身份。

随机的竞选超时时间:每个 Follower 都设置了一个随机的竞选超时时间,一般为 150ms ~ 300ms,如果在竞选超时时间内没有收到 Leader 的心跳消息,就会认为当前 Term 没有可用的 Leader,并发起选举来选出新的 Leader。开始一次选举过程,Follower 先要增加自己的当前 Term 号,并转换为 Candidate

Candidate 会并行的向集群中的所有服务器节点发送投票请求(RequestVote RPC,它会保持当前状态直到以下三件事情之一发生:

  • 自己成为 Leader
  • 其他的服务器成为 Leader
  • 没有任何服务器成为 Leader

自己成为 Leader

  • 当一个 Candidate 从整个集群半数以上的服务器节点获得了针对同一个 Term 的选票,那么它就赢得了这次选举并成为 Leader。每个服务器最多会对一个 Term 投出一张选票,按照先来先服务(FIFO)的原则。_要求半数以上选票的规则确保了最多只会有一个 Candidate 赢得此次选举_。
  • 一旦 Candidate 赢得选举,就立即成为 Leader。然后它会向其他的服务器发送心跳消息来建立自己的权威并且阻止新的领导人的产生。

其他的服务器成为 Leader

等待投票期间,Candidate 可能会从其他的服务器接收到声明它是 Leader 的 AppendEntries RPC

  • 如果这个 Leader 的 Term 号(包含在此次的 RPC 中)不小于 Candidate 当前的 Term,那么 Candidate 会承认 Leader 合法并回到 Follower 状态。
  • 如果此次 RPC 中的 Term 号比自己小,那么 Candidate 就会拒绝这个消息并继续保持 Candidate 状态。

没有任何服务器成为 Leader

如果有多个 Follower 同时成为 Candidate,那么选票可能会被瓜分以至于没有 Candidate 可以赢得半数以上的投票。当这种情况发生的时候,每一个 Candidate 都会竞选超时,然后通过增加当前 Term 号来开始一轮新的选举。然而,没有其他机制的话,选票可能会被无限的重复瓜分。

Raft 算法使用随机选举超时时间的方法来确保很少会发生选票瓜分的情况,就算发生也能很快的解决。为了阻止选票起初就被瓜分,竞选超时时间是一个随机的时间,在一个固定的区间(例如 150-300 毫秒)随机选择,这样可以把选举都分散开。

  • 以至于在大多数情况下,只有一个服务器会超时,然后它赢得选举,成为 Leader,并在其他服务器超时之前发送心跳包。
  • 同样的机制也被用在选票瓜分的情况下:每一个 Candidate 在开始一次选举的时候会重置一个随机的选举超时时间,然后在超时时间内等待投票的结果;这样减少了在新的选举中另外的选票瓜分的可能性。

理解了上面的选举规则后,我们通过动图来加深认识。

单 Candidate 选举

(1)下图表示一个分布式系统的最初阶段,此时只有 Follower,没有 Leader。Follower A 等待一个随机的选举超时时间之后,没收到 Leader 发来的心跳消息。因此,将 Term 由 0 增加为 1,转换为 Candidate,进入选举状态。

img

(2)此时,A 向所有其他节点发送投票请求。

img

(3)其它节点会对投票请求进行回复,如果超过半数以上的节点投票了,那么该 Candidate 就会立即变成 Term 为 1 的 Leader。

img

(4)Leader 会周期性地发送心跳消息给所有 Follower,Follower 接收到心跳包,会重新开始计时。

img

多 Candidate 选举

(1)如果有多个 Follower 成为 Candidate,并且所获得票数相同,那么就需要重新开始投票。例如下图中 Candidate B 和 Candidate D 都发起 Term 为 4 的选举,且都获得两票,因此需要重新开始投票。

img

(2)当重新开始投票时,由于每个节点设置的随机竞选超时时间不同,因此能下一次再次出现多个 Candidate 并获得同样票数的概率很低。

img

小结

Raft 算法通过:领导者心跳消息、随机选举超时时间、得到大多数选票才通过原则、任期最新者优先、先来先服务的投票原则、等,保证了一个任期只有一位领导,也极大地减少了选举失败的情况。

日志复制

日志格式

日志由含日志索引(log index)的日志条目(log entry)组成。每个日志条目包含它被创建时的 Term 号(下图中方框中的数字),和一个复制状态机需要执行的指令。如果一个日志条目被复制到半数以上的服务器上,就被认为可以提交(Commit)了。

  • 日志条目中的 Term 号被用来检查是否出现不一致的情况,它实际上是创建这条日志的领导者的任期编号。
  • 日志条目中的日志索引用来表明它在日志中的位置,它是一个单调递增的整数。

img

Raft 日志同步保证如下两点:

  • 如果不同日志中的两个日志条目有着相同的日志索引和 Term,则它们所存储的命令是相同的
    • 这个特性基于这条原则:Leader 最多在一个 Term 内、在指定的一个日志索引上创建一条日志条目,同时日志条目在日志中的位置也从来不会改变。
  • 如果不同日志中的两个日志条目有着相同的日志索引和 Term,则它们之前的所有条目都是完全一样的
    • 这个特性由 AppendEntries RPC 的一个简单的一致性检查所保证。在发送 AppendEntries RPC 时,Leader 会把新日志条目之前的日志条目的日志索引和 Term 号一起发送。如果 Follower 在它的日志中找不到包含相同日志索引和 Term 号的日志条目,它就会拒绝接收新的日志条目。

日志复制流程

img

  1. Leader 负责处理所有客户端的请求。
  2. Leader 把请求作为日志条目加入到它的日志中,然后并行的向其他服务器发送 AppendEntries RPC 请求,要求 Follower 复制日志条目。
  3. Follower 复制成功后,返回确认消息。
  4. 当这个日志条目被半数以上的服务器复制后,Leader 提交这个日志条目到它的复制状态机,并向客户端返回执行结果。

注意:如果 Follower 崩溃或者运行缓慢,再或者网络丢包,Leader 会不断的重复尝试发送 AppendEntries RPC 请求 (尽管已经回复了客户端),直到所有的跟随者都最终复制了所有的日志条目。

下面,通过一组动图来加深认识:

(1)来自客户端的修改都会被传入 Leader。注意该修改还未被提交,只是写入日志中。

img

(2)Leader 会把修改复制到所有 Follower。

img

(3)Leader 会等待大多数的 Follower 也进行了修改,然后才将修改提交。

img

(4)此时 Leader 会通知的所有 Follower 让它们也提交修改,此时所有节点的值达成一致。

img

日志一致性

一般情况下,Leader 和 Followers 的日志保持一致,因此日志条目一致性检查通常不会失败。然而,Leader 崩溃可能会导致日志不一致:旧的 Leader 可能没有完全复制完日志中的所有条目。

Leader 和 Follower 日志不一致的可能

Leader 和 Follower 可能存在多种日志不一致的可能。

img

:bulb: 图示说明:

上图阐述了 Leader 和 Follower 可能存在多种日志不一致的可能,每一个方框表示一个日志条目,里面的数字表示任期号 。

当一个 Leader 成功当选时,Follower 可能出现以下情况(a-f):

  • 存在未更新日志条目,如(a、b)。
  • 存在未提交日志条目,如(c、d)。
  • 两种情况都存在,如(e、f)。

_例如,场景 f 可能会这样发生,某服务器在 Term2 的时候是 Leader,已附加了一些日志条目到自己的日志中,但在提交之前就崩溃了;很快这个机器就被重启了,在 Term3 重新被选为 Leader,并且又增加了一些日志条目到自己的日志中;在 Term 2 和 Term 3 的日志被提交之前,这个服务器又宕机了,并且在接下来的几个任期里一直处于宕机状态_。

Leader 和 Follower 日志一致的保证

Leader 通过强制 Followers 复制它的日志来处理日志的不一致,Followers 上的不一致的日志会被 Leader 的日志覆盖

  • Leader 为了使 Followers 的日志同自己的一致,Leader 需要找到 Followers 同它的日志一致的地方,然后覆盖 Followers 在该位置之后的条目。
  • Leader 会从后往前试,每次日志条目失败后尝试前一个日志条目,直到成功找到每个 Follower 的日志一致位点,然后向后逐条覆盖 Followers 在该位置之后的条目。

安全性

前面描述了 Raft 算法是如何选举 Leader 和复制日志的。

Raft 还增加了一些限制来完善 Raft 算法,以保证安全性:保证了任意 Leader 对于给定的 Term,都拥有了之前 Term 的所有被提交的日志条目。

选举限制

拥有最新的已提交的日志条目的 Follower 才有资格成为 Leader。

Raft 使用投票的方式来阻止一个 Candidate 赢得选举除非这个 Candidate 包含了所有已经提交的日志条目。 Candidate 为了赢得选举必须联系集群中的大部分节点,这意味着每一个已经提交的日志条目在这些服务器节点中肯定存在于至少一个节点上。如果 Candidate 的日志至少和大多数的服务器节点一样新(这个新的定义会在下面讨论),那么他一定持有了所有已经提交的日志条目。

RequestVote RPC 实现了这样的限制:RequestVote RPC 中包含了 Candidate 的日志信息, Follower 会拒绝掉那些日志没有自己新的投票请求

如何判断哪个日志条目比较新?

Raft 通过比较两份日志中最后一条日志条目的日志索引和 Term 来判断哪个日志比较新。

  • 先判断 Term,哪个数值大即代表哪个日志比较新。
  • 如果 Term 相同,再比较 日志索引,哪个数值大即代表哪个日志比较新。

提交旧任期的日志条目

一个当前 Term 的日志条目被复制到了半数以上的服务器上,Leader 就认为它是可以被提交的。如果这个 Leader 在提交日志条目前就下线了,后续的 Leader 可能会覆盖掉这个日志条目。

img

💡 图示说明:

上图解释了为什么 Leader 无法对旧 Term 的日志条目进行提交。

  • 阶段 (a) ,S1 是 Leader,且 S1 写入日志条目为 (Term 2,日志索引 2),只有 S2 复制了这个日志条目。
  • 阶段 (b),S1 下线,S5 被选举为 Term3 的 Leader。S5 写入日志条目为 (Term 3,日志索引 2)。
  • 阶段 (c),S5 下线,S1 重新上线,并被选举为 Term4 的 Leader。此时,Term 2 的那条日志条目已经被复制到了集群中的大多数节点上,但是还没有被提交。
  • 阶段 (d),S1 再次下线,S5 重新上线,并被重新选举为 Term3 的 Leader。然后 S5 覆盖了日志索引 2 处的日志。
  • 阶段 (e),如果阶段 (d) 还未发生,即 S1 再次下线之前,S1 把自己主导的日志条目复制到了大多数节点上,那么在后续 Term 里面这些新日志条目就会被提交。这样在同一时刻就同时保证了,之前的所有旧日志条目就会被提交。

Raft 永远不会通过计算副本数目的方式去提交一个之前 Term 内的日志条目。只有 Leader 当前 Term 里的日志条目通过计算副本数目可以被提交;一旦当前 Term 的日志条目以这种方式被提交,那么由于日志匹配特性,之前的日志条目也都会被间接的提交。

当 Leader 复制之前任期里的日志时,Raft 会为所有日志保留原始的 Term,这在提交规则上产生了额外的复杂性。在其他的一致性算法中,如果一个新的领导人要重新复制之前的任期里的日志时,它必须使用当前新的任期号。Raft 使用的方法更加容易辨别出日志,因为它可以随着时间和日志的变化对日志维护着同一个任期编号。另外,和其他的算法相比,Raft 中的新领导人只需要发送更少日志条目(其他算法中必须在他们被提交之前发送更多的冗余日志条目来为他们重新编号)。

日志压缩

在实际的系统中,不能让日志无限膨胀,否则系统重启时需要花很长的时间进行恢复,从而影响可用性。Raft 采用对整个系统进行快照来解决,快照之前的日志都可以丢弃。

每个副本独立的对自己的系统状态生成快照,并且只能对已经提交的日志条目生成快照。

快照包含以下内容:

  • 日志元数据。最后一条已提交的日志条目的日志索引和 Term。这两个值在快照之后的第一条日志条目的 AppendEntries RPC 的完整性检查的时候会被用上。
  • 系统当前状态。

当 Leader 要发送某个日志条目,落后太多的 Follower 的日志条目会被丢弃,Leader 会将快照发给 Follower。或者新上线一台机器时,也会发送快照给它。

img

生成快照的频率要适中,频率过高会消耗大量 I/O 带宽;频率过低,一旦需要执行恢复操作,会丢失大量数据,影响可用性。推荐当日志达到某个固定的大小时生成快照。

生成一次快照可能耗时过长,影响正常日志同步。可以通过使用 copy-on-write 技术避免快照过程影响正常日志同步。

说明:本文仅阐述 Raft 算法的核心内容,不包括算法论证、评估等

参考资料

Redis 事务

Redis 仅支持“非严格”的事务。所谓“非严格”是指:Redis 事务保证“全部执行命令”;但是,Redis 事务“不支持回滚”。

关键词:事务ACIDMULTIEXECDISCARDWATCH

Redis 事务简介

什么是 ACID

ACID 是数据库事务正确执行的四个基本要素。

  • 原子性(Atomicity)
    • 事务被视为不可分割的最小单元,事务中的所有操作要么全部提交成功,要么全部失败回滚
    • 回滚可以用日志来实现,日志记录着事务所执行的修改操作,在回滚时反向执行这些修改操作即可。
  • 一致性(Consistency)
    • 数据库在事务执行前后都保持一致性状态。
    • 在一致性状态下,所有事务对一个数据的读取结果都是相同的。
  • 隔离性(Isolation)
    • 一个事务所做的修改在最终提交以前,对其它事务是不可见的。
  • 持久性(Durability)
    • 一旦事务提交,则其所做的修改将会永远保存到数据库中。即使系统发生崩溃,事务执行的结果也不能丢失。
    • 可以通过数据库备份和恢复来实现,在系统发生奔溃时,使用备份的数据库进行数据恢复。

一个支持事务(Transaction)中的数据库系统,必需要具有这四种特性,否则在事务过程(Transaction processing)当中无法保证数据的正确性,交易过程极可能达不到交易。

  • 只有满足一致性,事务的执行结果才是正确的。
  • 在无并发的情况下,事务串行执行,隔离性一定能够满足。此时只要能满足原子性,就一定能满足一致性。
  • 在并发的情况下,多个事务并行执行,事务不仅要满足原子性,还需要满足隔离性,才能满足一致性。
  • 事务满足持久化是为了能应对系统崩溃的情况。

ACID

Redis 事务的特性

Redis 的事务总是支持 ACID 中的原子性、一致性和隔离性, 当服务器运行在 AOF 持久化模式下, 并且 appendfsync 选项的值为 always 时, 事务也具有持久性。

但需要注意的是:Redis 仅支持“非严格”的事务。这里的“非严格”,其实指的是 Redis 事务只能部分保证 ACID 中的原子性。

  • Redis 事务保证全部执行命令 - Redis 事务中的多个命令会被打包到事务队列中,然后按先进先出(FIFO)的顺序执行。事务在执行过程中不会被中断,当事务队列中的所有命令都被执行完毕之后,事务才会结束。
  • Redis 事务不支持回滚 - 如果命令执行失败不会回滚,而是会继续执行下去。

Redis 官方的事务特性文档给出的不支持回滚的理由是:

  • Redis 命令只会因为错误的语法而失败,或是命令用在了错误类型的键上面。
  • 因为不需要对回滚进行支持,所以 Redis 的内部可以保持简单且快速。

Redis 事务应用

MULTIEXECDISCARDWATCH 是 Redis 事务相关的命令。

事务可以一次执行多个命令, 并且有以下两个重要的保证:

  • 事务是一个单独的隔离操作:事务中的所有命令都会序列化、按顺序地执行。事务在执行的过程中,不会被其他客户端发送来的命令请求所打断。
  • 事务是一个原子操作:事务中的命令要么全部被执行,要么全部都不执行。

MULTI

MULTI 命令用于开启一个事务,它总是返回 OK 。

MULTI 执行之后, 客户端可以继续向服务器发送任意多条命令, 这些命令不会立即被执行, 而是被放到一个队列中, 当 EXEC 命令被调用时, 所有队列中的命令才会被执行。

以下是一个事务例子, 它原子地增加了 foobar 两个键的值:

1
2
3
4
5
6
7
8
9
> MULTI
OK
> INCR foo
QUEUED
> INCR bar
QUEUED
> EXEC
1) (integer) 1
2) (integer) 1

EXEC

EXEC 命令负责触发并执行事务中的所有命令。

  • 如果客户端在使用 MULTI 开启了一个事务之后,却因为断线而没有成功执行 EXEC ,那么事务中的所有命令都不会被执行。
  • 另一方面,如果客户端成功在开启事务之后执行 EXEC ,那么事务中的所有命令都会被执行。

MULTIEXEC 中的操作将会一次性发送给服务器,而不是一条一条发送,这种方式称为流水线,它可以减少客户端与服务器之间的网络通信次数从而提升性能。

DISCARD

当执行 DISCARD 命令时, 事务会被放弃, 事务队列会被清空, 并且客户端会从事务状态中退出。

示例:

1
2
3
4
5
6
7
8
9
10
> SET foo 1
OK
> MULTI
OK
> INCR foo
QUEUED
> DISCARD
OK
> GET foo
"1"

WATCH

WATCH 命令可以为 Redis 事务提供 check-and-set (CAS)行为。

WATCH 的键会被监视,并会发觉这些键是否被改动过了。 如果有至少一个被监视的键在 EXEC 执行之前被修改了, 那么整个事务都会被取消, EXEC 返回 nil-reply 来表示事务已经失败。

1
2
3
4
5
6
WATCH mykey
val = GET mykey
val = val + 1
MULTI
SET mykey $val
EXEC

使用上面的代码, 如果在 WATCH 执行之后, EXEC 执行之前, 有其他客户端修改了 mykey 的值, 那么当前客户端的事务就会失败。 程序需要做的, 就是不断重试这个操作, 直到没有发生碰撞为止。

这种形式的锁被称作乐观锁, 它是一种非常强大的锁机制。 并且因为大多数情况下, 不同的客户端会访问不同的键, 碰撞的情况一般都很少, 所以通常并不需要进行重试。

WATCH 使得 EXEC 命令需要有条件地执行:事务只能在所有被监视键都没有被修改的前提下执行,如果这个前提不能满足的话,事务就不会被执行。

WATCH 命令可以被调用多次。对键的监视从 WATCH 执行之后开始生效,直到调用 EXEC 为止。

用户还可以在单个 WATCH 命令中监视任意多个键,例如:

1
2
redis> WATCH key1 key2 key3
OK

取消 WATCH 的场景

EXEC 被调用时, 不管事务是否成功执行, 对所有键的监视都会被取消。另外, 当客户端断开连接时, 该客户端对键的监视也会被取消。

使用无参数的 UNWATCH 命令可以手动取消对所有键的监视。 对于一些需要改动多个键的事务, 有时候程序需要同时对多个键进行加锁, 然后检查这些键的当前值是否符合程序的要求。 当值达不到要求时, 就可以使用 UNWATCH 命令来取消目前对键的监视, 中途放弃这个事务, 并等待事务的下次尝试。

使用 WATCH 创建原子操作

WATCH 可以用于创建 Redis 没有内置的原子操作。

举个例子,以下代码实现了原创的 ZPOP 命令,它可以原子地弹出有序集合中分值(score)最小的元素:

1
2
3
4
5
WATCH zset
element = ZRANGE zset 0 0
MULTI
ZREM zset element
EXEC

参考资料

Redis 脚本

Redis 脚本使用 Lua 解释器来执行脚本。 Redis 2.6 版本通过内嵌支持 Lua 环境。

关键词:Lua

为什么使用 Lua

Lua 是一种轻量小巧的脚本语言,用标准 C 语言编写并以源代码形式开放, 其设计目的是为了嵌入应用程序中,从而为应用程序提供灵活的扩展和定制功能。

在 Redis 中,执行单一命令是原子性操作,所以不会出现并发问题。但有的业务场景下,需要执行多个命令,同时确保不出现并发问题,这就需要用到 Lua 脚本了。

Redis 执行 Lua 是原子操作。因为 Redis 使用串行化的方式来执行 Redis 命令, 所以在任何特定时间里, 最多都只会有一个脚本能够被放进 Lua 环境里面运行, 因此, 整个 Redis 服务器只需要创建一个 Lua 环境即可。

由于,Redis 执行 Lua 具有原子性,所以常被用于需要原子性执行多命令的场景。

Redis 脚本命令

命令 说明
EVAL EVAL 命令为客户端输入的脚本在 Lua 环境中定义一个函数, 并通过调用这个函数来执行脚本。
EVALSHA EVALSHA 命令通过直接调用 Lua 环境中已定义的函数来执行脚本。
SCRIPT_FLUSH SCRIPT_FLUSH 命令会清空服务器 lua_scripts 字典中保存的脚本, 并重置 Lua 环境。
SCRIPT_EXISTS SCRIPT_EXISTS 命令接受一个或多个 SHA1 校验和为参数, 并通过检查 lua_scripts 字典来确认校验和对应的脚本是否存在。
SCRIPT_LOAD SCRIPT_LOAD 命令接受一个 Lua 脚本为参数, 为该脚本在 Lua 环境中创建函数, 并将脚本保存到 lua_scripts 字典中。
SCRIPT_KILL SCRIPT_KILL 命令用于停止正在执行的脚本。

Redis 执行 Lua 的工作流程

为了在 Redis 服务器中执行 Lua 脚本, Redis 在服务器内嵌了一个 Lua 环境(environment), 并对这个 Lua 环境进行了一系列修改, 从而确保这个 Lua 环境可以满足 Redis 服务器的需要。

Redis 服务器创建并修改 Lua 环境的整个过程由以下步骤组成:

  1. 创建一个基础的 Lua 环境, 之后的所有修改都是针对这个环境进行的。
  2. 载入多个函数库到 Lua 环境里面, 让 Lua 脚本可以使用这些函数库来进行数据操作。
  3. 创建全局表格 redis , 这个表格包含了对 Redis 进行操作的函数, 比如用于在 Lua 脚本中执行 Redis 命令的 redis.call 函数。
  4. 使用 Redis 自制的随机函数来替换 Lua 原有的带有副作用的随机函数, 从而避免在脚本中引入副作用。
  5. 创建排序辅助函数, Lua 环境使用这个辅佐函数来对一部分 Redis 命令的结果进行排序, 从而消除这些命令的不确定性。
  6. 创建 redis.pcall 函数的错误报告辅助函数, 这个函数可以提供更详细的出错信息。
  7. 对 Lua 环境里面的全局环境进行保护, 防止用户在执行 Lua 脚本的过程中, 将额外的全局变量添加到了 Lua 环境里面。
  8. 将完成修改的 Lua 环境保存到服务器状态的 lua 属性里面, 等待执行服务器传来的 Lua 脚本。

Redis 执行 Lua 的要点

  • Redis 服务器专门使用一个伪客户端来执行 Lua 脚本中包含的 Redis 命令。
  • Redis 使用脚本字典来保存所有被 EVAL 命令执行过, 或者被 SCRIPT_LOAD 命令载入过的 Lua 脚本, 这些脚本可以用于实现 SCRIPT_EXISTS 命令, 以及实现脚本复制功能。
  • 服务器在执行脚本之前, 会为 Lua 环境设置一个超时处理钩子, 当脚本出现超时运行情况时, 客户端可以通过向服务器发送 SCRIPT_KILL 命令来让钩子停止正在执行的脚本, 或者发送 SHUTDOWN nosave 命令来让钩子关闭整个服务器。
  • 主服务器复制 EVALSCRIPT_FLUSHSCRIPT_LOAD 三个命令的方法和复制普通 Redis 命令一样 —— 只要将相同的命令传播给从服务器就可以了。
  • 主服务器在复制 EVALSHA 命令时, 必须确保所有从服务器都已经载入了 EVALSHA 命令指定的 SHA1 校验和所对应的 Lua 脚本, 如果不能确保这一点的话, 主服务器会将 EVALSHA 命令转换成等效的 EVAL 命令, 并通过传播 EVAL 命令来获得相同的脚本执行效果。

参考资料

Markdown 极简教程

目录

标题

Markdown 支持六个级别的标题。

1
2
3
4
5
6
7
语法:
# 一级标题
## 二级标题
### 三级标题
#### 四级标题
##### 五级标题
###### 六级标题

文本样式

:bulb: 粗体、斜体、删除线可以混合使用。

在 Markdown 中,粗体文本、斜体文本可以使用 *_ 符号标记。建议统一风格,始终只用一种符号。

语法 效果
普通文本 普通文本
*斜体文本* _斜体文本_ 斜体文本 斜体文本
**粗体文本** __粗体文本__ 粗体文本 粗体文本
~~删除文本~~ 删除文本
***粗斜体文本*** ___粗斜体文本___ 粗斜体文本 粗斜体文本

列表

无序列表

  • RED
  • YELLOW
  • BLUE

有序列表

  1. 第一步
  2. 第二步
  3. 第三步

任务列表

  • 完成任务
  • 计划任务

多级列表

  • 数据结构
    • 线性表
      • 顺序表
      • 链表
        • 单链表
        • 双链表
      • 二叉树
        • 二叉平衡树

分割线

***---___ 都可以作为分割线。




链接

普通链接

语法:

1
[钝悟的博客](https://dunwu.github.io/waterdrop/)
  • [] 中标记链接名。类似 HTML 中 <a> 元素的 title 属性。
  • () 中标记链接的 url,也支持相对路径(前提是资源可以访问)。类似 HTML 中 <a> 元素的 href 属性。

效果:

图片

Markdown 引用图片的语法:

1
![alt](url title)

alt 和 title 即对应 HTML 中 img 元素的 alt 和 title 属性(都可省略):

  • alt - 表示图片显示失败时的替换文本。

  • title - 表示鼠标悬停在图片时的显示文本(注意这里要加引号)

  • url - 即图片的 url 地址

logo

图片链接

可以将图片和链接混合使用。

logo

锚点

其实呢,每一个标题都是一个锚点,和 HTML 的锚点(#)类似,比如:回到顶部

引用

普通引用:

:question: 什么是 Markdown

Markdown是一种轻量级标记语言,创始人为约翰·格鲁伯(英语:John Gruber)。它允许人们“使用易读易写的纯文本格式编写文档,然后转换成有效的XHTML(或者HTML)文档”。[4]这种语言吸收了很多在电子邮件中已有的纯文本标记的特性。 —— 摘自 Wiki

嵌套引用:

数据结构

二叉树

平衡二叉树

满二叉树

代码高亮

标签

语法:

1
`Markdown` `Doc`

效果:

Markdown, Doc

代码块

语法一:在文本前后都使用三个反引号进行标记。【✔️️️️ 推荐】

1
2
3
这是一个文本块。
这是一个文本块。
这是一个文本块。

语法二:在连续几行的文本开头加入 1 个 Tab 或者 4 个空格。【❌ 不推荐】

这是一个文本块。
这是一个文本块。
这是一个文本块。

语法

在三个反引号后面加上编程语言的名字,另起一行开始写代码,最后一行再加上三个反引号。

1
public static void main(String[]args){} //Java
1
int main(int argc, char *argv[]) //C
1
echo "hello GitHub" #Bash
1
document.getElementById('myH1').innerHTML = 'Welcome to my Homepage' //javascipt
1
string &operator+(const string& A,const string& B) //cpp

表格

一般表格:

表头 1 表头 2
表格单元 表格单元
表格单元 表格单元

表格可以指定对齐方式:

序号 商品 价格
1 电脑 6000.0
2 鼠标 100.0
3 键盘 200.0

Emoji 表情

:bulb: 注意:部分 Markdown 引擎支持 Emoji。

合理使用 Emoji 表情,往往可以使得文章内容更加丰富生动。例如::heavy_check_mark: :x: :bulb: :bell: :heavy_exclamation_mark: :question:

更多 Emoji 表情请参考:

注脚

:bulb: 注意:部分 Markdown 引擎支持注脚。

一个具有注脚的文本。^1

数学公式

:bulb: 注意:部分 Markdown 引擎支持 Latex。

很多文档中,需要引入一些数学符号、特殊符号,其排版问题比较头疼。这种问题,可以用 Latex 来解决,大部分 Markdown 引擎都支持 Latex。

Latex 可以使用 $ 符号来标记 Latex 表达式,下面是一个数学公式示例:

$$
\Gamma(z) = \int_0^\infty t^{z-1}e^{-t}dt,.
$$

列举一些常用数学符号:

符号 语法 描述
$\leq$ $\leq$ 小于等于
$\geq$ $\geq$ 大于等于
$\neq$ $\neq$ 不等于
$\approx$ $\approx$ 约等于
$\infty$ $\infty$ 无穷
$\prod_{x}^{y}$ $\prod_{x}^{y}$ 累乘
$\sum_{i=0}^n$ $\sum_{i=0}^n$ 求和
$\int$ $\int$ 积分
$\iint$ $\iint$ 双重积分
$\log_x{y}$ $\log_x{y}$ 对数
$x^{y+1}$ $x^{y+1}$ 上标
$x_{y+1}$ $x_{y+1}$ 下标
$\frac{x}{y}$ $\frac{x}{y}$ 分数
$\sqrt[y]{x}$ $\sqrt[y]{x}$ 开方
$\sin$ $\sin$ 正弦
$\cos$ $\cos$ 余弦
$\tan$ $\tan$ 正切

更多数学符号支持请参考:

Diff

:bulb: 注意:部分 Markdown 引擎支持 Diff。

版本控制的系统中都少不了 diff 的功能,即展示一个文件内容的增加与删除。
GFM 中可以显示的展示 diff 效果。可以用 + 开头表示新增,- 开头表示删除。

1
2
+ 新增内容
- 删除内容

UML 图

💡 注意:部分 Markdown 引擎支持 mermaid

mermaid 提供了多种 UML 图。详情请参考:mermaid 文档

流程图

1
2
3
4
5
graph LR
A[Hard edge] -->|Link text| B(Round edge)
B --> C{Decision}
C -->|One| D[Result one]
C -->|Two| E[Result two]

时序图

1
2
3
4
5
6
7
8
9
10
sequenceDiagram
Alice->>Bob: Hello Bob, how are you?
alt is sick
Bob->>Alice: Not so good :(
else is well
Bob->>Alice: Feeling fresh like a daisy
end
opt Extra response
Bob->>Alice: Thanks for asking
end

甘特图

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
gantt
dateFormat YYYY-MM-DD
title Adding GANTT diagram functionality to mermaid

section A section
Completed task :done, des1, 2014-01-06,2014-01-08
Active task :active, des2, 2014-01-09, 3d
Future task : des3, after des2, 5d
Future task2 : des4, after des3, 5d

section Critical tasks
Completed task in the critical line :crit, done, 2014-01-06,24h
Implement parser and jison :crit, done, after des1, 2d
Create tests for parser :crit, active, 3d
Future task in critical line :crit, 5d
Create tests for renderer :2d
Add to mermaid :1d

section Documentation
Describe gantt syntax :active, a1, after des1, 3d
Add gantt diagram to demo page :after a1 , 20h
Add another diagram to demo page :doc1, after a1 , 48h

section Last section
Describe gantt syntax :after doc1, 3d
Add gantt diagram to demo page :20h
Add another diagram to demo page :48h

HTML

有些 Markdown 引擎支持在文档中嵌入的 html 元素。

有些 Markdown 语法所不支持的特性,可以使用 html 元素来支持。

折叠

折叠内容一

展开才能看到的内容

折叠内容二

展开才能看到的内容

居中

居中显示的文本

图片尺寸

编辑器

推荐 Markdown 编辑器

  • Typora - 个人认为是功能最强的 Markdown 编辑器。
  • Visual Studio Code - 可以通过安装插件,量身打造 Markdown 编辑器。
  • marktext - 一款简单优雅的 Markdown 编辑器。
  • StackEdit - 在线 Markdown 编辑器。
  • Editor.md - 在线 Markdown 编辑器。
  • Marxico - 一款专为印象笔记(Evernote)打造的 Markdown 编辑器。

想了解更多 Markdown 编辑器可以参考:主流 Markdown 编辑器推荐

参考资料

流量控制

在高并发场景下,为了应对瞬时海量请求的压力,保障系统的平稳运行,必须预估系统的流量阈值,通过限流规则阻断处理不过来的请求。

限流简介

限流可以认为是服务降级的一种。限流就是限制系统的输入和输出流量已达到保护系统的目的。一般来说系统的吞吐量是可以被测算的,为了保证系统的稳定运行,一旦达到的需要限制的阈值,就需要限制流量并采取一些措施以完成限制流量的目的。比如:延迟处理,拒绝处理,或者部分拒绝处理等等。

限流规则包含三个部分:时间粒度,接口粒度,最大限流值。限流规则设置是否合理直接影响到限流是否合理有效。

限流算法

固定窗口限流算法

固定窗口限流算法的原理

固定窗口限流算法的基本策略是:

  1. 设置一个固定时间窗口,以及这个固定时间窗口内的最大请求数;
  2. 为每个固定时间窗口设置一个计数器,用于统计请求数;
  3. 一旦请求数超过最大请求数,则请求会被拦截。

固定窗口限流算法的利弊

固定窗口限流算法的优点是:实现简单。

固定窗口限流算法的缺点是:存在临界问题。所谓临界问题,是指:流量分别集中在一个固定时间窗口的尾部和一个固定时间窗口的头部。举例来说,假设限流规则为每分钟不超过 100 次请求。在第一个时间窗口中,起初没有任何请求,在最后 1 s,收到 100 次请求,由于没有达到阈值,所有请求都通过;在第二个时间窗口中,第 1 秒就收到 100 次请求,而后续没有任何请求。虽然,这两个时间窗口内的流量都符合限流要求,但是在两个时间窗口临界的这 2s 内,实际上有 200 次请求,显然是超过预期吞吐量的,存在压垮系统的可能。

固定窗口限流算法的实现

【示例】Java 版本的固定窗口限流算法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class SlidingWindowRateLimiter implements RateLimiter {

/**
* 允许的最大请求数
*/
private final long maxPermits;

/**
* 窗口期时长
*/
private long periodMillis;

/**
* 分片窗口期时长
*/
private final long shardPeriodMillis;

/**
* 窗口期截止时间
*/
private long lastPeriodMillis;

/**
* 分片窗口数
*/
private final int shardNum;

/**
* 请求总计数
*/
private final AtomicLong totalCount = new AtomicLong(0);

/**
* 分片窗口计数列表
*/
private final List<AtomicLong> countList = new LinkedList<>();

public SlidingWindowRateLimiter(long qps, int shardNum) {
this(qps, 1000, TimeUnit.MILLISECONDS, shardNum);
}

public SlidingWindowRateLimiter(long maxPermits, long period, TimeUnit timeUnit, int shardNum) {
this.maxPermits = maxPermits;
this.periodMillis = timeUnit.toMillis(period);
this.lastPeriodMillis = System.currentTimeMillis();
this.shardPeriodMillis = timeUnit.toMillis(period) / shardNum;
this.shardNum = shardNum;
for (int i = 0; i < shardNum; i++) {
countList.add(new AtomicLong(0));
}
}

@Override
public synchronized boolean tryAcquire(int permits) {
long now = System.currentTimeMillis();
if (now > lastPeriodMillis) {
for (int shardId = 0; shardId < shardNum; shardId++) {
long shardCount = countList.get(shardId).get();
totalCount.addAndGet(-shardCount);
countList.set(shardId, new AtomicLong(0));
lastPeriodMillis += shardPeriodMillis;
}
}
int shardId = (int) (now % periodMillis / shardPeriodMillis);
if (totalCount.get() + permits <= maxPermits) {
countList.get(shardId).addAndGet(permits);
totalCount.addAndGet(permits);
return true;
} else {
return false;
}
}

}

滑动窗口限流算法

滑动窗口限流算法的原理

滑动窗口限流算法是对固定窗口限流算法的改进,解决了临界问题。

滑动窗口限流算法的基本策略是:

  • 将固定时间窗口分片为多个子窗口,每个子窗口的访问次数独立统计;
  • 当请求时间大于当前子窗口的最大时间时,则将当前子窗口废弃,并将计时窗口向前滑动,并将下一个子窗口置为当前窗口。
  • 要保证所有子窗口的统计数之和不能超过阈值。

滑动窗口限流算法就是针对固定窗口限流算法的更细粒度的控制,分片越多,则限流越精准。

滑动窗口限流算法的利弊

滑动窗口限流算法的优点是:在滑动窗口限流算法中,临界位置的突发请求都会被算到时间窗口内,因此可以解决计数器算法的临界问题。

滑动窗口限流算法的缺点是:

  • 额外的内存开销 - 滑动时间窗口限流算法的时间窗口是持续滑动的,并且除了需要一个计数器来记录时间窗口内接口请求次数之外,还需要记录在时间窗口内每个接口请求到达的时间点,所以存在额外的内存开销。
  • 限流的控制粒度受限于窗口分片粒度 - 滑动窗口限流算法,只能在选定的时间粒度上限流,对选定时间粒度内的更加细粒度的访问频率不做限制。但是,由于每个分片窗口都有额外的内存开销,所以也并不是分片数越多越好的。

滑动窗口限流算法的实现

【示例】Java 版本的滑动窗口限流算法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class SlidingWindowRateLimiter implements RateLimiter {

/**
* 允许的最大请求数
*/
private final long maxPermits;

/**
* 窗口期时长
*/
private final long periodMillis;

/**
* 分片窗口期时长
*/
private final long shardPeriodMillis;

/**
* 窗口期截止时间
*/
private long lastPeriodMillis;

/**
* 分片窗口数
*/
private final int shardNum;

/**
* 请求总计数
*/
private final AtomicLong totalCount = new AtomicLong(0);

/**
* 分片窗口计数列表
*/
private final List<AtomicLong> countList = new LinkedList<>();

public SlidingWindowRateLimiter(long qps, int shardNum) {
this(qps, 1000, TimeUnit.MILLISECONDS, shardNum);
}

public SlidingWindowRateLimiter(long maxPermits, long period, TimeUnit timeUnit, int shardNum) {
this.maxPermits = maxPermits;
this.periodMillis = timeUnit.toMillis(period);
this.lastPeriodMillis = System.currentTimeMillis();
this.shardPeriodMillis = timeUnit.toMillis(period) / shardNum;
this.shardNum = shardNum;
for (int i = 0; i < shardNum; i++) {
countList.add(new AtomicLong(0));
}
}

@Override
public synchronized boolean tryAcquire(int permits) {
long now = System.currentTimeMillis();
if (now > lastPeriodMillis) {
for (int shardId = 0; shardId < shardNum; shardId++) {
long shardCount = countList.get(shardId).get();
totalCount.addAndGet(-shardCount);
countList.set(shardId, new AtomicLong(0));
lastPeriodMillis += shardPeriodMillis;
}
}
int shardId = (int) (now % periodMillis / shardPeriodMillis);
if (totalCount.get() + permits <= maxPermits) {
countList.get(shardId).addAndGet(permits);
totalCount.addAndGet(permits);
return true;
} else {
return false;
}
}

}

漏桶限流算法

漏桶限流算法的原理

漏桶限流算法的基本策略是:

  • 水(请求)以任意速率由入口进入到漏桶中;
  • 水以固定的速率由出口出水(请求通过);
  • 漏桶的容量是固定的,如果水的流入速率大于流出速率,最终会导致漏桶中的水溢出(这意味着请求拒绝)。

漏桶限流算法的利弊

漏桶限流算法的优点是:消费速率固定——即无论流量多大,即便是突发的大流量,处理请求的速度始终是固定的。

漏桶限流算法的缺点是:不能灵活的调整流量。例如:一个集群通过增减节点的方式,弹性伸缩了其吞吐能力,漏桶限流算法无法随之调整。

漏桶策略适用于间隔性突发流量且流量不用即时处理的场景

漏桶限流算法的实现

【示例】Java 版本的漏桶限流算法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
import java.util.concurrent.atomic.AtomicLong;

public class LeakyBucketRateLimiter implements RateLimiter {

/**
* QPS
*/
private final int qps;

/**
* 桶的容量
*/
private final long capacity;

/**
* 计算的起始时间
*/
private long beginTimeMillis;

/**
* 桶中当前的水量
*/
private final AtomicLong waterNum = new AtomicLong(0);

public LeakyBucketRateLimiter(int qps, int capacity) {
this.qps = qps;
this.capacity = capacity;
}

@Override
public synchronized boolean tryAcquire(int permits) {

// 如果桶中没有水,直接通过
if (waterNum.get() == 0) {
beginTimeMillis = System.currentTimeMillis();
waterNum.addAndGet(permits);
return true;
}

// 计算水量
long leakedWaterNum = ((System.currentTimeMillis() - beginTimeMillis) / 1000) * qps;
long currentWaterNum = waterNum.get() - leakedWaterNum;
waterNum.set(Math.max(0, currentWaterNum));

// 重置时间
beginTimeMillis = System.currentTimeMillis();

if (waterNum.get() + permits < capacity) {
waterNum.addAndGet(permits);
return true;
} else {
return false;
}
}

}

令牌桶限流算法

令牌桶限流算法的原理

令牌桶算法的原理

  1. 接口限制 T 秒内最大访问次数为 N,则每隔 T/N 秒会放一个 token 到桶中
  2. 桶内最多存放 M 个 token,如果 token 到达时令牌桶已经满了,那么这个 token 就会被丢弃
  3. 接口请求会先从令牌桶中取 token,拿到 token 则处理接口请求,拿不到 token 则进行限流处理

令牌桶限流算法的利弊

因为令牌桶存放了很多令牌,那么大量的突发请求会被执行,但是它不会出现临界问题,在令牌用完之后,令牌是以一个恒定的速率添加到令牌桶中的,因此不能再次发送大量突发请求。

规定固定容量的桶,token 以固定速度往桶内填充,当桶满时 token 不会被继续放入,每过来一个请求把 token 从桶中移除,如果桶中没有 token 不能请求。

令牌桶算法适用于有突发特性的流量,且流量需要即时处理的场景

令牌桶限流算法的实现

【示例】Java 实现令牌桶算法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
import java.util.concurrent.atomic.AtomicLong;

/**
* 令牌桶限流算法
*
* @author <a href="mailto:forbreak@163.com">Zhang Peng</a>
* @date 2024-01-18
*/
public class TokenBucketRateLimiter implements RateLimiter {

/**
* QPS
*/
private final long qps;

/**
* 桶的容量
*/
private final long capacity;

/**
* 上一次令牌发放时间
*/
private long endTimeMillis;

/**
* 桶中当前的令牌数量
*/
private final AtomicLong tokenNum = new AtomicLong(0);

public TokenBucketRateLimiter(long qps, long capacity) {
this.qps = qps;
this.capacity = capacity;
this.endTimeMillis = System.currentTimeMillis();
}

@Override
public synchronized boolean tryAcquire(int permits) {

long now = System.currentTimeMillis();
long gap = now - endTimeMillis;

// 计算令牌数
long newTokenNum = (gap * qps / 1000);
long currentTokenNum = tokenNum.get() + newTokenNum;
tokenNum.set(Math.min(capacity, currentTokenNum));

if (tokenNum.get() < permits) {
return false;
} else {
tokenNum.addAndGet(-permits);
endTimeMillis = now;
return true;
}
}

}

扩展

Guava 的 RateLimiter 工具类就是基于令牌桶算法实现,其源码分析可以参考:RateLimiter 基于漏桶算法,但它参考了令牌桶算法

限流算法测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
import cn.hutool.core.thread.ThreadUtil;
import cn.hutool.core.util.RandomUtil;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

@Slf4j
public class RateLimiterDemo {

public static void main(String[] args) {

// ============================================================================

int qps = 20;

System.out.println("======================= 固定时间窗口限流算法 =======================");
FixedWindowRateLimiter fixedWindowRateLimiter = new FixedWindowRateLimiter(qps);
testRateLimit(fixedWindowRateLimiter, qps);

System.out.println("======================= 滑动时间窗口限流算法 =======================");
SlidingWindowRateLimiter slidingWindowRateLimiter = new SlidingWindowRateLimiter(qps, 10);
testRateLimit(slidingWindowRateLimiter, qps);

System.out.println("======================= 漏桶限流算法 =======================");
LeakyBucketRateLimiter leakyBucketRateLimiter = new LeakyBucketRateLimiter(qps, 100);
testRateLimit(leakyBucketRateLimiter, qps);

System.out.println("======================= 令牌桶限流算法 =======================");
TokenBucketRateLimiter tokenBucketRateLimiter = new TokenBucketRateLimiter(qps, 100);
testRateLimit(tokenBucketRateLimiter, qps);
}

private static void testRateLimit(RateLimiter rateLimiter, int qps) {

AtomicInteger okNum = new AtomicInteger(0);
AtomicInteger limitNum = new AtomicInteger(0);
ExecutorService executorService = ThreadUtil.newFixedExecutor(10, "限流测试", true);
long beginTime = System.currentTimeMillis();

int threadNum = 4;
final CountDownLatch latch = new CountDownLatch(threadNum);
for (int i = 0; i < threadNum; i++) {
executorService.submit(() -> {
try {
batchRequest(rateLimiter, okNum, limitNum, 1000);
} catch (Exception e) {
log.error("发生异常!", e);
} finally {
latch.countDown();
}
});
}

try {
latch.await(10, TimeUnit.SECONDS);
long endTime = System.currentTimeMillis();
long gap = endTime - beginTime;
log.info("限流 QPS: {} -> 实际结果:耗时 {} ms,{} 次请求成功,{} 次请求被限流,实际 QPS: {}",
qps, gap, okNum.get(), limitNum.get(), okNum.get() * 1000 / gap);
if (okNum.get() == qps) {
log.info("限流符合预期");
}
} catch (Exception e) {
log.error("发生异常!", e);
} finally {
executorService.shutdown();
}
}

private static void batchRequest(RateLimiter rateLimiter, AtomicInteger okNum, AtomicInteger limitNum, int num)
throws InterruptedException {
for (int j = 0; j < num; j++) {
if (rateLimiter.tryAcquire(1)) {
log.info("请求成功");
okNum.getAndIncrement();
} else {
log.info("请求限流");
limitNum.getAndIncrement();
}
TimeUnit.MILLISECONDS.sleep(RandomUtil.randomInt(0, 10));
}
}

}

分布式限流

前文中,基于 Java 实现的限流算法示例只能运行在单节点,无法有效应对集群部署的服务,这中场景下就需要分布式限流。

实现分布式限流的一种简单解决方案是使用 Redis + Lua 来实现。使用二者来开发的原因是:1. Redis 的性能极高;2. Redis 支持以原子操作的方式执行 Lua 脚本。

Redis + Lua 实现的固定窗口限流算法

Redis + Lua 实现的固定窗口限流算法实现思路:

  • 根据实际需要,将当前时间格式化为天(yyyyMMdd)、时(yyyyMMddHH)、分(yyyyMMddHHmm)、秒(yyyyMMddHHmmss),并作为 Redis 的 String 类型 Key。该 Key 可以视为一个固定时间窗口,其中的 value 用于统计访问量;
  • 用于代表不同粒度的时间窗口按需设置过期时间;
  • 一旦达到窗口的限流阈值时,请求被限流;否则请求通过。

【示例】Redis + Lua 实现的固定窗口限流算法

下面的代码片段模拟通过一个大小为 1 分钟的固定时间窗口进行限流,阈值为 100,过期时间 60s。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
private final String key = "rate:limit:202401222100";
private final int limit = 100;
private final int seconds = 60;

public boolean tryAcquire(int permits) {
// -- 缓存 Key
// local key = KEYS[1]
// -- 访问请求数
// local permits = tonumber(ARGV[1])
// -- 过期时间
// local seconds = tonumber(ARGV[2])
// -- 限流阈值
// local limit = tonumber(ARGV[3])
//
// -- 获取统计值
// local count = tonumber(redis.call('GET', key) or "0")
//
// if count + permits > limit then
// -- 触发限流
// return 0
// else
// redis.call('INCRBY', key, permits)
// redis.call('EXPIRE', key, seconds)
// return count + permits
// end
String script =
"-- 缓存 Key\n"
+ "local key = KEYS[1]\n"
+ "-- 访问请求数\n"
+ "local permits = tonumber(ARGV[1])\n"
+ "-- 过期时间\n"
+ "local seconds = tonumber(ARGV[2])\n"
+ "-- 限流阈值\n"
+ "local limit = tonumber(ARGV[3])\n"
+ "\n"
+ "-- 获取统计值\n"
+ "local count = tonumber(redis.call('GET', key) or \"0\")\n"
+ "\n"
+ "if count + permits > limit then\n"
+ " -- 触发限流\n"
+ " return 0\n"
+ "else\n"
+ " redis.call('INCRBY', key, permits)\n"
+ " redis.call('EXPIRE', key, seconds)\n"
+ " return count + permits\n"
+ "end";
List<String> keys = Collections.singletonList(key);
List<String> args = Arrays.asList(String.valueOf(permits), String.valueOf(seconds), String.valueOf(limit));
Object eval = jedis.eval(script, keys, args);
long value = (long) eval;
return value != 0;
}

@Test
public void test() {

for (int i = 0; i < 11; i++) {
if (tryAcquire(10)) {
System.out.println("请求成功");
} else {
System.out.println("请求失败");
}
}
}
// 请求成功
// 请求成功
// 请求成功
// 请求成功
// 请求成功
// 请求成功
// 请求成功
// 请求成功
// 请求成功
// 请求成功
// 请求失败
// rate:limit:202401222100 统计值达到 100

Redis + Lua 实现的令牌桶限流算法

【示例】基于 Redis Lua 令牌桶限流算法实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
-- 令牌桶限流

-- 令牌的唯一标识
local bucketKey = KEYS[1]
-- 上次请求的时间
local last_mill_request_key = KEYS[2]
-- 令牌桶的容量
local limit = tonumber(ARGV[1])
-- 请求令牌的数量
local permits = tonumber(ARGV[2])
-- 令牌流入的速率
local rate = tonumber(ARGV[3])
-- 当前时间
local curr_mill_time = tonumber(ARGV[4])

-- 添加令牌

-- 获取当前令牌的数量
local current_limit = tonumber(redis.call('get', bucketKey) or "0")
-- 获取上次请求的时间
local last_mill_request_time = tonumber(redis.call('get', last_mill_request_key) or "0")
-- 计算向桶里添加令牌的数量
if last_mill_request_time == 0 then
-- 令牌桶初始化
-- 更新上次请求时间
redis.call("HSET", last_mill_request_key, curr_mill_time)
return 0
else
local add_token_num = math.floor((curr_mill_time - last_mill_request_time) * rate)
end

-- 更新令牌的数量
if current_limit + add_token_num > limit then
current_limit = limit
else
current_limit = current_limit + add_token_num
end
redis.pcall("HSET",bucketKey, current_limit)
-- 设置过期时间
redis.call("EXPIRE", bucketKey, 2)

-- 限流判断
if current_limit - permits < 1 then
-- 达到限流大小
return 0
else
-- 没有达到限流大小
current_limit = current_limit - permits
redis.pcall("HSET", bucketKey, current_limit)
-- 设置过期时间
redis.call("EXPIRE", bucketKey, 2)
-- 更新上次请求的时间
redis.call("HSET", last_mill_request_key, curr_mill_time)
end

限流工具

前面介绍了限流算法的基本原理和一些简单的实现。但在生产环境,我们一般应该使用更成熟的限流工具。

参考资料