Dunwu Blog

大道至简,知易行难

Java 并发之容器

同步容器

同步容器简介

在 Java 中,同步容器主要包括 2 类:

  • VectorStackHashtable
    • Vector - Vector 实现了 List 接口。Vector 实际上就是一个数组,和 ArrayList 类似。但是 Vector 中的方法都是 synchronized 方法,即进行了同步措施。
    • Stack - Stack 也是一个同步容器,它的方法也用 synchronized 进行了同步,它实际上是继承于 Vector 类。
    • Hashtable- Hashtable 实现了 Map 接口,它和 HashMap 很相似,但是 Hashtable 进行了同步处理,而 HashMap 没有。
  • Collections 类中提供的静态工厂方法创建的类(由 Collections.synchronizedXXX 等方法)

同步容器的问题

同步容器的同步原理就是在其 getsetsize 等主要方法上用 synchronized 修饰。 synchronized 可以保证在同一个时刻,只有一个线程可以执行某个方法或者某个代码块

想详细了解 synchronized 用法和原理可以参考:Java 并发核心机制

性能问题

synchronized 的互斥同步会产生阻塞和唤醒线程的开销。显然,这种方式比没有使用 synchronized 的容器性能要差很多。

注:尤其是在 Java 1.6 没有对 synchronized 进行优化前,阻塞开销很高。

安全问题

同步容器真的绝对安全吗?

其实也未必。在做复合操作(非原子操作)时,仍然需要加锁来保护。常见复合操作如下:

  • 迭代:反复访问元素,直到遍历完全部元素;
  • 跳转:根据指定顺序寻找当前元素的下一个(下 n 个)元素;
  • 条件运算:例如若没有则添加等;

❌ 不安全的示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public class VectorDemo {

static Vector<Integer> vector = new Vector<>();

public static void main(String[] args) {
while (true) {
vector.clear();

for (int i = 0; i < 10; i++) {
vector.add(i);
}

Thread thread1 = new Thread() {
@Override
public void run() {
for (int i = 0; i < vector.size(); i++) {
vector.remove(i);
}
}
};

Thread thread2 = new Thread() {
@Override
public void run() {
for (int i = 0; i < vector.size(); i++) {
vector.get(i);
}
}
};

thread1.start();
thread2.start();

while (Thread.activeCount() > 10) {
System.out.println("同时存在 10 个以上线程,退出");
return;
}
}
}

}

以上程序执行时可能会出现数组越界错误。

Vector 是线程安全的,那为什么还会报这个错?

这是因为,对于 Vector,虽然能保证每一个时刻只能有一个线程访问它,但是不排除这种可能:

当某个线程在某个时刻执行这句时:

1
2
for(int i=0;i<vector.size();i++)
vector.get(i);

假若此时 vector 的 size 方法返回的是 10,i 的值为 9

然后另外一个线程执行了这句:

1
2
for(int i=0;i<vector.size();i++)
vector.remove(i);

将下标为 9 的元素删除了。

那么通过 get 方法访问下标为 9 的元素肯定就会出问题了。

✔️️️ 安全示例

因此为了保证线程安全,必须在方法调用端做额外的同步措施,如下面所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class VectorDemo2 {

static Vector<Integer> vector = new Vector<Integer>();

public static void main(String[] args) {
while (true) {
for (int i = 0; i < 10; i++) {
vector.add(i);
}

Thread thread1 = new Thread() {
@Override
public void run() {
synchronized (VectorDemo2.class) { //进行额外的同步
for (int i = 0; i < vector.size(); i++) {
vector.remove(i);
}
}
}
};

Thread thread2 = new Thread() {
@Override
public void run() {
synchronized (VectorDemo2.class) {
for (int i = 0; i < vector.size(); i++) {
vector.get(i);
}
}
}
};

thread1.start();
thread2.start();

while (Thread.activeCount() > 10) {
System.out.println("同时存在 10 个以上线程,退出");
return;
}
}
}

}

ConcurrentModificationException 异常

在对 Vector 等容器并发地进行迭代修改时,会报 ConcurrentModificationException 异常,关于这个异常将会在后续文章中讲述。

但是在并发容器中不会出现这个问题。

并发容器简介

同步容器将所有对容器状态的访问都串行化,以保证线程安全性,这种策略会严重降低并发性。

Java 1.5 后提供了多种并发容器,使用并发容器来替代同步容器,可以极大地提高伸缩性并降低风险

J.U.C 包中提供了几个非常有用的并发容器作为线程安全的容器:

并发容器 对应的普通容器 描述
ConcurrentHashMap HashMap Java 1.8 之前采用分段锁机制细化锁粒度,降低阻塞,从而提高并发性;Java 1.8 之后基于 CAS 实现。
ConcurrentSkipListMap SortedMap 基于跳表实现的
CopyOnWriteArrayList ArrayList
CopyOnWriteArraySet Set 基于 CopyOnWriteArrayList 实现。
ConcurrentSkipListSet SortedSet 基于 ConcurrentSkipListMap 实现。
ConcurrentLinkedQueue Queue 线程安全的无界队列。底层采用单链表。支持 FIFO。
ConcurrentLinkedDeque Deque 线程安全的无界双端队列。底层采用双向链表。支持 FIFO 和 FILO。
ArrayBlockingQueue Queue 数组实现的阻塞队列。
LinkedBlockingQueue Queue 链表实现的阻塞队列。
LinkedBlockingDeque Deque 双向链表实现的双端阻塞队列。

J.U.C 包中提供的并发容器命名一般分为三类:

  • Concurrent
    • 这类型的锁竞争相对于 CopyOnWrite 要高一些,但写操作代价要小一些。
    • 此外,Concurrent 往往提供了较低的遍历一致性,即:当利用迭代器遍历时,如果容器发生修改,迭代器仍然可以继续进行遍历。代价就是,在获取容器大小 size() ,容器是否为空等方法,不一定完全精确,但这是为了获取并发吞吐量的设计取舍,可以理解。与之相比,如果是使用同步容器,就会出现 fail-fast 问题,即:检测到容器在遍历过程中发生了修改,则抛出 ConcurrentModificationException,不再继续遍历。
  • CopyOnWrite - 一个线程写,多个线程读。读操作时不加锁,写操作时通过在副本上加锁保证并发安全,空间开销较大。
  • Blocking - 内部实现一般是基于锁,提供阻塞队列的能力。

:x: 错误示例,产生 ConcurrentModificationException 异常:

1
2
3
public void removeKeys(Map<String, Object> map, final String... keys) {
map.keySet().removeIf(key -> ArrayUtil.contains(keys, key));
}

:x: 错误示例,产生 ConcurrentModificationException 异常:

1
2
3
4
5
6
public static <K, V> Map<K, V> removeKeys(Map<String, Object> map, final String... keys) {
for (K key : keys) {
map.remove(key);
}
return map;
}

并发场景下的 Map

如果对数据有强一致要求,则需使用 Hashtable;在大部分场景通常都是弱一致性的情况下,使用 ConcurrentHashMap 即可;如果数据量在千万级别,且存在大量增删改操作,则可以考虑使用 ConcurrentSkipListMap

并发场景下的 List

读多写少用 CopyOnWriteArrayList

写多读少用 ConcurrentLinkedQueue ,但由于是无界的,要有容量限制,避免无限膨胀,导致内存溢出。

Map

Map 接口的两个实现是 ConcurrentHashMap 和 ConcurrentSkipListMap,它们从应用的角度来看,主要区别在于ConcurrentHashMap 的 key 是无序的,而 ConcurrentSkipListMap 的 key 是有序的。所以如果你需要保证 key 的顺序,就只能使用 ConcurrentSkipListMap。

使用 ConcurrentHashMap 和 ConcurrentSkipListMap 需要注意的地方是,它们的 key 和 value 都不能为空,否则会抛出NullPointerException这个运行时异常。

ConcurrentHashMap

ConcurrentHashMap 是线程安全的 HashMap ,用于替代 Hashtable

ConcurrentHashMap 的特性

ConcurrentHashMap 实现了 ConcurrentMap 接口,而 ConcurrentMap 接口扩展了 Map 接口。

1
2
3
4
public class ConcurrentHashMap<K,V> extends AbstractMap<K,V>
implements ConcurrentMap<K,V>, Serializable {
// ...
}

ConcurrentHashMap 的实现包含了 HashMap 所有的基本特性,如:数据结构、读写策略等。

ConcurrentHashMap 没有实现对 Map 加锁以提供独占访问。因此无法通过在客户端加锁的方式来创建新的原子操作。但是,一些常见的复合操作,如:“若没有则添加”、“若相等则移除”、“若相等则替换”,都已经实现为原子操作,并且是围绕 ConcurrentMap 的扩展接口而实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public interface ConcurrentMap<K, V> extends Map<K, V> {

// 仅当 K 没有相应的映射值才插入
V putIfAbsent(K key, V value);

// 仅当 K 被映射到 V 时才移除
boolean remove(Object key, Object value);

// 仅当 K 被映射到 oldValue 时才替换为 newValue
boolean replace(K key, V oldValue, V newValue);

// 仅当 K 被映射到某个值时才替换为 newValue
V replace(K key, V value);
}

不同于 HashtableConcurrentHashMap 提供的迭代器不会抛出 ConcurrentModificationException,因此不需要在迭代过程中对容器加锁。

:bell: 注意:一些需要对整个 Map 进行计算的方法,如 sizeisEmpty ,由于返回的结果在计算时可能已经过期,所以并非实时的精确值。这是一种策略上的权衡,在并发环境下,这类方法由于总在不断变化,所以获取其实时精确值的意义不大。ConcurrentHashMap 弱化这类方法,以换取更重要操作(如:getputcontainesKeyremove 等)的性能。

ConcurrentHashMap 的用法

示例:不会出现 ConcurrentModificationException

ConcurrentHashMap 的基本操作与 HashMap 的用法基本一样。不同于 HashMapHashtableConcurrentHashMap 提供的迭代器不会抛出 ConcurrentModificationException,因此不需要在迭代过程中对容器加锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class ConcurrentHashMapDemo {

public static void main(String[] args) throws InterruptedException {

// HashMap 在并发迭代访问时会抛出 ConcurrentModificationException 异常
// Map<Integer, Character> map = new HashMap<>();
Map<Integer, Character> map = new ConcurrentHashMap<>();

Thread wthread = new Thread(() -> {
System.out.println("写操作线程开始执行");
for (int i = 0; i < 26; i++) {
map.put(i, (char) ('a' + i));
}
});
Thread rthread = new Thread(() -> {
System.out.println("读操作线程开始执行");
for (Integer key : map.keySet()) {
System.out.println(key + " - " + map.get(key));
}
});
wthread.start();
rthread.start();
Thread.sleep(1000);
}
}

ConcurrentHashMap 的原理

ConcurrentHashMap 一直在演进,尤其在 Java 1.7 和 Java 1.8,其数据结构和并发机制有很大的差异。

  • Java 1.7
    • 数据结构:数组+单链表
    • 并发机制:采用分段锁机制细化锁粒度,降低阻塞,从而提高并发性。
  • Java 1.8
    • 数据结构:数组+单链表+红黑树
    • 并发机制:取消分段锁,之后基于 CAS + synchronized 实现。
Java 1.7 的实现

分段锁,是将内部进行分段(Segment),里面是 HashEntry 数组,和 HashMap 类似,哈希相同的条目也是以链表形式存放。
HashEntry 内部使用 volatilevalue 字段来保证可见性,也利用了不可变对象的机制,以改进利用 Unsafe 提供的底层能力,比如 volatile access,去直接完成部分操作,以最优化性能,毕竟 Unsafe 中的很多操作都是 JVM intrinsic 优化过的。

img

在进行并发写操作时,ConcurrentHashMap 会获取可重入锁(ReentrantLock),以保证数据一致性。所以,在并发修改期间,相应 Segment 是被锁定的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class ConcurrentHashMap<K, V> extends AbstractMap<K, V>
implements ConcurrentMap<K, V>, Serializable {

// 将整个hashmap分成几个小的map,每个segment都是一个锁;与hashtable相比,这么设计的目的是对于put, remove等操作,可以减少并发冲突,对
// 不属于同一个片段的节点可以并发操作,大大提高了性能
final Segment<K,V>[] segments;

// 本质上Segment类就是一个小的hashmap,里面table数组存储了各个节点的数据,继承了ReentrantLock, 可以作为互拆锁使用
static final class Segment<K,V> extends ReentrantLock implements Serializable {
transient volatile HashEntry<K,V>[] table;
transient int count;
}

// 基本节点,存储Key, Value值
static final class HashEntry<K,V> {
final int hash;
final K key;
volatile V value;
volatile HashEntry<K,V> next;
}
}
Java 1.8 的实现
  • 数据结构改进:与 HashMap 一样,将原先 数组+单链表 的数据结构,变更为 数组+单链表+红黑树 的结构。当出现哈希冲突时,数据会存入数组指定桶的单链表,当链表长度达到 8,则将其转换为红黑树结构,这样其查询的时间复杂度可以降低到 $$O(logN)$$,以改进性能。
  • 并发机制改进:
    • 取消 segments 字段,直接采用 transient volatile HashEntry<K,V>[] table 保存数据,采用 table 数组元素作为锁,从而实现了对每一行数据进行加锁,进一步减少并发冲突的概率
    • 使用 CAS + sychronized 操作,在特定场景进行无锁并发操作。使用 Unsafe、LongAdder 之类底层手段,进行极端情况的优化。现代 JDK 中,synchronized 已经被不断优化,可以不再过分担心性能差异,另外,相比于 ReentrantLock,它可以减少内存消耗,这是个非常大的优势。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
// 如果table为空,初始化;否则,根据hash值计算得到数组索引i,如果tab[i]为空,直接新建节点Node即可。注:tab[i]实质为链表或者红黑树的首节点。
if (tab == null || (n = tab.length) == 0)
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
// 如果tab[i]不为空并且hash值为MOVED,说明该链表正在进行transfer操作,返回扩容完成后的table。
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
// 针对首个节点进行加锁操作,而不是segment,进一步减少线程冲突
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
// 如果在链表中找到值为key的节点e,直接设置e.val = value即可。
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
// 如果没有找到值为key的节点,直接新建Node并加入链表即可。
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
// 如果首节点为TreeBin类型,说明为红黑树结构,执行putTreeVal操作。
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
// 如果节点数>=8,那么转换链表结构为红黑树结构。
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
// 计数增加1,有可能触发transfer操作(扩容)。
addCount(1L, binCount);
return null;
}

ConcurrentHashMap 的实战

示例摘自:极客时间教程 - Java 业务开发常见错误 100 例

ConcurrentHashMap 错误示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
//线程个数
private static int THREAD_COUNT = 10;
//总元素数量
private static int ITEM_COUNT = 1000;

public static void main(String[] args) throws InterruptedException {
ConcurrentHashMap<String, Long> concurrentHashMap = getData(ITEM_COUNT - 100);
//初始900个元素
System.out.println("init size:" + concurrentHashMap.size());
ForkJoinPool forkJoinPool = new ForkJoinPool(THREAD_COUNT);
//使用线程池并发处理逻辑
forkJoinPool.execute(() -> IntStream.rangeClosed(1, 10).parallel().forEach(i -> {
//查询还需要补充多少个元素
int gap = ITEM_COUNT - concurrentHashMap.size();
System.out.println("gap size:" + gap);
//补充元素
concurrentHashMap.putAll(getData(gap));
}));
//等待所有任务完成
forkJoinPool.shutdown();
forkJoinPool.awaitTermination(1, TimeUnit.HOURS);
//最后元素个数会是1000吗?
System.out.println("finish size:" + concurrentHashMap.size());
}

private static ConcurrentHashMap<String, Long> getData(int count) {
return LongStream.rangeClosed(1, count)
.boxed()
.collect(
Collectors.toConcurrentMap(
i -> UUID.randomUUID().toString(),
i -> i,
(o1, o2) -> o1,
ConcurrentHashMap::new));
}

初始大小 900 符合预期,还需要填充 100 个元素。

预期结果为 1000 个元素,实际大于 1000 个元素。

【分析】

ConcurrentHashMap 对外提供的方法或能力的限制:

  • 使用了 ConcurrentHashMap,不代表对它的多个操作之间的状态是一致的,是没有其他线程在操作它的,如果需要确保需要手动加锁。
  • 诸如 size、isEmpty 和 containsValue 等聚合方法,在并发情况下可能会反映 ConcurrentHashMap 的中间状态。因此在并发情况下,这些方法的返回值只能用作参考,而不能用于流程控制。显然,利用 size 方法计算差异值,是一个流程控制。
  • 诸如 putAll 这样的聚合方法也不能确保原子性,在 putAll 的过程中去获取数据可能会获取到部分数据。
ConcurrentHashMap 错误示例修正 1.0 版

通过 synchronized 加锁,当然可以保证数据一致性,但是牺牲了 ConcurrentHashMap 的性能,没哟真正发挥出 ConcurrentHashMap 的特性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38

//线程个数
private static int THREAD_COUNT = 10;
//总元素数量
private static int ITEM_COUNT = 1000;

public static void main(String[] args) throws InterruptedException {
ConcurrentHashMap<String, Long> concurrentHashMap = getData(ITEM_COUNT - 100);
//初始900个元素
System.out.println("init size:" + concurrentHashMap.size());
ForkJoinPool forkJoinPool = new ForkJoinPool(THREAD_COUNT);
//使用线程池并发处理逻辑
forkJoinPool.execute(() -> IntStream.rangeClosed(1, 10).parallel().forEach(i -> {
//查询还需要补充多少个元素
synchronized (concurrentHashMap) {
int gap = ITEM_COUNT - concurrentHashMap.size();
System.out.println("gap size:" + gap);
//补充元素
concurrentHashMap.putAll(getData(gap));
}
}));
//等待所有任务完成
forkJoinPool.shutdown();
forkJoinPool.awaitTermination(1, TimeUnit.HOURS);
//最后元素个数会是1000吗?
System.out.println("finish size:" + concurrentHashMap.size());
}

private static ConcurrentHashMap<String, Long> getData(int count) {
return LongStream.rangeClosed(1, count)
.boxed()
.collect(
Collectors.toConcurrentMap(
i -> UUID.randomUUID().toString(),
i -> i,
(o1, o2) -> o1,
ConcurrentHashMap::new));
}
ConcurrentHashMap 错误示例修正 2.0 版
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63

//循环次数
private static int LOOP_COUNT = 10000000;
//线程个数
private static int THREAD_COUNT = 10;
//总元素数量
private static int ITEM_COUNT = 1000;

public static void main(String[] args) throws InterruptedException {
StopWatch stopWatch = new StopWatch();
stopWatch.start("normaluse");
Map<String, Long> normaluse = normaluse();
stopWatch.stop();
Assert.isTrue(normaluse.size() == ITEM_COUNT, "normaluse size error");
Assert.isTrue(normaluse.values().stream()
.mapToLong(aLong -> aLong).reduce(0, Long::sum) == LOOP_COUNT
, "normaluse count error");
stopWatch.start("gooduse");
Map<String, Long> gooduse = gooduse();
stopWatch.stop();
Assert.isTrue(gooduse.size() == ITEM_COUNT, "gooduse size error");
Assert.isTrue(gooduse.values().stream()
.mapToLong(l -> l)
.reduce(0, Long::sum) == LOOP_COUNT
, "gooduse count error");
System.out.println(stopWatch.prettyPrint());
}

private static Map<String, Long> normaluse() throws InterruptedException {
ConcurrentHashMap<String, Long> freqs = new ConcurrentHashMap<>(ITEM_COUNT);
ForkJoinPool forkJoinPool = new ForkJoinPool(THREAD_COUNT);
forkJoinPool.execute(() -> IntStream.rangeClosed(1, LOOP_COUNT).parallel().forEach(i -> {
String key = "item" + ThreadLocalRandom.current().nextInt(ITEM_COUNT);
synchronized (freqs) {
if (freqs.containsKey(key)) {
freqs.put(key, freqs.get(key) + 1);
} else {
freqs.put(key, 1L);
}
}
}
));
forkJoinPool.shutdown();
forkJoinPool.awaitTermination(1, TimeUnit.HOURS);
return freqs;
}

private static Map<String, Long> gooduse() throws InterruptedException {
ConcurrentHashMap<String, LongAdder> freqs = new ConcurrentHashMap<>(ITEM_COUNT);
ForkJoinPool forkJoinPool = new ForkJoinPool(THREAD_COUNT);
forkJoinPool.execute(() -> IntStream.rangeClosed(1, LOOP_COUNT).parallel().forEach(i -> {
String key = "item" + ThreadLocalRandom.current().nextInt(ITEM_COUNT);
freqs.computeIfAbsent(key, k -> new LongAdder()).increment();
}
));
forkJoinPool.shutdown();
forkJoinPool.awaitTermination(1, TimeUnit.HOURS);
return freqs.entrySet().stream()
.collect(Collectors.toMap(
e -> e.getKey(),
e -> e.getValue().longValue())
);
}

List

CopyOnWriteArrayList

CopyOnWriteArrayList 是线程安全的 ArrayListCopyOnWrite 字面意思为写的时候会将共享变量新复制一份出来。复制的好处在于读操作是无锁的(也就是无阻塞)。

CopyOnWriteArrayList 仅适用于写操作非常少的场景,而且能够容忍读写的短暂不一致。如果读写比例均衡或者有大量写操作的话,使用 CopyOnWriteArrayList 的性能会非常糟糕。

CopyOnWriteArrayList 原理

CopyOnWriteArrayList 内部维护了一个数组,成员变量 array 就指向这个内部数组,所有的读操作都是基于 array 进行的,如下图所示,迭代器 Iterator 遍历的就是 array 数组。

img

  • lock - 执行写时复制操作,需要使用可重入锁加锁
  • array - 对象数组,用于存放元素
1
2
3
4
5
/** The lock protecting all mutators */
final transient ReentrantLock lock = new ReentrantLock();

/** The array, accessed only via getArray/setArray. */
private transient volatile Object[] array;

img

(1)读操作

CopyOnWriteAarrayList 中,读操作不同步,因为它们在内部数组的快照上工作,所以多个迭代器可以同时遍历而不会相互阻塞(图 1,2,4)。

CopyOnWriteArrayList 的读操作是不用加锁的,性能很高。

1
2
3
4
5
6
public E get(int index) {
return get(getArray(), index);
}
private E get(Object[] a, int index) {
return (E) a[index];
}

(2)写操作

所有的写操作都是同步的。他们在备份数组(图 3)的副本上工作。写操作完成后,后备阵列将被替换为复制的阵列,并释放锁定。支持数组变得易变,所以替换数组的调用是原子(图 5)。

写操作后创建的迭代器将能够看到修改的结构(图 6,7)。

写时复制集合返回的迭代器不会抛出 ConcurrentModificationException,因为它们在数组的快照上工作,并且无论后续的修改(2,4)如何,都会像迭代器创建时那样完全返回元素。

添加操作 - 添加的逻辑很简单,先将原容器 copy 一份,然后在新副本上执行写操作,之后再切换引用。当然此过程是要加锁的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public boolean add(E e) {
//ReentrantLock加锁,保证线程安全
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
//拷贝原容器,长度为原容器长度加一
Object[] newElements = Arrays.copyOf(elements, len + 1);
//在新副本上执行添加操作
newElements[len] = e;
//将原容器引用指向新副本
setArray(newElements);
return true;
} finally {
//解锁
lock.unlock();
}
}

删除操作 - 删除操作同理,将除要删除元素之外的其他元素拷贝到新副本中,然后切换引用,将原容器引用指向新副本。同属写操作,需要加锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public E remove(int index) {
//加锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
E oldValue = get(elements, index);
int numMoved = len - index - 1;
if (numMoved == 0)
//如果要删除的是列表末端数据,拷贝前len-1个数据到新副本上,再切换引用
setArray(Arrays.copyOf(elements, len - 1));
else {
//否则,将除要删除元素之外的其他元素拷贝到新副本中,并切换引用
Object[] newElements = new Object[len - 1];
System.arraycopy(elements, 0, newElements, 0, index);
System.arraycopy(elements, index + 1, newElements, index,
numMoved);
setArray(newElements);
}
return oldValue;
} finally {
//解锁
lock.unlock();
}
}

CopyOnWriteArrayList 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
public class CopyOnWriteArrayListDemo {

static class ReadTask implements Runnable {

List<String> list;

ReadTask(List<String> list) {
this.list = list;
}

public void run() {
for (String str : list) {
System.out.println(str);
}
}
}

static class WriteTask implements Runnable {

List<String> list;
int index;

WriteTask(List<String> list, int index) {
this.list = list;
this.index = index;
}

public void run() {
list.remove(index);
list.add(index, "write_" + index);
}
}

public void run() {
final int NUM = 10;
// ArrayList 在并发迭代访问时会抛出 ConcurrentModificationException 异常
// List<String> list = new ArrayList<>();
CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
for (int i = 0; i < NUM; i++) {
list.add("main_" + i);
}
ExecutorService executorService = Executors.newFixedThreadPool(NUM);
for (int i = 0; i < NUM; i++) {
executorService.execute(new ReadTask(list));
executorService.execute(new WriteTask(list, i));
}
executorService.shutdown();
}

public static void main(String[] args) {
new CopyOnWriteArrayListDemo().run();
}
}

CopyOnWriteArrayList 实战

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
@Slf4j
public class WrongCopyOnWriteList {

public static void main(String[] args) {
testRead();
testWrite();
}

public static Map testWrite() {
List<Integer> copyOnWriteArrayList = new CopyOnWriteArrayList<>();
List<Integer> synchronizedList = Collections.synchronizedList(new ArrayList<>());
StopWatch stopWatch = new StopWatch();
int loopCount = 100000;
stopWatch.start("Write:copyOnWriteArrayList");
IntStream.rangeClosed(1, loopCount)
.parallel()
.forEach(__ -> copyOnWriteArrayList.add(ThreadLocalRandom.current().nextInt(loopCount)));
stopWatch.stop();
stopWatch.start("Write:synchronizedList");
IntStream.rangeClosed(1, loopCount)
.parallel()
.forEach(__ -> synchronizedList.add(ThreadLocalRandom.current().nextInt(loopCount)));
stopWatch.stop();
log.info(stopWatch.prettyPrint());
Map result = new HashMap();
result.put("copyOnWriteArrayList", copyOnWriteArrayList.size());
result.put("synchronizedList", synchronizedList.size());
return result;
}

private static void addAll(List<Integer> list) {
list.addAll(IntStream.rangeClosed(1, 1000000).boxed().collect(Collectors.toList()));
}

public static Map testRead() {
List<Integer> copyOnWriteArrayList = new CopyOnWriteArrayList<>();
List<Integer> synchronizedList = Collections.synchronizedList(new ArrayList<>());
addAll(copyOnWriteArrayList);
addAll(synchronizedList);
StopWatch stopWatch = new StopWatch();
int loopCount = 1000000;
int count = copyOnWriteArrayList.size();
stopWatch.start("Read:copyOnWriteArrayList");
IntStream.rangeClosed(1, loopCount)
.parallel()
.forEach(__ -> copyOnWriteArrayList.get(ThreadLocalRandom.current().nextInt(count)));
stopWatch.stop();
stopWatch.start("Read:synchronizedList");
IntStream.range(0, loopCount)
.parallel()
.forEach(__ -> synchronizedList.get(ThreadLocalRandom.current().nextInt(count)));
stopWatch.stop();
log.info(stopWatch.prettyPrint());
Map result = new HashMap();
result.put("copyOnWriteArrayList", copyOnWriteArrayList.size());
result.put("synchronizedList", synchronizedList.size());
return result;
}

}

读性能差不多是写性能的一百倍。

Set

Set 接口的两个实现是 CopyOnWriteArraySet 和 ConcurrentSkipListSet,使用场景可以参考前面讲述的 CopyOnWriteArrayList 和 ConcurrentSkipListMap,它们的原理都是一样的。

Queue

Java 并发包里面 Queue 这类并发容器是最复杂的,你可以从以下两个维度来分类。一个维度是阻塞与非阻塞,所谓阻塞指的是:当队列已满时,入队操作阻塞;当队列已空时,出队操作阻塞。另一个维度是单端与双端,单端指的是只能队尾入队,队首出队;而双端指的是队首队尾皆可入队出队。Java 并发包里阻塞队列都用 Blocking 关键字标识,单端队列使用 Queue 标识,双端队列使用 Deque 标识

BlockingQueue

BlockingQueue 顾名思义,是一个阻塞队列。**BlockingQueue 基本都是基于锁实现。在 BlockingQueue 中,当队列已满时,入队操作阻塞;当队列已空时,出队操作阻塞**。

BlockingQueue 接口定义如下:

1
public interface BlockingQueue<E> extends Queue<E> {}

核心 API:

1
2
3
4
// 获取并移除队列头结点,如果必要,其会等待直到队列出现元素
E take() throws InterruptedException;
// 插入元素,如果队列已满,则等待直到队列出现空闲空间
void put(E e) throws InterruptedException;

BlockingQueue 对插入操作、移除操作、获取元素操作提供了四种不同的方法用于不同的场景中使用:

  • 抛出异常;
  • 返回特殊值(nulltrue/false,取决于具体的操作);
  • 阻塞等待此操作,直到这个操作成功;
  • 阻塞等待此操作,直到成功或者超时指定时间。

总结如下:

Throws exception Special value Blocks Times out
Insert add(e) offer(e) put(e) offer(e, time, unit)
Remove remove() poll() take() poll(time, unit)
Examine element() peek() not applicable not applicable

BlockingQueue 的各个实现类都遵循了这些规则。

BlockingQueue 不接受 null 值元素。

JDK 提供了以下阻塞队列:

  • ArrayBlockingQueue - 一个由数组结构组成的有界阻塞队列
  • LinkedBlockingQueue - 一个由链表结构组成的有界阻塞队列
  • PriorityBlockingQueue - 一个支持优先级排序的无界阻塞队列
  • SynchronousQueue - 一个不存储元素的阻塞队列
  • DelayQueue - 一个使用优先级队列实现的无界阻塞队列。
  • LinkedTransferQueue - 一个由链表结构组成的无界阻塞队列

BlockingQueue 基本都是基于锁实现。

PriorityBlockingQueue 类

PriorityBlockingQueue 类定义如下:

1
2
public class PriorityBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {}

PriorityBlockingQueue 要点

  • PriorityBlockingQueue 可以视为 PriorityQueue 的线程安全版本。
  • PriorityBlockingQueue 实现了 BlockingQueue,也是一个阻塞队列。
  • PriorityBlockingQueue 实现了 Serializable,支持序列化。
  • PriorityBlockingQueue 不接受 null 值元素。
  • PriorityBlockingQueue 的插入操作 put 方法不会 block,因为它是无界队列(take 方法在队列为空的时候会阻塞)。

PriorityBlockingQueue 原理

PriorityBlockingQueue 有两个重要成员:

1
2
private transient Object[] queue;
private final ReentrantLock lock;
  • queue 是一个 Object 数组,用于保存 PriorityBlockingQueue 的元素。
  • 而可重入锁 lock 则用于在执行插入、删除操作时,保证这个方法在当前线程释放锁之前,其他线程不能访问。

PriorityBlockingQueue 的容量虽然有初始化大小,但是不限制大小,如果当前容量已满,插入新元素时会自动扩容。

ArrayBlockingQueue 类

ArrayBlockingQueue 是由数组结构组成的有界阻塞队列

ArrayBlockingQueue 要点

ArrayBlockingQueue 类定义如下:

1
2
3
4
5
6
7
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
// 数组的大小就决定了队列的边界,所以初始化时必须指定容量
public ArrayBlockingQueue(int capacity) { //... }
public ArrayBlockingQueue(int capacity, boolean fair) { //... }
public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) { //... }
}

说明:

  • ArrayBlockingQueue 实现了 BlockingQueue,也是一个阻塞队列。
  • ArrayBlockingQueue 实现了 Serializable,支持序列化。
  • ArrayBlockingQueue 是基于数组实现的有界阻塞队列。所以初始化时必须指定容量。

ArrayBlockingQueue 原理

ArrayBlockingQueue 的重要成员如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
// 用于存放元素的数组
final Object[] items;
// 下一次读取操作的位置
int takeIndex;
// 下一次写入操作的位置
int putIndex;
// 队列中的元素数量
int count;

// 以下几个就是控制并发用的同步器
final ReentrantLock lock;
private final Condition notEmpty;
private final Condition notFull;

ArrayBlockingQueue 内部以 final 的数组保存数据,数组的大小就决定了队列的边界。

ArrayBlockingQueue 实现并发同步的原理就是,读操作和写操作都需要获取到 AQS 独占锁才能进行操作。

  • 如果队列为空,这个时候读操作的线程进入到读线程队列排队,等待写线程写入新的元素,然后唤醒读线程队列的第一个等待线程。
  • 如果队列已满,这个时候写操作的线程进入到写线程队列排队,等待读线程将队列元素移除,然后唤醒写线程队列的第一个等待线程。

对于 ArrayBlockingQueue,我们可以在构造的时候指定以下三个参数:

  • 队列容量,其限制了队列中最多允许的元素个数;
  • 指定独占锁是公平锁还是非公平锁。非公平锁的吞吐量比较高,公平锁可以保证每次都是等待最久的线程获取到锁;
  • 可以指定用一个集合来初始化,将此集合中的元素在构造方法期间就先添加到队列中。

LinkedBlockingQueue 类

LinkedBlockingQueue 是由链表结构组成的有界阻塞队列。容易被误解为无边界,但其实其行为和内部代码都是基于有界的逻辑实现的,只不过如果我们没有在创建队列时就指定容量,那么其容量限制就自动被设置为 Integer.MAX_VALUE,成为了无界队列。

LinkedBlockingQueue 要点

LinkedBlockingQueue 类定义如下:

1
2
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {}
  • LinkedBlockingQueue 实现了 BlockingQueue,也是一个阻塞队列。
  • LinkedBlockingQueue 实现了 Serializable,支持序列化。
  • LinkedBlockingQueue 是基于单链表实现的阻塞队列,可以当做无界队列也可以当做有界队列来使用。
  • LinkedBlockingQueue 中元素按照插入顺序保存(FIFO)。

LinkedBlockingQueue 原理

LinkedBlockingQueue 中的重要数据结构:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 队列容量
private final int capacity;
// 队列中的元素数量
private final AtomicInteger count = new AtomicInteger(0);
// 队头
private transient Node<E> head;
// 队尾
private transient Node<E> last;

// take, poll, peek 等读操作的方法需要获取到这个锁
private final ReentrantLock takeLock = new ReentrantLock();
// 如果读操作的时候队列是空的,那么等待 notEmpty 条件
private final Condition notEmpty = takeLock.newCondition();
// put, offer 等写操作的方法需要获取到这个锁
private final ReentrantLock putLock = new ReentrantLock();
// 如果写操作的时候队列是满的,那么等待 notFull 条件
private final Condition notFull = putLock.newCondition();

参考资料

深入剖析共识性算法 Raft

Raft 简介

Raft 是一种为了管理日志复制的分布式共识性算法。从本质上说,Raft 算法是通过一切以领导者为准的方式,实现一系列值的共识和各节点日志的一致

Raft 出现之前,Paxos 一直是分布式共识性算法的标准。Paxos 难以理解,更难以实现。Raft 的设计目标是简化 Paxos,使得算法既容易理解,也容易实现

Paxos 和 Raft 都是分布式共识性算法,这个过程如同投票选举领袖(Leader),参选者(Candidate)需要说服大多数投票者(Follower)投票给他,一旦选举出领袖,就由领袖发号施令。Paxos 和 Raft 的区别在于选举的具体过程不同。

Raft 可以解决分布式 CAP 理论中的 CP,即 一致性(C:Consistency) 和 _分区容忍性(P:Partition Tolerance)_,并不能解决 可用性(A:Availability) 的问题。

分布式共识性

分布式共识性 (distributed consensus) 是分布式系统中最基本的问题,用来保证一个分布式系统的可靠性以及容错能力。简单来说,**分布式共识性是指多个服务器的保持状态一致**。

在分布式系统中,可能出现各种意外(断电、网络拥塞、CPU/内存耗尽等等),使得服务器宕机或无法访问,最终导致无法和其他服务器保持状态一致。为了应对这种情况,就需要有一种一致性协议来进行容错,使得分布式系统中即使有部分服务器宕机或无法访问,整体依然可以对外提供服务。

以容错方式达成一致,自然不能要求所有服务器都达成一致状态,只要超过半数以上的服务器达成一致就可以了。假设有 N 台服务器, 大于等于 N/2 + 1 台服务器就算是半数以上了 。

复制状态机

复制状态机(Replicated State Machines) 是指一组服务器上的状态机产生相同状态的副本,并且在一些机器宕掉的情况下也可以继续运行。一致性算法管理着来自客户端指令的复制日志。状态机从日志中处理相同顺序的相同指令,所以产生的结果也是相同的。

复制状态机通常都是基于复制日志实现的,如上图。每一个服务器存储一个包含一系列指令的日志,并且按照日志的顺序进行执行。每一个日志都按照相同的顺序包含相同的指令,所以每一个服务器都执行相同的指令序列。因为每个状态机都是确定的,每一次执行操作都产生相同的状态和同样的序列。

保证复制日志相同就是一致性算法的工作了。在一台服务器上,一致性模块接收客户端发送来的指令然后增加到自己的日志中去。它和其他服务器上的一致性模块进行通信来保证每一个服务器上的日志最终都以相同的顺序包含相同的请求,尽管有些服务器会宕机。一旦指令被正确的复制,每一个服务器的状态机按照日志顺序处理他们,然后输出结果被返回给客户端。因此,服务器集群看起来形成一个高可靠的状态机。

实际系统中使用的一致性算法通常含有以下特性:

  • 安全性保证(绝对不会返回一个错误的结果):在非拜占庭错误情况下,包括网络延迟、分区、丢包、冗余和乱序等错误都可以保证正确。
  • 可用性:集群中只要有大多数的机器可运行并且能够相互通信、和客户端通信,就可以保证可用。因此,一个典型的包含 5 个节点的集群可以容忍两个节点的失败。服务器被停止就认为是失败。他们当有稳定的存储的时候可以从状态中恢复回来并重新加入集群。
  • 不依赖时序来保证一致性:物理时钟错误或者极端的消息延迟只有在最坏情况下才会导致可用性问题。
  • 通常情况下,一条指令可以尽可能快的在集群中大多数节点响应一轮远程过程调用时完成。小部分比较慢的节点不会影响系统整体的性能。

RAFT 应用

RAFT 可以做什么?

通过 RAFT 提供的复制状态机,可以解决分布式系统的复制、修复、节点管理等问题。Raft 极大的简化当前分布式系统的设计与实现,让开发者只关注于业务逻辑,将其抽象实现成对应的状态机即可。基于这套框架,可以构建很多分布式应用:

  • 分布式存储系统:比如分布式消息队列、分布式块系统、分布式文件系统、分布式表格系统等。代表有:Redis、Etcd、Consul
  • 高可靠元信息管理:比如各类 Master 模块的 HA

Raft 基础

Raft 将一致性问题分解成了三个子问题:

  • 选举 Leader
  • 日志复制
  • 安全性

在后续章节,会详细讲解这个子问题。现在,先了解一下 Raft 的一些核心概念。

服务器角色

在 Raft 中,任何时刻,每个服务器都处于这三个角色之一 :

  • Leader - 领导者,通常一个系统中是一主(Leader)多从(Follower)。Leader 负责处理所有的客户端请求
  • Follower - 跟随者,不会发送任何请求,只是简单的 响应来自 Leader 或者 Candidate 的请求
  • Candidate - 参选者,选举新 Leader 时的临时角色。

:bulb: 图示说明:

  • Follower 只响应来自其他服务器的请求。在一定时限内,如果 Follower 接收不到消息,就会转变成 Candidate,并发起选举。
  • Candidate 向 Follower 发起投票请求,如果获得集群中半数以上的选票,就会转变为 Leader。
  • 在一个 Term 内,Leader 始终保持不变,直到下线了。Leader 需要周期性向所有 Follower 发送心跳消息,以阻止 Follower 转变为 Candidate。

任期

Raft 把时间分割成任意长度的 任期(Term),任期用连续的整数标记。每一段任期从一次选举开始。Raft 保证了在一个给定的任期内,最多只有一个领导者

  • 如果选举成功,Leader 会管理整个集群直到任期结束。
  • 如果选举失败,那么这个任期就会因为没有 Leader 而结束。

不同服务器节点观察到的任期转换状态可能不一样

  • 服务器节点可能观察到多次的任期转换。
  • 服务器节点也可能观察不到任何一次任期转换。

任期在 Raft 算法中充当逻辑时钟的作用,使得服务器节点可以查明一些过期的信息(比如过期的 Leader)。每个服务器节点都会存储一个当前任期号,这一编号在整个时期内单调的增长。当服务器之间通信的时候会交换当前任期号。

  • 如果一个服务器的当前任期号比其他人小,那么他会更新自己的编号到较大的编号值。
  • 如果一个 Candidate 或者 Leader 发现自己的任期号过期了,那么他会立即恢复成跟随者状态。
  • 如果一个节点接收到一个包含过期的任期号的请求,那么他会直接拒绝这个请求。

RPC

Raft 算法中服务器节点之间的通信使用 **_远程过程调用(RPC)_**。

基本的一致性算法只需要两种 RPC:

  • RequestVote RPC - 请求投票 RPC,由 Candidate 在选举期间发起。
  • AppendEntries RPC - 附加条目 RPC,由 Leader 发起,用来复制日志和提供一种心跳机制。

选举 Leader

选举规则

领导者心跳消息:Raft 使用一种心跳机制来触发 Leader 选举。Leader 需要周期性的向所有 Follower 发送心跳消息,以此维持 Leader 身份。

随机的竞选超时时间:每个 Follower 都设置了一个随机的竞选超时时间,一般为 150ms ~ 300ms,如果在竞选超时时间内没有收到 Leader 的心跳消息,就会认为当前 Term 没有可用的 Leader,并发起选举来选出新的 Leader。开始一次选举过程,Follower 先要增加自己的当前 Term 号,并转换为 Candidate

Candidate 会并行的向集群中的所有服务器节点发送投票请求(RequestVote RPC,它会保持当前状态直到以下三件事情之一发生:

  • 自己成为 Leader
  • 其他的服务器成为 Leader
  • 没有任何服务器成为 Leader

自己成为 Leader

  • 当一个 Candidate 从整个集群半数以上的服务器节点获得了针对同一个 Term 的选票,那么它就赢得了这次选举并成为 Leader。每个服务器最多会对一个 Term 投出一张选票,按照先来先服务(FIFO)的原则。_要求半数以上选票的规则确保了最多只会有一个 Candidate 赢得此次选举_。
  • 一旦 Candidate 赢得选举,就立即成为 Leader。然后它会向其他的服务器发送心跳消息来建立自己的权威并且阻止新的领导人的产生。

其他的服务器成为 Leader

等待投票期间,Candidate 可能会从其他的服务器接收到声明它是 Leader 的 AppendEntries RPC

  • 如果这个 Leader 的 Term 号(包含在此次的 RPC 中)不小于 Candidate 当前的 Term,那么 Candidate 会承认 Leader 合法并回到 Follower 状态。
  • 如果此次 RPC 中的 Term 号比自己小,那么 Candidate 就会拒绝这个消息并继续保持 Candidate 状态。

没有任何服务器成为 Leader

如果有多个 Follower 同时成为 Candidate,那么选票可能会被瓜分以至于没有 Candidate 可以赢得半数以上的投票。当这种情况发生的时候,每一个 Candidate 都会竞选超时,然后通过增加当前 Term 号来开始一轮新的选举。然而,没有其他机制的话,选票可能会被无限的重复瓜分。

Raft 算法使用随机选举超时时间的方法来确保很少会发生选票瓜分的情况,就算发生也能很快的解决。为了阻止选票起初就被瓜分,竞选超时时间是一个随机的时间,在一个固定的区间(例如 150-300 毫秒)随机选择,这样可以把选举都分散开。

  • 以至于在大多数情况下,只有一个服务器会超时,然后它赢得选举,成为 Leader,并在其他服务器超时之前发送心跳包。
  • 同样的机制也被用在选票瓜分的情况下:每一个 Candidate 在开始一次选举的时候会重置一个随机的选举超时时间,然后在超时时间内等待投票的结果;这样减少了在新的选举中另外的选票瓜分的可能性。

理解了上面的选举规则后,我们通过动图来加深认识。

单 Candidate 选举

(1)下图表示一个分布式系统的最初阶段,此时只有 Follower,没有 Leader。Follower A 等待一个随机的选举超时时间之后,没收到 Leader 发来的心跳消息。因此,将 Term 由 0 增加为 1,转换为 Candidate,进入选举状态。

(2)此时,A 向所有其他节点发送投票请求。

(3)其它节点会对投票请求进行回复,如果超过半数以上的节点投票了,那么该 Candidate 就会立即变成 Term 为 1 的 Leader。

(4)Leader 会周期性地发送心跳消息给所有 Follower,Follower 接收到心跳包,会重新开始计时。

多 Candidate 选举

(1)如果有多个 Follower 成为 Candidate,并且所获得票数相同,那么就需要重新开始投票。例如下图中 Candidate B 和 Candidate D 都发起 Term 为 4 的选举,且都获得两票,因此需要重新开始投票。

(2)当重新开始投票时,由于每个节点设置的随机竞选超时时间不同,因此能下一次再次出现多个 Candidate 并获得同样票数的概率很低。

小结

Raft 算法通过:领导者心跳消息、随机选举超时时间、得到大多数选票才通过原则、任期最新者优先、先来先服务等投票原则,保证了一个任期只有一位领导,也极大地减少了选举失败的情况。

日志复制

日志格式

日志由含日志索引(log index)的日志条目(log entry)组成。每个日志条目包含它被创建时的 Term 号(下图中方框中的数字),和一个复制状态机需要执行的指令。如果一个日志条目被复制到半数以上的服务器上,就被认为可以提交(Commit)了。

  • 日志条目中的 Term 号被用来检查是否出现不一致的情况,它实际上是创建这条日志的领导者的任期编号。
  • 日志条目中的日志索引用来表明它在日志中的位置,它是一个单调递增的整数。

Raft 日志同步保证如下两点:

  • 如果不同日志中的两个日志条目有着相同的日志索引和 Term,则它们所存储的命令是相同的
    • 这个特性基于这条原则:Leader 最多在一个 Term 内、在指定的一个日志索引上创建一条日志条目,同时日志条目在日志中的位置也从来不会改变。
  • 如果不同日志中的两个日志条目有着相同的日志索引和 Term,则它们之前的所有条目都是完全一样的
    • 这个特性由 AppendEntries RPC 的一个简单的一致性检查所保证。在发送 AppendEntries RPC 时,Leader 会把新日志条目之前的日志条目的日志索引和 Term 号一起发送。如果 Follower 在它的日志中找不到包含相同日志索引和 Term 号的日志条目,它就会拒绝接收新的日志条目。

日志复制流程

  1. Leader 负责处理所有客户端的请求。
  2. Leader 把请求作为日志条目加入到它的日志中,然后并行的向其他服务器发送 AppendEntries RPC 请求,要求 Follower 复制日志条目。
  3. Follower 复制成功后,返回确认消息。
  4. 当这个日志条目被半数以上的服务器复制后,Leader 提交这个日志条目到它的复制状态机,并向客户端返回执行结果。

注意:如果 Follower 崩溃或者运行缓慢,再或者网络丢包,Leader 会不断的重复尝试发送 AppendEntries RPC 请求 (尽管已经回复了客户端),直到所有的跟随者都最终复制了所有的日志条目。

下面,通过一组动图来加深认识:

(1)来自客户端的修改都会被传入 Leader。注意该修改还未被提交,只是写入日志中。

(2)Leader 会把修改复制到所有 Follower。

(3)Leader 会等待大多数的 Follower 也进行了修改,然后才将修改提交。

(4)此时 Leader 会通知的所有 Follower 让它们也提交修改,此时所有节点的值达成一致。

日志一致性

一般情况下,Leader 和 Followers 的日志保持一致,因此日志条目一致性检查通常不会失败。然而,Leader 崩溃可能会导致日志不一致:旧的 Leader 可能没有完全复制完日志中的所有条目。

Leader 和 Follower 日志不一致的可能

Leader 和 Follower 可能存在多种日志不一致的可能。

:bulb: 图示说明:

上图阐述了 Leader 和 Follower 可能存在多种日志不一致的可能,每一个方框表示一个日志条目,里面的数字表示任期号 。

当一个 Leader 成功当选时,Follower 可能出现以下情况(a-f):

  • 存在未更新日志条目,如(a、b)。
  • 存在未提交日志条目,如(c、d)。
  • 两种情况都存在,如(e、f)。

_例如,场景 f 可能会这样发生,某服务器在 Term2 的时候是 Leader,已附加了一些日志条目到自己的日志中,但在提交之前就崩溃了;很快这个机器就被重启了,在 Term3 重新被选为 Leader,并且又增加了一些日志条目到自己的日志中;在 Term 2 和 Term 3 的日志被提交之前,这个服务器又宕机了,并且在接下来的几个任期里一直处于宕机状态_。

Leader 和 Follower 日志一致的保证

Leader 通过强制 Followers 复制它的日志来处理日志的不一致,Followers 上的不一致的日志会被 Leader 的日志覆盖

  • Leader 为了使 Followers 的日志同自己的一致,Leader 需要找到 Followers 同它的日志一致的地方,然后覆盖 Followers 在该位置之后的条目。
  • Leader 会从后往前试,每次日志条目失败后尝试前一个日志条目,直到成功找到每个 Follower 的日志一致位点,然后向后逐条覆盖 Followers 在该位置之后的条目。

安全性

前面描述了 Raft 算法是如何选举 Leader 和复制日志的。

Raft 还增加了一些限制来完善 Raft 算法,以保证安全性:保证了任意 Leader 对于给定的 Term,都拥有了之前 Term 的所有被提交的日志条目。

选举限制

拥有最新的已提交的日志条目的 Follower 才有资格成为 Leader。

Raft 使用投票的方式来阻止一个 Candidate 赢得选举除非这个 Candidate 包含了所有已经提交的日志条目。 Candidate 为了赢得选举必须联系集群中的大部分节点,这意味着每一个已经提交的日志条目在这些服务器节点中肯定存在于至少一个节点上。如果 Candidate 的日志至少和大多数的服务器节点一样新(这个新的定义会在下面讨论),那么他一定持有了所有已经提交的日志条目。

RequestVote RPC 实现了这样的限制:RequestVote RPC 中包含了 Candidate 的日志信息, Follower 会拒绝掉那些日志没有自己新的投票请求

如何判断哪个日志条目比较新?

Raft 通过比较两份日志中最后一条日志条目的日志索引和 Term 来判断哪个日志比较新。

  • 先判断 Term,哪个数值大即代表哪个日志比较新。
  • 如果 Term 相同,再比较 日志索引,哪个数值大即代表哪个日志比较新。

提交旧任期的日志条目

一个当前 Term 的日志条目被复制到了半数以上的服务器上,Leader 就认为它是可以被提交的。如果这个 Leader 在提交日志条目前就下线了,后续的 Leader 可能会覆盖掉这个日志条目。

💡 图示说明:

上图解释了为什么 Leader 无法对旧 Term 的日志条目进行提交。

  • 阶段 (a) ,S1 是 Leader,且 S1 写入日志条目为 (Term 2,日志索引 2),只有 S2 复制了这个日志条目。
  • 阶段 (b),S1 下线,S5 被选举为 Term3 的 Leader。S5 写入日志条目为 (Term 3,日志索引 2)。
  • 阶段 (c),S5 下线,S1 重新上线,并被选举为 Term4 的 Leader。此时,Term 2 的那条日志条目已经被复制到了集群中的大多数节点上,但是还没有被提交。
  • 阶段 (d),S1 再次下线,S5 重新上线,并被重新选举为 Term3 的 Leader。然后 S5 覆盖了日志索引 2 处的日志。
  • 阶段 (e),如果阶段 (d) 还未发生,即 S1 再次下线之前,S1 把自己主导的日志条目复制到了大多数节点上,那么在后续 Term 里面这些新日志条目就会被提交。这样在同一时刻就同时保证了,之前的所有旧日志条目就会被提交。

Raft 永远不会通过计算副本数目的方式去提交一个之前 Term 内的日志条目。只有 Leader 当前 Term 里的日志条目通过计算副本数目可以被提交;一旦当前 Term 的日志条目以这种方式被提交,那么由于日志匹配特性,之前的日志条目也都会被间接的提交。

当 Leader 复制之前任期里的日志时,Raft 会为所有日志保留原始的 Term,这在提交规则上产生了额外的复杂性。在其他的一致性算法中,如果一个新的领导人要重新复制之前的任期里的日志时,它必须使用当前新的任期号。Raft 使用的方法更加容易辨别出日志,因为它可以随着时间和日志的变化对日志维护着同一个任期编号。另外,和其他的算法相比,Raft 中的新领导人只需要发送更少日志条目(其他算法中必须在他们被提交之前发送更多的冗余日志条目来为他们重新编号)。

日志压缩

在实际的系统中,不能让日志无限膨胀,否则系统重启时需要花很长的时间进行恢复,从而影响可用性。Raft 采用对整个系统进行快照来解决,快照之前的日志都可以丢弃。

每个副本独立的对自己的系统状态生成快照,并且只能对已经提交的日志条目生成快照。

快照包含以下内容:

  • 日志元数据。最后一条已提交的日志条目的日志索引和 Term。这两个值在快照之后的第一条日志条目的 AppendEntries RPC 的完整性检查的时候会被用上。
  • 系统当前状态。

当 Leader 要发送某个日志条目,落后太多的 Follower 的日志条目会被丢弃,Leader 会将快照发给 Follower。或者新上线一台机器时,也会发送快照给它。

生成快照的频率要适中,频率过高会消耗大量 I/O 带宽;频率过低,一旦需要执行恢复操作,会丢失大量数据,影响可用性。推荐当日志达到某个固定的大小时生成快照。

生成一次快照可能耗时过长,影响正常日志同步。可以通过使用 copy-on-write 技术避免快照过程影响正常日志同步。

说明:本文仅阐述 Raft 算法的核心内容,不包括算法论证、评估等

参考资料

Redis 事务

Redis 仅支持“非严格”的事务。所谓“非严格”是指:Redis 事务保证“全部执行命令”;但是,Redis 事务“不支持回滚”。

关键词:事务ACIDMULTIEXECDISCARDWATCH

Redis 事务简介

什么是 ACID

ACID 是数据库事务正确执行的四个基本要素。

  • 原子性(Atomicity)
    • 事务被视为不可分割的最小单元,事务中的所有操作要么全部提交成功,要么全部失败回滚
    • 回滚可以用日志来实现,日志记录着事务所执行的修改操作,在回滚时反向执行这些修改操作即可。
  • 一致性(Consistency)
    • 数据库在事务执行前后都保持一致性状态。
    • 在一致性状态下,所有事务对一个数据的读取结果都是相同的。
  • 隔离性(Isolation)
    • 一个事务所做的修改在最终提交以前,对其它事务是不可见的。
  • 持久性(Durability)
    • 一旦事务提交,则其所做的修改将会永远保存到数据库中。即使系统发生崩溃,事务执行的结果也不能丢失。
    • 可以通过数据库备份和恢复来实现,在系统发生奔溃时,使用备份的数据库进行数据恢复。

一个支持事务(Transaction)中的数据库系统,必需要具有这四种特性,否则在事务过程(Transaction processing)当中无法保证数据的正确性。

  • 只有满足一致性,事务的执行结果才是正确的。
  • 在无并发的情况下,事务串行执行,隔离性一定能够满足。此时只要能满足原子性,就一定能满足一致性。
  • 在并发的情况下,多个事务并行执行,事务不仅要满足原子性,还需要满足隔离性,才能满足一致性。
  • 事务满足持久化是为了能应对系统崩溃的情况。

ACID

Redis 事务的特性

Redis 的事务总是支持 ACID 中的原子性、一致性和隔离性, 当服务器运行在 AOF 持久化模式下, 并且 appendfsync 选项的值为 always 时, 事务也具有持久性。

但需要注意的是:Redis 仅支持“非严格”的事务。这里的“非严格”,其实指的是 Redis 事务只能部分保证 ACID 中的原子性。

  • Redis 事务保证全部执行命令 - Redis 事务中的多个命令会被打包到事务队列中,然后按先进先出(FIFO)的顺序执行。事务在执行过程中不会被中断,当事务队列中的所有命令都被执行完毕之后,事务才会结束。
  • Redis 事务不支持回滚 - 如果命令执行失败不会回滚,而是会继续执行下去。

Redis 官方的事务特性文档给出的不支持回滚的理由是:

  • Redis 命令只会因为错误的语法而失败,或是命令用在了错误类型的键上面。
  • 因为不需要对回滚进行支持,所以 Redis 的内部可以保持简单且快速。

Redis 事务应用

MULTIEXECDISCARDWATCH 是 Redis 事务相关的命令。

事务可以一次执行多个命令, 并且有以下两个重要的保证:

  • 事务是一个单独的隔离操作:事务中的所有命令都会序列化、按顺序地执行。事务在执行的过程中,不会被其他客户端发送来的命令请求所打断。
  • 事务是一个原子操作:事务中的命令要么全部被执行,要么全部都不执行。

Redis 有天然解决这个并发竞争问题的类 CAS 乐观锁方案:每次要写之前,先判断一下当前这个 value 的时间戳是否比缓存里的 value 的时间戳要新。如果是的话,那么可以写,否则,就不能用旧的数据覆盖新的数据。

MULTI

MULTI 命令用于开启一个事务,它总是返回 OK 。

MULTI 执行之后, 客户端可以继续向服务器发送任意多条命令, 这些命令不会立即被执行, 而是被放到一个队列中, 当 EXEC 命令被调用时, 所有队列中的命令才会被执行。

以下是一个事务例子, 它原子地增加了 foobar 两个键的值:

1
2
3
4
5
6
7
8
9
> MULTI
OK
> INCR foo
QUEUED
> INCR bar
QUEUED
> EXEC
1) (integer) 1
2) (integer) 1

EXEC

EXEC 命令负责触发并执行事务中的所有命令。

  • 如果客户端在使用 MULTI 开启了一个事务之后,却因为断线而没有成功执行 EXEC ,那么事务中的所有命令都不会被执行。
  • 另一方面,如果客户端成功在开启事务之后执行 EXEC ,那么事务中的所有命令都会被执行。

MULTIEXEC 中的操作将会一次性发送给服务器,而不是一条一条发送,这种方式称为流水线,它可以减少客户端与服务器之间的网络通信次数从而提升性能。

DISCARD

当执行 DISCARD 命令时, 事务会被放弃, 事务队列会被清空, 并且客户端会从事务状态中退出。

示例:

1
2
3
4
5
6
7
8
9
10
> SET foo 1
OK
> MULTI
OK
> INCR foo
QUEUED
> DISCARD
OK
> GET foo
"1"

WATCH

WATCH 命令可以为 Redis 事务提供 check-and-set (CAS)行为。

WATCH 的键会被监视,并会发觉这些键是否被改动过了。 如果有至少一个被监视的键在 EXEC 执行之前被修改了, 那么整个事务都会被取消, EXEC 返回 nil-reply 来表示事务已经失败。

1
2
3
4
5
6
WATCH mykey
val = GET mykey
val = val + 1
MULTI
SET mykey $val
EXEC

使用上面的代码, 如果在 WATCH 执行之后, EXEC 执行之前, 有其他客户端修改了 mykey 的值, 那么当前客户端的事务就会失败。 程序需要做的, 就是不断重试这个操作, 直到没有发生碰撞为止。

这种形式的锁被称作乐观锁, 它是一种非常强大的锁机制。 并且因为大多数情况下, 不同的客户端会访问不同的键, 碰撞的情况一般都很少, 所以通常并不需要进行重试。

WATCH 使得 EXEC 命令需要有条件地执行:事务只能在所有被监视键都没有被修改的前提下执行,如果这个前提不能满足的话,事务就不会被执行。

WATCH 命令可以被调用多次。对键的监视从 WATCH 执行之后开始生效,直到调用 EXEC 为止。

用户还可以在单个 WATCH 命令中监视任意多个键,例如:

1
2
redis> WATCH key1 key2 key3
OK

取消 WATCH 的场景

EXEC 被调用时, 不管事务是否成功执行, 对所有键的监视都会被取消。另外, 当客户端断开连接时, 该客户端对键的监视也会被取消。

使用无参数的 UNWATCH 命令可以手动取消对所有键的监视。 对于一些需要改动多个键的事务, 有时候程序需要同时对多个键进行加锁, 然后检查这些键的当前值是否符合程序的要求。 当值达不到要求时, 就可以使用 UNWATCH 命令来取消目前对键的监视, 中途放弃这个事务, 并等待事务的下次尝试。

使用 WATCH 创建原子操作

WATCH 可以用于创建 Redis 没有内置的原子操作。

举个例子,以下代码实现了原创的 ZPOP 命令,它可以原子地弹出有序集合中分值(score)最小的元素:

1
2
3
4
5
WATCH zset
element = ZRANGE zset 0 0
MULTI
ZREM zset element
EXEC

Lua 脚本

为什么使用 Lua

前面提到了,Redis 仅支持“非严格”的事务

Lua 是一种轻量小巧的脚本语言,用标准 C 语言编写并以源代码形式开放, 其设计目的是为了嵌入应用程序中,从而为应用程序提供灵活的扩展和定制功能。

Redis 从 2.6 版本开始支持执行 Lua 脚本,它的功能和事务非常类似。我们可以利用 Lua 脚本来批量执行多条 Redis 命令,这些 Redis 命令会被提交到 Redis 服务器一次性执行完成,大幅减小了网络开销。

Redis 执行 Lua 是原子操作。因为 Redis 使用串行化的方式来执行 Redis 命令, 所以在任何特定时间里, 最多都只会有一个脚本能够被放进 Lua 环境里面运行, 因此, 整个 Redis 服务器只需要创建一个 Lua 环境即可。由于,Redis 执行 Lua 具有原子性,所以常被用于需要原子性执行多命令的场景。

不过,如果 Lua 脚本运行时出错并中途结束,出错之后的命令是不会被执行的。并且,出错之前执行的命令是无法被撤销的,无法实现类似关系型数据库执行失败可以回滚的那种原子性效果。因此, 严格来说的话,通过 Lua 脚本来批量执行 Redis 命令实际也是不完全满足原子性的。

如果想要让 Lua 脚本中的命令全部执行,必须保证语句语法和命令都是对的。

另外,Redis 7.0 新增了 Redis functions 特性,你可以将 Redis functions 看作是比 Lua 更强大的脚本。

Redis 脚本命令

命令 说明
EVAL EVAL 命令为客户端输入的脚本在 Lua 环境中定义一个函数, 并通过调用这个函数来执行脚本。
EVALSHA EVALSHA 命令通过直接调用 Lua 环境中已定义的函数来执行脚本。
SCRIPT_FLUSH SCRIPT_FLUSH 命令会清空服务器 lua_scripts 字典中保存的脚本, 并重置 Lua 环境。
SCRIPT_EXISTS SCRIPT_EXISTS 命令接受一个或多个 SHA1 校验和为参数, 并通过检查 lua_scripts 字典来确认校验和对应的脚本是否存在。
SCRIPT_LOAD SCRIPT_LOAD 命令接受一个 Lua 脚本为参数, 为该脚本在 Lua 环境中创建函数, 并将脚本保存到 lua_scripts 字典中。
SCRIPT_KILL SCRIPT_KILL 命令用于停止正在执行的脚本。

Redis 执行 Lua 的工作流程

为了在 Redis 服务器中执行 Lua 脚本, Redis 在服务器内嵌了一个 Lua 环境(environment), 并对这个 Lua 环境进行了一系列修改, 从而确保这个 Lua 环境可以满足 Redis 服务器的需要。

Redis 服务器创建并修改 Lua 环境的整个过程由以下步骤组成:

  1. 创建一个基础的 Lua 环境, 之后的所有修改都是针对这个环境进行的。
  2. 载入多个函数库到 Lua 环境里面, 让 Lua 脚本可以使用这些函数库来进行数据操作。
  3. 创建全局表格 redis , 这个表格包含了对 Redis 进行操作的函数, 比如用于在 Lua 脚本中执行 Redis 命令的 redis.call 函数。
  4. 使用 Redis 自制的随机函数来替换 Lua 原有的带有副作用的随机函数, 从而避免在脚本中引入副作用。
  5. 创建排序辅助函数, Lua 环境使用这个辅佐函数来对一部分 Redis 命令的结果进行排序, 从而消除这些命令的不确定性。
  6. 创建 redis.pcall 函数的错误报告辅助函数, 这个函数可以提供更详细的出错信息。
  7. 对 Lua 环境里面的全局环境进行保护, 防止用户在执行 Lua 脚本的过程中, 将额外的全局变量添加到了 Lua 环境里面。
  8. 将完成修改的 Lua 环境保存到服务器状态的 lua 属性里面, 等待执行服务器传来的 Lua 脚本。

Redis 执行 Lua 的要点

  • Redis 服务器专门使用一个伪客户端来执行 Lua 脚本中包含的 Redis 命令。
  • Redis 使用脚本字典来保存所有被 EVAL 命令执行过, 或者被 SCRIPT_LOAD 命令载入过的 Lua 脚本, 这些脚本可以用于实现 SCRIPT_EXISTS 命令, 以及实现脚本复制功能。
  • 服务器在执行脚本之前, 会为 Lua 环境设置一个超时处理钩子, 当脚本出现超时运行情况时, 客户端可以通过向服务器发送 SCRIPT_KILL 命令来让钩子停止正在执行的脚本, 或者发送 SHUTDOWN nosave 命令来让钩子关闭整个服务器。
  • 主服务器复制 EVALSCRIPT_FLUSHSCRIPT_LOAD 三个命令的方法和复制普通 Redis 命令一样 —— 只要将相同的命令传播给从服务器就可以了。
  • 主服务器在复制 EVALSHA 命令时, 必须确保所有从服务器都已经载入了 EVALSHA 命令指定的 SHA1 校验和所对应的 Lua 脚本, 如果不能确保这一点的话, 主服务器会将 EVALSHA 命令转换成等效的 EVAL 命令, 并通过传播 EVAL 命令来获得相同的脚本执行效果。

Redis + Lua 实现分布式锁

Redis 应用 Lua 的一个经典使用场景是实现分布式锁。其实现有 3 个重要的考量点:

  1. 互斥(只能有一个客户端获取锁)
  2. 不能死锁
  3. 容错(只要大部分 redis 节点创建了这把锁就可以)

对应的 Redis 指令如下:

  • setnx - setnx key val:当且仅当 key 不存在时,set 一个 key 为 val 的字符串,返回 1;若 key 存在,则什么都不做,返回 0。
  • expire - expire key timeout:为 key 设置一个超时时间,单位为 second,超过这个时间锁会自动释放,避免死锁。
  • delete - delete key:删除 key

注意:

不要将 setnxexpire 作为两个命令组合实现加锁,这样就无法保证原子性。如果客户端在 setnx 之后崩溃,那么将导致锁无法释放。正确的做法应是在 setnx 命令中指定 expire 时间。

(1)申请锁

1
SET resource_name my_random_value NX PX 30000

执行这个命令就 ok。

  • NX:表示只有 key 不存在的时候才会设置成功。(如果此时 redis 中存在这个 key,那么设置失败,返回 nil
  • PX 30000:意思是 30s 后锁自动释放。别人创建的时候如果发现已经有了就不能加锁了。

(2)释放锁

释放锁就是删除 key ,但是一般可以用 lua 脚本删除,判断 value 一样才删除:

1
2
3
4
5
6
-- 删除锁的时候,找到 key 对应的 value,跟自己传过去的 value 做比较,如果是一样的才删除。
if redis.call("get",KEYS[1]) == ARGV[1] then
return redis.call("del",KEYS[1])
else
return 0
end

参考资料

Redis 脚本

Redis 脚本使用 Lua 解释器来执行脚本。 Redis 2.6 版本通过内嵌支持 Lua 环境。

关键词:Lua

为什么使用 Lua

Lua 是一种轻量小巧的脚本语言,用标准 C 语言编写并以源代码形式开放, 其设计目的是为了嵌入应用程序中,从而为应用程序提供灵活的扩展和定制功能。

在 Redis 中,执行单一命令是原子性操作,所以不会出现并发问题。但有的业务场景下,需要执行多个命令,同时确保不出现并发问题,这就需要用到 Lua 脚本了。

Redis 执行 Lua 是原子操作。因为 Redis 使用串行化的方式来执行 Redis 命令, 所以在任何特定时间里, 最多都只会有一个脚本能够被放进 Lua 环境里面运行, 因此, 整个 Redis 服务器只需要创建一个 Lua 环境即可。

由于,Redis 执行 Lua 具有原子性,所以常被用于需要原子性执行多命令的场景。

Redis 脚本命令

命令 说明
EVAL EVAL 命令为客户端输入的脚本在 Lua 环境中定义一个函数, 并通过调用这个函数来执行脚本。
EVALSHA EVALSHA 命令通过直接调用 Lua 环境中已定义的函数来执行脚本。
SCRIPT_FLUSH SCRIPT_FLUSH 命令会清空服务器 lua_scripts 字典中保存的脚本, 并重置 Lua 环境。
SCRIPT_EXISTS SCRIPT_EXISTS 命令接受一个或多个 SHA1 校验和为参数, 并通过检查 lua_scripts 字典来确认校验和对应的脚本是否存在。
SCRIPT_LOAD SCRIPT_LOAD 命令接受一个 Lua 脚本为参数, 为该脚本在 Lua 环境中创建函数, 并将脚本保存到 lua_scripts 字典中。
SCRIPT_KILL SCRIPT_KILL 命令用于停止正在执行的脚本。

Redis 执行 Lua 的工作流程

为了在 Redis 服务器中执行 Lua 脚本, Redis 在服务器内嵌了一个 Lua 环境(environment), 并对这个 Lua 环境进行了一系列修改, 从而确保这个 Lua 环境可以满足 Redis 服务器的需要。

Redis 服务器创建并修改 Lua 环境的整个过程由以下步骤组成:

  1. 创建一个基础的 Lua 环境, 之后的所有修改都是针对这个环境进行的。
  2. 载入多个函数库到 Lua 环境里面, 让 Lua 脚本可以使用这些函数库来进行数据操作。
  3. 创建全局表格 redis , 这个表格包含了对 Redis 进行操作的函数, 比如用于在 Lua 脚本中执行 Redis 命令的 redis.call 函数。
  4. 使用 Redis 自制的随机函数来替换 Lua 原有的带有副作用的随机函数, 从而避免在脚本中引入副作用。
  5. 创建排序辅助函数, Lua 环境使用这个辅佐函数来对一部分 Redis 命令的结果进行排序, 从而消除这些命令的不确定性。
  6. 创建 redis.pcall 函数的错误报告辅助函数, 这个函数可以提供更详细的出错信息。
  7. 对 Lua 环境里面的全局环境进行保护, 防止用户在执行 Lua 脚本的过程中, 将额外的全局变量添加到了 Lua 环境里面。
  8. 将完成修改的 Lua 环境保存到服务器状态的 lua 属性里面, 等待执行服务器传来的 Lua 脚本。

Redis 执行 Lua 的要点

  • Redis 服务器专门使用一个伪客户端来执行 Lua 脚本中包含的 Redis 命令。
  • Redis 使用脚本字典来保存所有被 EVAL 命令执行过, 或者被 SCRIPT_LOAD 命令载入过的 Lua 脚本, 这些脚本可以用于实现 SCRIPT_EXISTS 命令, 以及实现脚本复制功能。
  • 服务器在执行脚本之前, 会为 Lua 环境设置一个超时处理钩子, 当脚本出现超时运行情况时, 客户端可以通过向服务器发送 SCRIPT_KILL 命令来让钩子停止正在执行的脚本, 或者发送 SHUTDOWN nosave 命令来让钩子关闭整个服务器。
  • 主服务器复制 EVALSCRIPT_FLUSHSCRIPT_LOAD 三个命令的方法和复制普通 Redis 命令一样 —— 只要将相同的命令传播给从服务器就可以了。
  • 主服务器在复制 EVALSHA 命令时, 必须确保所有从服务器都已经载入了 EVALSHA 命令指定的 SHA1 校验和所对应的 Lua 脚本, 如果不能确保这一点的话, 主服务器会将 EVALSHA 命令转换成等效的 EVAL 命令, 并通过传播 EVAL 命令来获得相同的脚本执行效果。

参考资料

Markdown 极简教程

目录

标题

Markdown 支持六个级别的标题。

1
2
3
4
5
6
7
语法:
# 一级标题
## 二级标题
### 三级标题
#### 四级标题
##### 五级标题
###### 六级标题

文本样式

:bulb: 粗体、斜体、删除线可以混合使用。

在 Markdown 中,粗体文本、斜体文本可以使用 *_ 符号标记。建议统一风格,始终只用一种符号。

语法 效果
普通文本 普通文本
*斜体文本* _斜体文本_ 斜体文本 斜体文本
**粗体文本** __粗体文本__ 粗体文本 粗体文本
~~删除文本~~ 删除文本
***粗斜体文本*** ___粗斜体文本___ 粗斜体文本 粗斜体文本

列表

无序列表

  • RED
  • YELLOW
  • BLUE

有序列表

  1. 第一步
  2. 第二步
  3. 第三步

任务列表

  • 完成任务
  • 计划任务

多级列表

  • 数据结构
    • 线性表
      • 顺序表
      • 链表
        • 单链表
        • 双链表
      • 二叉树
        • 二叉平衡树

分割线

***---___ 都可以作为分割线。




链接

普通链接

语法:

1
[钝悟的博客](https://dunwu.github.io/waterdrop/)
  • [] 中标记链接名。类似 HTML 中 <a> 元素的 title 属性。
  • () 中标记链接的 url,也支持相对路径(前提是资源可以访问)。类似 HTML 中 <a> 元素的 href 属性。

效果:

图片

Markdown 引用图片的语法:

1
![alt](url title)

alt 和 title 即对应 HTML 中 img 元素的 alt 和 title 属性(都可省略):

  • alt - 表示图片显示失败时的替换文本。

  • title - 表示鼠标悬停在图片时的显示文本(注意这里要加引号)

  • url - 即图片的 url 地址

logo

图片链接

可以将图片和链接混合使用。

logo

锚点

其实呢,每一个标题都是一个锚点,和 HTML 的锚点(#)类似,比如:回到顶部

引用

普通引用:

:question: 什么是 Markdown

Markdown是一种轻量级标记语言,创始人为约翰·格鲁伯(英语:John Gruber)。它允许人们“使用易读易写的纯文本格式编写文档,然后转换成有效的XHTML(或者HTML)文档”。[4]这种语言吸收了很多在电子邮件中已有的纯文本标记的特性。 —— 摘自 Wiki

嵌套引用:

数据结构

二叉树

平衡二叉树

满二叉树

代码高亮

标签

语法:

1
`Markdown` `Doc`

效果:

Markdown, Doc

代码块

语法一:在文本前后都使用三个反引号进行标记。【✔️️️️ 推荐】

1
2
3
这是一个文本块。
这是一个文本块。
这是一个文本块。

语法二:在连续几行的文本开头加入 1 个 Tab 或者 4 个空格。【❌ 不推荐】

这是一个文本块。
这是一个文本块。
这是一个文本块。

语法

在三个反引号后面加上编程语言的名字,另起一行开始写代码,最后一行再加上三个反引号。

1
public static void main(String[]args){} //Java
1
int main(int argc, char *argv[]) //C
1
echo "hello GitHub" #Bash
1
document.getElementById('myH1').innerHTML = 'Welcome to my Homepage' //javascipt
1
string &operator+(const string& A,const string& B) //cpp

表格

一般表格:

表头 1 表头 2
表格单元 表格单元
表格单元 表格单元

表格可以指定对齐方式:

序号 商品 价格
1 电脑 6000.0
2 鼠标 100.0
3 键盘 200.0

Emoji 表情

:bulb: 注意:部分 Markdown 引擎支持 Emoji。

合理使用 Emoji 表情,往往可以使得文章内容更加丰富生动。例如::heavy_check_mark: :x: :bulb: :bell: :heavy_exclamation_mark: :question:

更多 Emoji 表情请参考:

注脚

:bulb: 注意:部分 Markdown 引擎支持注脚。

一个具有注脚的文本。^1

数学公式

:bulb: 注意:部分 Markdown 引擎支持 Latex。

很多文档中,需要引入一些数学符号、特殊符号,其排版问题比较头疼。这种问题,可以用 Latex 来解决,大部分 Markdown 引擎都支持 Latex。

Latex 可以使用 $ 符号来标记 Latex 表达式,下面是一个数学公式示例:

$$
\Gamma(z) = \int_0^\infty t^{z-1}e^{-t}dt,.
$$

列举一些常用数学符号:

符号 语法 描述
$\leq$ $\leq$ 小于等于
$\geq$ $\geq$ 大于等于
$\neq$ $\neq$ 不等于
$\approx$ $\approx$ 约等于
$\infty$ $\infty$ 无穷
$\prod_{x}^{y}$ $\prod_{x}^{y}$ 累乘
$\sum_{i=0}^n$ $\sum_{i=0}^n$ 求和
$\int$ $\int$ 积分
$\iint$ $\iint$ 双重积分
$\log_x{y}$ $\log_x{y}$ 对数
$x^{y+1}$ $x^{y+1}$ 上标
$x_{y+1}$ $x_{y+1}$ 下标
$\frac{x}{y}$ $\frac{x}{y}$ 分数
$\sqrt[y]{x}$ $\sqrt[y]{x}$ 开方
$\sin$ $\sin$ 正弦
$\cos$ $\cos$ 余弦
$\tan$ $\tan$ 正切

更多数学符号支持请参考:

Diff

:bulb: 注意:部分 Markdown 引擎支持 Diff。

版本控制的系统中都少不了 diff 的功能,即展示一个文件内容的增加与删除。
GFM 中可以显示的展示 diff 效果。可以用 + 开头表示新增,- 开头表示删除。

1
2
+ 新增内容
- 删除内容

UML 图

💡 注意:部分 Markdown 引擎支持 mermaid

mermaid 提供了多种 UML 图。详情请参考:mermaid 文档

流程图

1
2
3
4
5
graph LR
A[Hard edge] -->|Link text| B(Round edge)
B --> C{Decision}
C -->|One| D[Result one]
C -->|Two| E[Result two]

时序图

1
2
3
4
5
6
7
8
9
10
sequenceDiagram
Alice->>Bob: Hello Bob, how are you?
alt is sick
Bob->>Alice: Not so good :(
else is well
Bob->>Alice: Feeling fresh like a daisy
end
opt Extra response
Bob->>Alice: Thanks for asking
end

甘特图

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
gantt
dateFormat YYYY-MM-DD
title Adding GANTT diagram functionality to mermaid

section A section
Completed task :done, des1, 2014-01-06,2014-01-08
Active task :active, des2, 2014-01-09, 3d
Future task : des3, after des2, 5d
Future task2 : des4, after des3, 5d

section Critical tasks
Completed task in the critical line :crit, done, 2014-01-06,24h
Implement parser and jison :crit, done, after des1, 2d
Create tests for parser :crit, active, 3d
Future task in critical line :crit, 5d
Create tests for renderer :2d
Add to mermaid :1d

section Documentation
Describe gantt syntax :active, a1, after des1, 3d
Add gantt diagram to demo page :after a1 , 20h
Add another diagram to demo page :doc1, after a1 , 48h

section Last section
Describe gantt syntax :after doc1, 3d
Add gantt diagram to demo page :20h
Add another diagram to demo page :48h

HTML

有些 Markdown 引擎支持在文档中嵌入的 html 元素。

有些 Markdown 语法所不支持的特性,可以使用 html 元素来支持。

折叠

折叠内容一

展开才能看到的内容

折叠内容二

展开才能看到的内容

居中

居中显示的文本

图片尺寸

编辑器

推荐 Markdown 编辑器

  • Typora - 个人认为是功能最强的 Markdown 编辑器。
  • Visual Studio Code - 可以通过安装插件,量身打造 Markdown 编辑器。
  • marktext - 一款简单优雅的 Markdown 编辑器。
  • StackEdit - 在线 Markdown 编辑器。
  • Editor.md - 在线 Markdown 编辑器。
  • Marxico - 一款专为印象笔记(Evernote)打造的 Markdown 编辑器。

想了解更多 Markdown 编辑器可以参考:主流 Markdown 编辑器推荐

参考资料

流量控制

在高并发场景下,为了应对瞬时海量请求的压力,保障系统的平稳运行,必须预估系统的流量阈值,通过限流规则阻断处理不过来的请求。

流量控制简介

什么是流量控制

流量控制(Flow Control),根据流量、并发线程数、响应时间等指标,把随机到来的流量调整成合适的形状,即流量塑形。避免应用被瞬时的流量高峰冲垮,从而保障应用的高可用性。

为什么需要流量控制

复杂的分布式系统架构中的应用程序往往具有数十个依赖项,每个依赖项都会不可避免地在某个时刻失败。 如果主机应用程序未与这些外部故障隔离开来,则可能会被波及。

例如,对于依赖于 30 个服务的应用程序,假设每个服务的正常运行时间为 99.99%,则可以期望:

99.9930 = 99.7% 的正常运行时间

10 亿个请求中的 0.3%= 3,000,000 个失败

即使所有依赖项都具有出色的正常运行时间,每月也会有 2 个小时以上的停机时间。

然而,现实情况一般比这种估量情况更糟糕。


当一切正常时,整体系统如下所示:

img

图片来自 Hystrix Wiki

在分布式系统架构下,这些强依赖的子服务稳定与否对系统的影响非常大。但是,依赖的子服务可能有很多不可控问题:如网络连接、资源繁忙、服务宕机等。例如:下图中有一个 QPS 为 50 的依赖服务 I 出现不可用,但是其他依赖服务是可用的。

img

图片来自 Hystrix Wiki

当流量很大的情况下,某个依赖的阻塞,会导致上游服务请求被阻塞。当这种级联故障愈演愈烈,就可能造成整个线上服务不可用的雪崩效应,如下图。这种情况若持续恶化,如果上游服务本身还被其他服务所依赖,就可能出现多米洛骨牌效应,导致多个服务都无法正常工作。

img

图片来自 Hystrix Wiki

流量控制有哪些保护机制

流量控制常见的手段就是限流、熔断、降级。

什么是降级?

降级是保障服务能够稳定运行的一种保护方式:面对突增的流量,牺牲一些吞吐量以换取系统的稳定。常见的降级实现方式有:开关降级、限流降级、熔断降级。

什么是限流?

限流一般针对下游服务,当上游流量较大时,避免被上游服务的请求撑爆。

限流就是限制系统的输入和输出流量,以达到保护系统的目的。一般来说系统的吞吐量是可以被测算的,为了保证系统的稳定运行,一旦达到的需要限制的阈值,就需要限制流量并采取一些措施以完成限制流量的目的。比如:延迟处理,拒绝处理,或者部分拒绝处理等等。

限流规则包含三个部分:时间粒度,接口粒度,最大限流值。限流规则设置是否合理直接影响到限流是否合理有效。

什么是熔断?

熔断一般针对上游服务,当下游服务超时/异常较多时,避免被下游服务拖垮。

当调用链路中某个资源出现不稳定,例如,超时异常比例升高的时候,则对这个资源的调用进行限制,并让请求快速失败,避免影响到其它的资源,最终产生雪崩的效果。

熔断尽最大的可能去完成所有的请求,容忍一些失败,熔断也能自动恢复。熔断的常见策略有:

  • 在每秒请求异常数超过多少时触发熔断降级
  • 在每秒请求异常错误率超过多少时触发熔断降级
  • 在每秒请求平均耗时超过多少时触发熔断降级

流量控制有哪些衡量指标

流量控制有以下几个角度:

  • 流量指标,例如 QPS、并发线程数等。
  • 资源的调用关系,例如资源的调用链路,资源和资源之间的关系,调用来源等。
  • 控制效果,例如排队等待、直接拒绝、Warm Up(预热)等。

限流算法

限流的本质是:在一定的时间范围内,限制某一个资源被访问的频率。如何去限制流量,就需要采用一定的策略,即限流算法。常见的限流算法有:固定窗口限流算法、滑动窗口限流算法、漏桶限流算法、令牌桶限流算法。

下面,将对这几种算法进行一一介绍。

固定窗口限流算法

固定窗口限流算法的原理

固定窗口限流算法的基本策略是:

  1. 设置一个固定时间窗口,以及这个固定时间窗口内的最大请求数;
  2. 为每个固定时间窗口设置一个计数器,用于统计请求数;
  3. 一旦请求数超过最大请求数,则请求会被拦截。

img

固定窗口限流算法的利弊

固定窗口限流算法的优点是:实现简单。

固定窗口限流算法的缺点是:存在临界问题。所谓临界问题,是指:流量分别集中在一个固定时间窗口的尾部和一个固定时间窗口的头部。举例来说,假设限流规则为每分钟不超过 100 次请求。在第一个时间窗口中,起初没有任何请求,在最后 1 s,收到 100 次请求,由于没有达到阈值,所有请求都通过;在第二个时间窗口中,第 1 秒就收到 100 次请求,而后续没有任何请求。虽然,这两个时间窗口内的流量都符合限流要求,但是在两个时间窗口临界的这 2s 内,实际上有 200 次请求,显然是超过预期吞吐量的,存在压垮系统的可能。

img

固定窗口限流算法的实现

:::details Java 版本的固定窗口限流算法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class SlidingWindowRateLimiter implements RateLimiter {

/**
* 允许的最大请求数
*/
private final long maxPermits;

/**
* 窗口期时长
*/
private long periodMillis;

/**
* 分片窗口期时长
*/
private final long shardPeriodMillis;

/**
* 窗口期截止时间
*/
private long lastPeriodMillis;

/**
* 分片窗口数
*/
private final int shardNum;

/**
* 请求总计数
*/
private final AtomicLong totalCount = new AtomicLong(0);

/**
* 分片窗口计数列表
*/
private final List<AtomicLong> countList = new LinkedList<>();

public SlidingWindowRateLimiter(long qps, int shardNum) {
this(qps, 1000, TimeUnit.MILLISECONDS, shardNum);
}

public SlidingWindowRateLimiter(long maxPermits, long period, TimeUnit timeUnit, int shardNum) {
this.maxPermits = maxPermits;
this.periodMillis = timeUnit.toMillis(period);
this.lastPeriodMillis = System.currentTimeMillis();
this.shardPeriodMillis = timeUnit.toMillis(period) / shardNum;
this.shardNum = shardNum;
for (int i = 0; i < shardNum; i++) {
countList.add(new AtomicLong(0));
}
}

@Override
public synchronized boolean tryAcquire(int permits) {
long now = System.currentTimeMillis();
if (now > lastPeriodMillis) {
for (int shardId = 0; shardId < shardNum; shardId++) {
long shardCount = countList.get(shardId).get();
totalCount.addAndGet(-shardCount);
countList.set(shardId, new AtomicLong(0));
lastPeriodMillis += shardPeriodMillis;
}
}
int shardId = (int) (now % periodMillis / shardPeriodMillis);
if (totalCount.get() + permits <= maxPermits) {
countList.get(shardId).addAndGet(permits);
totalCount.addAndGet(permits);
return true;
} else {
return false;
}
}

}

:::

滑动窗口限流算法

滑动窗口限流算法的原理

滑动窗口限流算法是对固定窗口限流算法的改进,解决了临界问题。

滑动窗口限流算法的基本策略是:

  • 将固定时间窗口分片为多个子窗口,每个子窗口的访问次数独立统计;
  • 当请求时间大于当前子窗口的最大时间时,则将当前子窗口废弃,并将计时窗口向前滑动,并将下一个子窗口置为当前窗口。
  • 要保证所有子窗口的统计数之和不能超过阈值。

滑动窗口限流算法就是针对固定窗口限流算法的更细粒度的控制,分片越多,则限流越精准。

img

滑动窗口限流算法的利弊

滑动窗口限流算法的优点是:在滑动窗口限流算法中,临界位置的突发请求都会被算到时间窗口内,因此可以解决计数器算法的临界问题。

滑动窗口限流算法的缺点是:

  • 额外的内存开销 - 滑动时间窗口限流算法的时间窗口是持续滑动的,并且除了需要一个计数器来记录时间窗口内接口请求次数之外,还需要记录在时间窗口内每个接口请求到达的时间点,所以存在额外的内存开销。
  • 限流的控制粒度受限于窗口分片粒度 - 滑动窗口限流算法,只能在选定的时间粒度上限流,对选定时间粒度内的更加细粒度的访问频率不做限制。但是,由于每个分片窗口都有额外的内存开销,所以也并不是分片数越多越好的。

滑动窗口限流算法的实现

:::details Java 版本的滑动窗口限流算法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class SlidingWindowRateLimiter implements RateLimiter {

/**
* 允许的最大请求数
*/
private final long maxPermits;

/**
* 窗口期时长
*/
private final long periodMillis;

/**
* 分片窗口期时长
*/
private final long shardPeriodMillis;

/**
* 窗口期截止时间
*/
private long lastPeriodMillis;

/**
* 分片窗口数
*/
private final int shardNum;

/**
* 请求总计数
*/
private final AtomicLong totalCount = new AtomicLong(0);

/**
* 分片窗口计数列表
*/
private final List<AtomicLong> countList = new LinkedList<>();

public SlidingWindowRateLimiter(long qps, int shardNum) {
this(qps, 1000, TimeUnit.MILLISECONDS, shardNum);
}

public SlidingWindowRateLimiter(long maxPermits, long period, TimeUnit timeUnit, int shardNum) {
this.maxPermits = maxPermits;
this.periodMillis = timeUnit.toMillis(period);
this.lastPeriodMillis = System.currentTimeMillis();
this.shardPeriodMillis = timeUnit.toMillis(period) / shardNum;
this.shardNum = shardNum;
for (int i = 0; i < shardNum; i++) {
countList.add(new AtomicLong(0));
}
}

@Override
public synchronized boolean tryAcquire(int permits) {
long now = System.currentTimeMillis();
if (now > lastPeriodMillis) {
for (int shardId = 0; shardId < shardNum; shardId++) {
long shardCount = countList.get(shardId).get();
totalCount.addAndGet(-shardCount);
countList.set(shardId, new AtomicLong(0));
lastPeriodMillis += shardPeriodMillis;
}
}
int shardId = (int) (now % periodMillis / shardPeriodMillis);
if (totalCount.get() + permits <= maxPermits) {
countList.get(shardId).addAndGet(permits);
totalCount.addAndGet(permits);
return true;
} else {
return false;
}
}

}

:::

漏桶限流算法

漏桶限流算法的原理

漏桶限流算法的基本策略是:

  • 水(请求)以任意速率由入口进入到漏桶中;
  • 水以固定的速率由出口出水(请求通过);
  • 漏桶的容量是固定的,如果水的流入速率大于流出速率,最终会导致漏桶中的水溢出(这意味着请求拒绝)。

img

漏桶限流算法的利弊

漏桶限流算法的优点是:流量速率固定——即无论流量多大,即便是突发的大流量,处理请求的速度始终是固定的。

漏桶限流算法的缺点是:不能灵活的调整流量。例如:一个集群通过增减节点的方式,弹性伸缩了其吞吐能力,漏桶限流算法无法随之调整。

漏桶策略适用于间隔性突发流量且流量不用即时处理的场景

漏桶限流算法的实现

:::details Java 版本的漏桶限流算法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
import java.util.concurrent.atomic.AtomicLong;

public class LeakyBucketRateLimiter implements RateLimiter {

/**
* QPS
*/
private final int qps;

/**
* 桶的容量
*/
private final long capacity;

/**
* 计算的起始时间
*/
private long beginTimeMillis;

/**
* 桶中当前的水量
*/
private final AtomicLong waterNum = new AtomicLong(0);

public LeakyBucketRateLimiter(int qps, int capacity) {
this.qps = qps;
this.capacity = capacity;
}

@Override
public synchronized boolean tryAcquire(int permits) {

// 如果桶中没有水,直接通过
if (waterNum.get() == 0) {
beginTimeMillis = System.currentTimeMillis();
waterNum.addAndGet(permits);
return true;
}

// 计算水量
long leakedWaterNum = ((System.currentTimeMillis() - beginTimeMillis) / 1000) * qps;
long currentWaterNum = waterNum.get() - leakedWaterNum;
waterNum.set(Math.max(0, currentWaterNum));

// 重置时间
beginTimeMillis = System.currentTimeMillis();

if (waterNum.get() + permits < capacity) {
waterNum.addAndGet(permits);
return true;
} else {
return false;
}
}

}

:::

令牌桶限流算法

令牌桶限流算法的原理

img

令牌桶算法的原理

  1. 接口限制 T 秒内最大访问次数为 N,则每隔 T/N 秒会放一个 token 到桶中
  2. 桶内最多存放 M 个 token,如果 token 到达时令牌桶已经满了,那么这个 token 就会被丢弃
  3. 接口请求会先从令牌桶中取 token,拿到 token 则处理接口请求,拿不到 token 则进行限流处理

令牌桶限流算法的利弊

因为令牌桶存放了很多令牌,那么大量的突发请求会被执行,但是它不会出现临界问题,在令牌用完之后,令牌是以一个恒定的速率添加到令牌桶中的,因此不能再次发送大量突发请求。

规定固定容量的桶,token 以固定速度往桶内填充,当桶满时 token 不会被继续放入,每过来一个请求把 token 从桶中移除,如果桶中没有 token 不能请求。

令牌桶算法适用于有突发特性的流量,且流量需要即时处理的场景

令牌桶限流算法的实现

:::details Java 实现令牌桶算法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
import java.util.concurrent.atomic.AtomicLong;

/**
* 令牌桶限流算法
*
* @author <a href="mailto:forbreak@163.com">Zhang Peng</a>
* @date 2024-01-18
*/
public class TokenBucketRateLimiter implements RateLimiter {

/**
* QPS
*/
private final long qps;

/**
* 桶的容量
*/
private final long capacity;

/**
* 上一次令牌发放时间
*/
private long endTimeMillis;

/**
* 桶中当前的令牌数量
*/
private final AtomicLong tokenNum = new AtomicLong(0);

public TokenBucketRateLimiter(long qps, long capacity) {
this.qps = qps;
this.capacity = capacity;
this.endTimeMillis = System.currentTimeMillis();
}

@Override
public synchronized boolean tryAcquire(int permits) {

long now = System.currentTimeMillis();
long gap = now - endTimeMillis;

// 计算令牌数
long newTokenNum = (gap * qps / 1000);
long currentTokenNum = tokenNum.get() + newTokenNum;
tokenNum.set(Math.min(capacity, currentTokenNum));

if (tokenNum.get() < permits) {
return false;
} else {
tokenNum.addAndGet(-permits);
endTimeMillis = now;
return true;
}
}

}

:::

扩展

Guava 的 RateLimiter 工具类就是基于令牌桶算法实现,其源码分析可以参考:RateLimiter 基于漏桶算法,但它参考了令牌桶算法

限流算法测试

:::details 限流算法测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
import cn.hutool.core.thread.ThreadUtil;
import cn.hutool.core.util.RandomUtil;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

@Slf4j
public class RateLimiterDemo {

public static void main(String[] args) {

// ============================================================================

int qps = 20;

System.out.println("======================= 固定时间窗口限流算法 =======================");
FixedWindowRateLimiter fixedWindowRateLimiter = new FixedWindowRateLimiter(qps);
testRateLimit(fixedWindowRateLimiter, qps);

System.out.println("======================= 滑动时间窗口限流算法 =======================");
SlidingWindowRateLimiter slidingWindowRateLimiter = new SlidingWindowRateLimiter(qps, 10);
testRateLimit(slidingWindowRateLimiter, qps);

System.out.println("======================= 漏桶限流算法 =======================");
LeakyBucketRateLimiter leakyBucketRateLimiter = new LeakyBucketRateLimiter(qps, 100);
testRateLimit(leakyBucketRateLimiter, qps);

System.out.println("======================= 令牌桶限流算法 =======================");
TokenBucketRateLimiter tokenBucketRateLimiter = new TokenBucketRateLimiter(qps, 100);
testRateLimit(tokenBucketRateLimiter, qps);
}

private static void testRateLimit(RateLimiter rateLimiter, int qps) {

AtomicInteger okNum = new AtomicInteger(0);
AtomicInteger limitNum = new AtomicInteger(0);
ExecutorService executorService = ThreadUtil.newFixedExecutor(10, "限流测试", true);
long beginTime = System.currentTimeMillis();

int threadNum = 4;
final CountDownLatch latch = new CountDownLatch(threadNum);
for (int i = 0; i < threadNum; i++) {
executorService.submit(() -> {
try {
batchRequest(rateLimiter, okNum, limitNum, 1000);
} catch (Exception e) {
log.error("发生异常!", e);
} finally {
latch.countDown();
}
});
}

try {
latch.await(10, TimeUnit.SECONDS);
long endTime = System.currentTimeMillis();
long gap = endTime - beginTime;
log.info("限流 QPS: {} -> 实际结果:耗时 {} ms,{} 次请求成功,{} 次请求被限流,实际 QPS: {}",
qps, gap, okNum.get(), limitNum.get(), okNum.get() * 1000 / gap);
if (okNum.get() == qps) {
log.info("限流符合预期");
}
} catch (Exception e) {
log.error("发生异常!", e);
} finally {
executorService.shutdown();
}
}

private static void batchRequest(RateLimiter rateLimiter, AtomicInteger okNum, AtomicInteger limitNum, int num)
throws InterruptedException {
for (int j = 0; j < num; j++) {
if (rateLimiter.tryAcquire(1)) {
log.info("请求成功");
okNum.getAndIncrement();
} else {
log.info("请求限流");
limitNum.getAndIncrement();
}
TimeUnit.MILLISECONDS.sleep(RandomUtil.randomInt(0, 10));
}
}

}

:::

限流框架 - Hystrix

Hystrix 是由 Netflix 开源,用于处理分布式系统的延迟和容错的一个开源组件。在分布式系统里,许多依赖不可避免的会调用失败,比如超时、异常等。Hystrix 采用断路器模式来实现服务间的彼此隔离,从而避免级联故障,以提高分布式系统整体的弹性。

“断路器”本身是一种开关装置,当某个服务单元发生故障之后,通过断路器的故障监控(类似熔断保险丝),向调用方返回一个符合预期的、可处理的备选响应(FallBack),而不是长时间的等待或者抛出调用方无法处理的异常,这样就保证了服务调用方的线程不会被长时间、不必要地占用,从而避免了故障在分布式系统中的蔓延,乃至雪崩。

Hystrix 官方已宣布不再发布新版本。但是,Hystrix 的断路器设计理念,有非常高的学习价值。

如果使用 Hystrix 对每个基础依赖服务进行过载保护,则整个系统架构将会类似下图所示,每个依赖项彼此隔离,受到延迟时发生饱和的资源的被限制访问,并包含 fallback 逻辑(用于降级处理),该逻辑决定了在依赖项中发生任何类型的故障时做出对应的处理。

img

Hystrix 原理

如下图所示,Hystrix 的工作流程大致可以分为 9 个步骤。

img

(一)构建一个 HystrixCommand 或 HystrixObservableCommand 对象

Hystrix 进行资源隔离,其实是提供了一个抽象,叫做命令模式。这也是 Hystrix 最基本的资源隔离技术

在使用 Hystrix 的过程中,会对依赖服务的调用请求封装成命令对象,Hystrix 对 命令对象抽象了两个抽象类:HystrixCommandHystrixObservableCommand

  • HystrixCommand 表示的命令对象会返回一个唯一返回值。
  • HystrixObservableCommand 表示的命令对象会返回多个返回值。
1
2
HystrixCommand command = new HystrixCommand(arg1, arg2);
HystrixObservableCommand command = new HystrixObservableCommand(arg1, arg2);

(二)执行命令

Hystrix 中共有 4 种方式执行命令,如下所示:

执行方式 说明 可用对象
execute() 阻塞式同步执行,返回依赖服务的单一返回结果(或者抛出异常) HystrixCommand
queue() 异步执行,通过 Future 返回依赖服务的单一返回结果(或者抛出异常) HystrixCommand
observe() 基于 Rxjava 的 Observable 方式,返回通过 Observable 表示的依赖服务返回结果。代调用代码先执行 (Hot Obserable) HystrixObservableCommand
toObservable() 基于 Rxjava 的 Observable 方式,返回通过 Observable 表示的依赖服务返回结果。执行代码等到真正订阅的时候才会执行 (cold observable) HystrixObservableCommand

这四种命令中,exeucte()queue()observe() 的表示其实是通过 toObservable() 实现的,其转换关系如下图所示:

img

HystrixCommand 执行方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
K value   = command.execute();
// 等价语句:
K value = command.execute().queue().get();

Future<K> fValue = command.queue();
//等价语句:
Future<K> fValue = command.toObservable().toBlocking().toFuture();

Observable<K> ohValue = command.observe(); //hot observable,立刻订阅,命令立刻执行
//等价语句:
Observable<K> ohValue = command.toObservable().subscribe(subject);

// 上述执行最终实现还是基于 toObservable()
Observable<K> ocValue = command.toObservable(); //cold observable,延后订阅,订阅发生后,执行才真正执行

(三)是否缓存

如果当前命令对象启用了请求缓存,并且请求的响应存在于缓存中,则缓存的响应会立刻以 Observable 的形式返回。

(四)是否开启断路器

如果第三步没有缓存没有命中,则判断一下当前断路器的断路状态是否打开。如果断路器状态为打开状态,则 Hystrix 将不会执行此 Command 命令,直接执行步骤 8 调用 Fallback;

如果断路器状态是关闭,则执行步骤 5 检查是否有足够的资源运行 Command 命令

(五)信号量、线程池是否拒绝

当您执行该命令时,Hystrix 会检查断路器以查看电路是否打开。

如果电路开路(或“跳闸”),则 Hystrix 将不会执行该命令,而是将流程路由到 (8) 获取回退。

如果电路闭合,则流程前进至 (5) 以检查是否有可用容量来运行命令。

如果当前要执行的 Command 命令 先关连的线程池 和队列(或者信号量)资源已经满了,Hystrix 将不会运行 Command 命令,直接执行 步骤 8 的 Fallback 降级处理;如果未满,表示有剩余的资源执行 Command 命令,则执行步骤 6

(六)construct()run()

当经过步骤 5 判断,有足够的资源执行 Command 命令时,本步骤将调用 Command 命令运行方法,基于不同类型的 Command,有如下两种两种运行方式:

运行方式 说明
HystrixCommand.run() 返回一个处理结果或者抛出一个异常
HystrixObservableCommand.construct() 返回一个 Observable 表示的结果(可能多个),或者 基于onError的错误通知

如果run() 或者construct()方法 的真实执行时间超过了 Command 设置的超时时间阈值, 则当前则执行线程(或者是独立的定时器线程)将会抛出TimeoutException。抛出超时异常 TimeoutException,后,将执行**步骤 8 **的 Fallback 降级处理。即使run()或者construct()执行没有被取消或中断,最终能够处理返回结果,但在降级处理逻辑中,将会抛弃run()construct()方法的返回结果,而返回 Fallback 降级处理结果。

注意事项
需要注意的是,Hystrix 无法强制 将正在运行的线程停止掉–Hystrix 能够做的最好的方式就是在 JVM 中抛出一个InterruptedException。如果 Hystrix 包装的工作不抛出中断异常InterruptedException, 则在 Hystrix 线程池中的线程将会继续执行,尽管调用的客户端已经接收到了TimeoutException。这种方式会使 Hystrix 的线程池处于饱和状态。大部分的 Java Http Client 开源库并不会解析 InterruptedException。所以确认 HTTP client 相关的连接和读/写相关的超时时间设置。
如果 Command 命令没有抛出任何异常,并且有返回结果,则 Hystrix 将会在做完日志记录和统计之后会将结果返回。 如果是通过run()方式运行,则返回一个Obserable对象,包含一个唯一值,并且发送一个onCompleted通知;如果是通过consturct()方式运行 ,则返回一个Observable 对象

(七)健康检查

Hystrix 会统计 Command 命令执行执行过程中的成功数失败数拒绝数超时数, 将这些信息记录到断路器 (Circuit Breaker) 中。断路器将上述统计按照时间窗的形式记录到一个定长数组中。断路器根据时间窗内的统计数据去判定请求什么时候可以被熔断,熔断后,在接下来一段恢复周期内,相同的请求过来后会直接被熔断。当再次校验,如果健康监测通过后,熔断开关将会被关闭。

(八)获取 Fallback

当以下场景出现后,Hystrix 将会尝试触发 Fallback:

  • 步骤 6 Command 执行时抛出了任何异常;
  • 步骤 4 断路器已经被打开
  • 步骤 5 执行命令的线程池、队列或者信号量资源已满
  • 命令执行的时间超过阈值

(九)返回结果

如果 Hystrix 命令对象执行成功,将会返回结果,或者以Observable形式包装的结果。根据**步骤 2 **的 command 调用方式,返回的Observable 会按照如下图说是的转换关系进行返回:

img

  • execute() — 用和 .queue() 相同的方式获取 Future,然后调用 Futureget() 以获取 Observable 的单个值。
  • queue() —将 Observable 转换为 BlockingObservable,以便可以将其转换为 Future 并返回。
  • watch() —订阅 Observable 并开始执行命令的流程; 返回一个 Observable,当订阅该 Observable 时,它会重新通知。
  • toObservable() —返回不变的 Observable; 必须订阅它才能真正开始执行命令的流程。

断路器工作原理

img

  1. 断路器时间窗内的请求数是否超过了请求数断路器生效阈值circuitBreaker.requestVolumeThreshold,如果超过了阈值,则将会触发断路,断路状态为开启
    例如,如果当前阈值设置的是20,则当时间窗内统计的请求数共计 19 个,即使 19 个全部失败了,都不会触发断路器。
  2. 并且请求错误率超过了请求错误率阈值errorThresholdPercentage
  3. 如果两个都满足,则将断路器由关闭迁移到开启
  4. 如果断路器开启,则后续的所有相同请求将会被断路掉;
  5. 直到过了沉睡时间窗sleepWindowInMilliseconds后,再发起请求时,允许其通过(此时的状态为半开起状态)。如果请求失败了,则保持断路器状态为开启状态,并更新沉睡时间窗。如果请求成功了,则将断路器状态改为关闭状态;

核心的逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Override
public void onNext(HealthCounts hc) {
// check if we are past the statisticalWindowVolumeThreshold
if (hc.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {
// we are not past the minimum volume threshold for the stat window,
// so no change to circuit status.
// if it was CLOSED, it stays CLOSED
// if it was half-open, we need to wait for a successful command execution
// if it was open, we need to wait for sleep window to elapse
} else {
if (hc.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
//we are not past the minimum error threshold for the stat window,
// so no change to circuit status.
// if it was CLOSED, it stays CLOSED
// if it was half-open, we need to wait for a successful command execution
// if it was open, we need to wait for sleep window to elapse
} else {
// our failure rate is too high, we need to set the state to OPEN
if (status.compareAndSet(Status.CLOSED, Status.OPEN)) {
circuitOpened.set(System.currentTimeMillis());
}
}
}
}

限流框架 - Sentinel

其他限流解决方案

Guava RateLimiter

Guava 是 Google 开源的 Java 类库,提供了一个工具类 RateLimiter,它基于令牌桶算法实现了本地限流器。

:::details RateLimiter 限流示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// 限流器流速:2 个请求/秒
RateLimiter limiter = RateLimiter.create(2.0);
// 执行任务的线程池
ExecutorService es = Executors.newFixedThreadPool(1);
// 记录上一次执行时间
prev = System.nanoTime();
// 测试执行 20 次
for (int i = 0; i < 20; i++) {
// 限流器限流
limiter.acquire();
// 提交任务异步执行
es.execute(() -> {
long cur = System.nanoTime();
// 打印时间间隔:毫秒
System.out.println((cur - prev) / 1000000);
prev = cur;
});
}

// 输出结果:
// ...
// 500
// 499
// 500
// 499

:::

Redis + Lua

如果想要针对分布式系统资源进行限流,则必须具备两个要素:

  1. 对于资源的访问统计,必须是所有分布式节点都可以共享访问的数据存储;并且,由于在高并发场景下,读写访问统计数据会很频繁,该数据存储必须有很高的读写性能。
  2. 访问统计、限流计算都以原子操作方式进行。

满足以上要素的一种简单解决方案是,采用 Redis + Lua 来实现,原因在于:

  • Redis 数据库的读写性能极高;
  • Redis 支持以原子操作的方式执行 Lua 脚本。

Redis + Lua 实现的固定窗口限流算法

Redis + Lua 实现的固定窗口限流算法实现思路:

  • 根据实际需要,将当前时间格式化为天(yyyyMMdd)、时(yyyyMMddHH)、分(yyyyMMddHHmm)、秒(yyyyMMddHHmmss),并作为 Redis 的 String 类型 Key。该 Key 可以视为一个固定时间窗口,其中的 value 用于统计访问量;
  • 用于代表不同粒度的时间窗口按需设置过期时间;
  • 一旦达到窗口的限流阈值时,请求被限流;否则请求通过。

:::details Redis + Lua 实现的固定窗口限流算法

下面的代码片段模拟通过一个大小为 1 分钟的固定时间窗口进行限流,阈值为 100,过期时间 60s。

限流脚本 fixed_window_rate_limit.lua 代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
-- 缓存 Key
local key = KEYS[1]
-- 访问请求数
local permits = tonumber(ARGV[1])
-- 过期时间
local seconds = tonumber(ARGV[2])
-- 限流阈值
local limit = tonumber(ARGV[3])

-- 获取统计值
local count = tonumber(redis.call('GET', key) or "0")

if count + permits > limit then
-- 触发限流
return 0
else
redis.call('INCRBY', key, permits)
redis.call('EXPIRE', key, seconds)
return count + permits
end

调用 lua 的实际限流代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.io.FileUtil;
import cn.hutool.core.io.resource.ResourceUtil;
import cn.hutool.core.util.RandomUtil;
import cn.hutool.core.util.StrUtil;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.exceptions.JedisConnectionException;

import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class RedisFixedWindowRateLimiter implements RateLimiter {

private static final String REDIS_HOST = "localhost";

private static final int REDIS_PORT = 6379;

private static final Jedis JEDIS;

public static final String SCRIPT;

static {
// Jedis 有多种构造方法,这里选用最简单的一种情况
JEDIS = new Jedis(REDIS_HOST, REDIS_PORT);

// 触发 ping 命令
try {
JEDIS.ping();
System.out.println("jedis 连接成功");
} catch (JedisConnectionException e) {
e.printStackTrace();
}

SCRIPT = FileUtil.readString(ResourceUtil.getResource("scripts/fixed_window_rate_limit.lua"),
StandardCharsets.UTF_8);
}

private final long maxPermits;
private final long periodSeconds;
private final String key;

public RedisFixedWindowRateLimiter(long qps, String key) {
this(qps * 60, 60, TimeUnit.SECONDS, key);
}

public RedisFixedWindowRateLimiter(long maxPermits, long period, TimeUnit timeUnit, String key) {
this.maxPermits = maxPermits;
this.periodSeconds = timeUnit.toSeconds(period);
this.key = key;
}

@Override
public boolean tryAcquire(int permits) {
List<String> keys = Collections.singletonList(key);
List<String> args = CollectionUtil.newLinkedList(String.valueOf(permits), String.valueOf(periodSeconds),
String.valueOf(maxPermits));
Object eval = JEDIS.eval(SCRIPT, keys, args);
long value = (long) eval;
return value != -1;
}

public static void main(String[] args) throws InterruptedException {

int qps = 20;
RateLimiter jedisFixedWindowRateLimiter = new RedisFixedWindowRateLimiter(qps, "rate:limit:20240122210000");

// 模拟在一分钟内,不断收到请求,限流是否有效
int seconds = 60;
long okNum = 0L;
long total = 0L;
long beginTime = System.currentTimeMillis();
int num = RandomUtil.randomInt(qps, 100);
for (int second = 0; second < seconds; second++) {
for (int i = 0; i < num; i++) {
total++;
if (jedisFixedWindowRateLimiter.tryAcquire(1)) {
okNum++;
System.out.println("请求成功");
} else {
System.out.println("请求限流");
}
}
TimeUnit.SECONDS.sleep(1);
}
long endTime = System.currentTimeMillis();
long time = (endTime - beginTime) / 1000;
System.out.println(StrUtil.format("请求通过数:{},总请求数:{},实际 QPS:{}", okNum, total, okNum / time));
}

}

:::

Redis + Lua 实现的令牌桶限流算法

:::details Redis + Lua 实现的令牌桶限流算法

限流脚本 token_bucket_rate_limit.lua 代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
local tokenKey = KEYS[1]
local timeKey = KEYS[2]

-- 申请令牌数
local permits = tonumber(ARGV[1])
-- QPS
local qps = tonumber(ARGV[2])
-- 桶的容量
local capacity = tonumber(ARGV[3])
-- 当前时间(单位:毫秒)
local nowMillis = tonumber(ARGV[4])
-- 填满令牌桶所需要的时间
local fillTime = capacity / qps
local ttl = math.min(capacity, math.floor(fillTime * 2))

local currentTokenNum = tonumber(redis.call("GET", tokenKey))
if currentTokenNum == nil then
currentTokenNum = capacity
end

local endTimeMillis = tonumber(redis.call("GET", timeKey))
if endTimeMillis == nil then
endTimeMillis = 0
end

local gap = nowMillis - endTimeMillis
local newTokenNum = math.max(0, gap * qps / 1000)
local currentTokenNum = math.min(capacity, currentTokenNum + newTokenNum)

if currentTokenNum < permits then
-- 请求拒绝
return -1
else
-- 请求通过
local finalTokenNum = currentTokenNum - permits
redis.call("SETEX", tokenKey, ttl, finalTokenNum)
redis.call("SETEX", timeKey, ttl, nowMillis)
return finalTokenNum
end

调用 lua 的实际限流代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
package io.github.dunwu.distributed.ratelimit;

import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.io.FileUtil;
import cn.hutool.core.io.resource.ResourceUtil;
import cn.hutool.core.util.RandomUtil;
import cn.hutool.core.util.StrUtil;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.exceptions.JedisConnectionException;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.TimeUnit;

/**
* 基于 Redis + Lua 实现的令牌桶限流算法
*
* @author <a href="mailto:forbreak@163.com">Zhang Peng</a>
* @date 2024-01-23
*/
public class RedisTokenBucketRateLimiter implements RateLimiter {

private static final String REDIS_HOST = "localhost";

private static final int REDIS_PORT = 6379;

private static final Jedis JEDIS;

public static final String SCRIPT;

static {
// Jedis 有多种构造方法,这里选用最简单的一种情况
JEDIS = new Jedis(REDIS_HOST, REDIS_PORT);

// 触发 ping 命令
try {
JEDIS.ping();
System.out.println("jedis 连接成功");
} catch (JedisConnectionException e) {
e.printStackTrace();
}

SCRIPT = FileUtil.readString(ResourceUtil.getResource("scripts/token_bucket_rate_limit.lua"),
StandardCharsets.UTF_8);
}

private final long qps;
private final long capacity;
private final String tokenKey;
private final String timeKey;

public RedisTokenBucketRateLimiter(long qps, long capacity, String tokenKey, String timeKey) {
this.qps = qps;
this.capacity = capacity;
this.tokenKey = tokenKey;
this.timeKey = timeKey;
}

@Override
public boolean tryAcquire(int permits) {
long now = System.currentTimeMillis();
List<String> keys = CollectionUtil.newLinkedList(tokenKey, timeKey);
List<String> args = CollectionUtil.newLinkedList(String.valueOf(permits), String.valueOf(qps),
String.valueOf(capacity), String.valueOf(now));
Object eval = JEDIS.eval(SCRIPT, keys, args);
long value = (long) eval;
return value != -1;
}

public static void main(String[] args) throws InterruptedException {

int qps = 20;
int bucket = 100;
RedisTokenBucketRateLimiter redisTokenBucketRateLimiter =
new RedisTokenBucketRateLimiter(qps, bucket, "token:rate:limit", "token:rate:limit:time");

// 先将令牌桶预热令牌申请完,后续才能真实反映限流 QPS
redisTokenBucketRateLimiter.tryAcquire(bucket);
TimeUnit.SECONDS.sleep(1);

// 模拟在一分钟内,不断收到请求,限流是否有效
int seconds = 60;
long okNum = 0L;
long total = 0L;
long beginTime = System.currentTimeMillis();
for (int second = 0; second < seconds; second++) {
int num = RandomUtil.randomInt(qps, 100);
for (int i = 0; i < num; i++) {
total++;
if (redisTokenBucketRateLimiter.tryAcquire(1)) {
okNum++;
System.out.println("请求成功");
} else {
System.out.println("请求限流");
}
}
TimeUnit.SECONDS.sleep(1);
}
long endTime = System.currentTimeMillis();
long time = (endTime - beginTime) / 1000;
System.out.println(StrUtil.format("请求通过数:{},总请求数:{},实际 QPS:{}", okNum, total, okNum / time));
}

}

:::

参考资料

Java 容器之 Map

Map 简介

Map 架构

Map 家族主要成员功能如下:

  • Map 是 Map 容器家族的祖先,Map 是一个用于保存键值对(key-value)的接口。Map 中不能包含重复的键;每个键最多只能映射到一个值。
  • AbstractMap 继承了 Map 的抽象类,它实现了 Map 中的核心 API。其它 Map 的实现类可以通过继承 AbstractMap 来减少重复编码。
  • SortedMap 继承了 Map 的接口。SortedMap 中的内容是排序的键值对,排序的方法是通过实现比较器(Comparator)完成的。
  • NavigableMap 继承了 SortedMap 的接口。相比于 SortedMapNavigableMap 有一系列的“导航”方法;如”获取大于/等于某对象的键值对”、“获取小于/等于某对象的键值对”等等。
  • HashMap 继承了 AbstractMap,但没实现 NavigableMap 接口。HashMap 的主要作用是储存无序的键值对,而 Hash 也体现了它的查找效率很高。HashMap 是使用最广泛的 Map
  • Hashtable 虽然没有继承 AbstractMap,但它继承了 DictionaryDictionary 也是键值对的接口),而且也实现 Map 接口。因此,Hashtable 的主要作用是储存无序的键值对。和 HashMap 相比,Hashtable 在它的主要方法中使用 synchronized 关键字修饰,来保证线程安全。但是,由于它的锁粒度太大,非常影响读写速度,所以,现代 Java 程序几乎不会使用 Hashtable ,如果需要保证线程安全,一般会用 ConcurrentHashMap 来替代。
  • TreeMap 继承了 AbstractMap,且实现了 NavigableMap 接口。TreeMap 的主要作用是储存有序的键值对,排序依据根据元素类型的 Comparator 而定。
  • WeakHashMap 继承了 AbstractMapWeakHashMap 的键是弱引用 (即 WeakReference),它的主要作用是当 GC 内存不足时,会自动将 WeakHashMap 中的 key 回收,这避免了 WeakHashMap 的内存空间无限膨胀。很明显,WeakHashMap 适用于作为缓存。

Map 接口

Map 的定义如下:

1
public interface Map<K,V> { }

Map 是一个用于保存键值对(key-value)的接口。Map 中不能包含重复的键;每个键最多只能映射到一个值。

Map 接口提供三种 Collection 视图,允许以键集值集键-值映射关系集的形式访问数据。

Map 有些实现类,可以有序的保存元素,如 TreeMap;另一些实现类则不保证顺序,如 HashMap 类。

Map 的实现类应该提供 2 个“标准的”构造方法:

  • void(无参数)构造方法,用于创建空 Map;
  • 带有单个 Map 类型参数的构造方法,用于创建一个与其参数具有相同键-值映射关系的新 Map。

实际上,后一个构造方法允许用户复制任意 Map,生成所需类的一个等价 Map。尽管无法强制执行此建议(因为接口不能包含构造方法),但是 JDK 中所有通用的 Map 实现都遵从它。

Map.Entry 接口

Map.Entry 一般用于通过迭代器(Iterator)访问问 Map

Map.Entry 是 Map 中内部的一个接口,Map.Entry 代表了 键值对 实体,Map 通过 entrySet() 获取 Map.Entry 集合,从而通过该集合实现对键值对的操作。

AbstractMap 抽象类

AbstractMap 的定义如下:

1
public abstract class AbstractMap<K,V> implements Map<K,V> {}

AbstractMap 抽象类提供了 Map 接口的核心实现,以最大限度地减少实现 Map 接口所需的工作。

要实现不可修改的 Map,编程人员只需扩展此类并提供 entrySet() 方法的实现即可,该方法将返回 Map 的映射关系 Set 视图。通常,返回的 set 将依次在 AbstractSet 上实现。此 Set 不支持 add()remove() 方法,其迭代器也不支持 remove() 方法。

要实现可修改的 Map,编程人员必须另外重写此类的 put 方法(否则将抛出 UnsupportedOperationException),entrySet().iterator() 返回的迭代器也必须另外实现其 remove() 方法。

SortedMap 接口

SortedMap 的定义如下:

1
public interface SortedMap<K,V> extends Map<K,V> { }

SortedMap 继承了 Map ,它是一个有序的 Map

SortedMap 的排序方式有两种:自然排序或者用户指定比较器插入有序 SortedMap 的所有元素都必须实现 Comparable 接口(或者被指定的比较器所接受)

另外,所有 SortedMap 实现类都应该提供 4 个“标准”构造方法:

  1. void(无参数)构造方法,它创建一个空的有序 Map,按照键的自然顺序进行排序。
  2. 带有一个 Comparator 类型参数的构造方法,它创建一个空的有序 Map,根据指定的比较器进行排序。
  3. 带有一个 Map 类型参数的构造方法,它创建一个新的有序 Map,其键-值映射关系与参数相同,按照键的自然顺序进行排序。
  4. 带有一个 SortedMap 类型参数的构造方法,它创建一个新的有序 Map,其键-值映射关系和排序方法与输入的有序 Map 相同。无法保证强制实施此建议,因为接口不能包含构造方法。

NavigableMap 的定义如下:

1
public interface NavigableMap<K,V> extends SortedMap<K,V> { }

NavigableMap 继承了 SortedMap ,它提供了丰富的查找方法。

NavigableMap 分别提供了获取“键”、“键-值对”、“键集”、“键-值对集”的相关方法。

NavigableMap 提供的功能可以分为 4 类:

  • 获取键-值对
    • lowerEntryfloorEntryceilingEntryhigherEntry 方法,它们分别返回与小于、小于等于、大于等于、大于给定键的键关联的 Map.Entry 对象。
    • firstEntrypollFirstEntrylastEntrypollLastEntry 方法,它们返回和/或移除最小和最大的映射关系(如果存在),否则返回 null。
  • 获取键。这个和第 1 类比较类似。
    • lowerKeyfloorKeyceilingKeyhigherKey 方法,它们分别返回与小于、小于等于、大于等于、大于给定键的键。
  • 获取键的集合
    • navigableKeySetdescendingKeySet 分别获取正序/反序的键集。
  • 获取键-值对的子集

Dictionary 抽象类

Dictionary 的定义如下:

1
public abstract class Dictionary<K,V> {}

Dictionary 是 JDK 1.0 定义的操作键值对的抽象类,它包括了操作键值对的基本方法。

HashMap 类

HashMap 的命名,也可以看出:**HashMap 以散列方式存储键值对**。HashMap 是非线程安全的。

HashMap 允许使用空值和空键,但 null 作为键只能有一个,null 作为值可以有多个。

HashMap 类大致等同于 Hashtable,除了它是不同步的并且允许为空值。)这个类不保序;特别是,它的元素顺序可能会随着时间的推移变化。

JDK1.8 之前 HashMap 由 数组+链表 组成的,数组是 HashMap 的主体,链表则是主要为了解决哈希冲突而存在的(“拉链法”解决冲突)。 JDK1.8 以后的 HashMap 在解决哈希冲突时有了较大的变化,当链表长度大于等于阈值(默认为 8)(将链表转换成红黑树前会判断,如果当前数组的长度小于 64,那么会选择先进行数组扩容,而不是转换为红黑树)时,将链表转化为红黑树,以减少搜索时间。

HashMap 默认的初始化大小为 16。之后每次扩充,容量变为原来的 2 倍。并且, HashMap 总是使用 2 的幂作为哈希表的大小。

JDK8 之前 HashMap 数据结构

之前 HashMap 底层是 数组和链表 结合在一起使用也就是 链表散列

HashMap 通过 key 的 hashCode 经过扰动函数处理过后得到 hash 值,然后通过 (n - 1) & hash 判断当前元素存放的位置(这里的 n 指的是数组的长度),如果当前位置存在元素的话,就判断该元素与要存入的元素的 hash 值以及 key 是否相同,如果相同的话,直接覆盖,不相同就通过拉链法解决冲突。

所谓扰动函数指的就是 HashMap 的 hash 方法。使用 hash 方法也就是扰动函数是为了防止一些实现比较差的 hashCode() 方法 换句话说使用扰动函数之后可以减少碰撞。

JDK7 的 HashMap 的 hash 方法:

1
2
3
4
static int hash(int h) {
h ^= (h >>> 20) ^ (h >>> 12);
return h ^ (h >>> 7) ^ (h >>> 4);
}

JDK8 之后 HashMap 数据结构

HashMap 的主要字段定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class HashMap<K,V> extends AbstractMap<K,V>
implements Map<K,V>, Cloneable, Serializable {

// 该表在初次使用时初始化,并根据需要调整大小。分配时,长度总是2的幂。
transient Node<K,V>[] table;
// 保存缓存的 entrySet()。请注意,AbstractMap 字段用于 keySet() 和 values()。
transient Set<Map.Entry<K,V>> entrySet;
// map 中的键值对数
transient int size;
// 这个HashMap被结构修改的次数结构修改是那些改变HashMap中的映射数量或者修改其内部结构(例如,重新散列)的修改。
transient int modCount;
// 下一个调整大小的值(容量*负载因子)。
int threshold;
// 散列表的负载因子
final float loadFactor;
}

HashMap 有两个影响其性能的参数:初始容量和负载因子

  • size - 初始容量。默认为 16,每次容量不够自动扩容。容量是哈希表中桶的数量,初始容量就是哈希表创建时的容量。
  • loadFactor - 负载因子。自动扩容之前被允许的最大饱和量,默认 0.75。负载因子是散列表在其容量自动扩容之前被允许的最大饱和量。当哈希表中的 entry 数量超过负载因子和当前容量的乘积时,散列表就会被重新映射(即重建内部数据结构),一般散列表大约是存储桶数量的两倍。

通常,默认负载因子(0.75)在时间和空间成本之间提供了良好的平衡。较高的值会减少空间开销,但会增加查找成本(反映在大部分 HashMap 类的操作中,包括 getput)。在设置初始容量时,应考虑映射中的条目数量及其负载因子,以尽量减少重新运行操作的次数。如果初始容量大于最大入口数除以负载因子,则不会发生重新刷新操作。

如果许多映射要存储在 HashMap 实例中,使用足够大的容量创建映射将允许映射存储的效率高于根据需要执行自动重新散列以增长表。请注意,使用多个具有相同 hashCode() 的密钥是降低任何散列表性能的一个可靠方法。为了改善影响,当键是 Comparable 时,该类可以使用键之间的比较顺序来帮助断开关系。

JDK8 的 HashMap 的 hash 方法:

1
2
3
4
5
6
7
static final int hash(Object key) {
int h;
// key.hashCode():返回散列值也就是hashcode
// ^:按位异或
// >>>:无符号右移,忽略符号位,空位都以0补齐
return (key == null) ? 0 : (h = key.hashCode()) ^ (h >>> 16);
}

getput 的过程中,计算下标时,先对 hashCode 进行 hash 操作,然后再通过 hash 值进一步计算下标,如下图所示:

在对 hashCode() 计算 hash 时具体实现是这样的:

1
2
3
4
static final int hash(Object key) {
int h;
return (key == null) ? 0 : (h = key.hashCode()) ^ (h >>> 16);
}

可以看到这个方法大概的作用就是:高 16bit 不变,低 16bit 和高 16bit 做了一个异或。

在设计 hash 方法时,因为目前的 table 长度 n 为 2 的幂,而计算下标的时候,是这样实现的(使用 & 位操作,而非 % 求余):

1
(n - 1) & hash

设计者认为这方法很容易发生碰撞。为什么这么说呢?不妨思考一下,在 n - 1 为 15(0x1111) 时,其实散列真正生效的只是低 4bit 的有效位,当然容易碰撞了。

因此,设计者想了一个顾全大局的方法(综合考虑了速度、作用、质量),就是把高 16bit 和低 16bit 异或了一下。设计者还解释到因为现在大多数的 hashCode 的分布已经很不错了,就算是发生了碰撞也用 O(logn)的 tree 去做了。仅仅异或一下,既减少了系统的开销,也不会造成的因为高位没有参与下标的计算(table 长度比较小时),从而引起的碰撞。

如果还是产生了频繁的碰撞,会发生什么问题呢?作者注释说,他们使用树来处理频繁的碰撞(we use trees to handle large sets of collisions in bins),在 JEP-180 中,描述了这个问题:

Improve the performance of java.util.HashMap under high hash-collision conditions by using balanced trees rather than linked lists to store map entries. Implement the same improvement in the LinkedHashMap class.

之前已经提过,在获取 HashMap 的元素时,基本分两步:

  1. 首先根据 hashCode() 做 hash,然后确定 bucket 的 index;

  2. 如果 bucket 的节点的 key 不是我们需要的,则通过 keys.equals()在链中找。

在 JDK8 之前的实现中是用链表解决冲突的,在产生碰撞的情况下,进行 get 时,两步的时间复杂度是 O(1)+O(n)。因此,当碰撞很厉害的时候 n 很大,O(n)的速度显然是影响速度的。

因此在 JDK8 中,利用红黑树替换链表,这样复杂度就变成了 O(1)+O(logn)了,这样在 n 很大的时候,能够比较理想的解决这个问题,在 JDK8:HashMap 的性能提升一文中有性能测试的结果。

HashMap 构造方法

1
2
3
4
public HashMap(); // 默认负载因子0.75
public HashMap(int initialCapacity); // 默认负载因子0.75;以 initialCapacity 初始化容量
public HashMap(int initialCapacity, float loadFactor); // 以 initialCapacity 初始化容量;以 loadFactor 初始化负载因子
public HashMap(Map<? extends K, ? extends V> m) // 默认负载因子0.75

put 方法的实现

put 方法大致的思路为:

  • 对 key 的 hashCode() 做 hash 计算,然后根据 hash 值再计算 Node 的存储位置;
  • 如果没有哈希碰撞,直接放到桶里;如果有哈希碰撞,以链表的形式存在桶后。
  • 如果哈希碰撞导致链表过长(大于等于 TREEIFY_THRESHOLD,数值为 8),就把链表转换成红黑树;
  • 如果节点已经存在就替换旧值
  • 桶数量超过容量*负载因子(即 load factor * current capacity),HashMap 调用 resize 自动扩容一倍

具体代码的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
public V put(K key, V value) {
return putVal(hash(key), key, value, false, true);
}

// hashcode 无符号位移 16 位
static final int hash(Object key) {
int h;
return (key == null) ? 0 : (h = key.hashCode()) ^ (h >>> 16);
}

final V putVal(int hash, K key, V value, boolean onlyIfAbsent,
boolean evict) {
Node<K,V>[] tab; Node<K,V> p; int n, i;
// tab 为空则创建
if ((tab = table) == null || (n = tab.length) == 0)
n = (tab = resize()).length;
// 计算 index,并对 null 做处理
if ((p = tab[i = (n - 1) & hash]) == null)
tab[i] = newNode(hash, key, value, null);
else {
Node<K,V> e; K k;
// 节点存在
if (p.hash == hash &&
((k = p.key) == key || (key != null && key.equals(k))))
e = p;
// 该链为树
else if (p instanceof TreeNode)
e = ((TreeNode<K,V>)p).putTreeVal(this, tab, hash, key, value);
// 该链为链表
else {
for (int binCount = 0; ; ++binCount) {
if ((e = p.next) == null) {
p.next = newNode(hash, key, value, null);
if (binCount >= TREEIFY_THRESHOLD - 1) // -1 for 1st
treeifyBin(tab, hash);
break;
}
if (e.hash == hash &&
((k = e.key) == key || (key != null && key.equals(k))))
break;
p = e;
}
}
// 写入
if (e != null) { // existing mapping for key
V oldValue = e.value;
if (!onlyIfAbsent || oldValue == null)
e.value = value;
afterNodeAccess(e);
return oldValue;
}
}
++modCount;
if (++size > threshold)
resize();
afterNodeInsertion(evict);
return null;
}

为什么计算 hash 使用 hashcode 无符号位移 16 位。

假设要添加两个对象 a 和 b,如果数组长度是 16,这时对象 a 和 b 通过公式 (n - 1) & hash 运算,也就是 (16-1)&a.hashCode 和 (16-1)&b.hashCode,15 的二进制为 0000000000000000000000000001111,假设对象 A 的 hashCode 为 1000010001110001000001111000000,对象 B 的 hashCode 为 0111011100111000101000010100000,你会发现上述与运算结果都是 0。这样的哈希结果就太让人失望了,很明显不是一个好的哈希算法。

但如果我们将 hashCode 值右移 16 位(h >>> 16 代表无符号右移 16 位),也就是取 int 类型的一半,刚好可以将该二进制数对半切开,并且使用位异或运算(如果两个数对应的位置相反,则结果为 1,反之为 0),这样的话,就能避免上面的情况发生。这就是 hash() 方法的具体实现方式。简而言之,就是尽量打乱 hashCode 真正参与运算的低 16 位。

get 方法的实现

在理解了 put 之后,get 就很简单了。大致思路如下:

  • 对 key 的 hashCode() 做 hash 计算,然后根据 hash 值再计算桶的 index

  • 如果桶中的第一个节点命中,直接返回;

  • 如果有冲突,则通过 key.equals(k) 去查找对应的 entry

    • 若为树,则在红黑树中通过 key.equals(k) 查找,O(logn);

    • 若为链表,则在链表中通过 key.equals(k) 查找,O(n)。

具体代码的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public V get(Object key) {
Node<K,V> e;
return (e = getNode(hash(key), key)) == null ? null : e.value;
}

final Node<K,V> getNode(int hash, Object key) {
Node<K,V>[] tab; Node<K,V> first, e; int n; K k;
if ((tab = table) != null && (n = tab.length) > 0 &&
(first = tab[(n - 1) & hash]) != null) {
// 直接命中
if (first.hash == hash && // always check first node
((k = first.key) == key || (key != null && key.equals(k))))
return first;
// 未命中
if ((e = first.next) != null) {
// 在树中 get
if (first instanceof TreeNode)
return ((TreeNode<K,V>)first).getTreeNode(hash, key);
// 在链表中 get
do {
if (e.hash == hash &&
((k = e.key) == key || (key != null && key.equals(k))))
return e;
} while ((e = e.next) != null);
}
}
return null;
}

resize 的实现

put 时,如果发现目前的 bucket 占用程度已经超过了 Load Factor 所希望的比例,那么就会发生 resize。在 resize 的过程,简单的说就是把 bucket 扩充为 2 倍,之后重新计算 index,把节点再放到新的 bucket 中。

当超过限制的时候会 resize,然而又因为我们使用的是 2 次幂的扩展(指长度扩为原来 2 倍),所以,元素的位置要么是在原位置,要么是在原位置再移动 2 次幂的位置。

怎么理解呢?例如我们从 16 扩展为 32 时,具体的变化如下所示:

因此元素在重新计算 hash 之后,因为 n 变为 2 倍,那么 n-1 的 mask 范围在高位多 1bit(红色),因此新的 index 就会发生这样的变化:

因此,我们在扩充 HashMap 的时候,不需要重新计算 hash,只需要看看原来的 hash 值新增的那个 bit 是 1 还是 0 就好了,是 0 的话索引没变,是 1 的话索引变成“原索引+oldCap”。可以看看下图为 16 扩充为 32 的 resize 示意图:

这个设计确实非常的巧妙,既省去了重新计算 hash 值的时间,而且同时,由于新增的 1bit 是 0 还是 1 可以认为是随机的,因此 resize 的过程,均匀的把之前的冲突的节点分散到新的 bucket 了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
final Node<K,V>[] resize() {
Node<K,V>[] oldTab = table;
int oldCap = (oldTab == null) ? 0 : oldTab.length;
int oldThr = threshold;
int newCap, newThr = 0;
if (oldCap > 0) {
// 超过最大值就不再扩充了,就只好随你碰撞去吧
if (oldCap >= MAXIMUM_CAPACITY) {
threshold = Integer.MAX_VALUE;
return oldTab;
}
// 没超过最大值,就扩充为原来的 2 倍
else if ((newCap = oldCap << 1) < MAXIMUM_CAPACITY &&
oldCap >= DEFAULT_INITIAL_CAPACITY)
newThr = oldThr << 1; // double threshold
}
else if (oldThr > 0) // initial capacity was placed in threshold
newCap = oldThr;
else { // zero initial threshold signifies using defaults
newCap = DEFAULT_INITIAL_CAPACITY;
newThr = (int)(DEFAULT_LOAD_FACTOR * DEFAULT_INITIAL_CAPACITY);
}

// 计算新的 resize 上限
if (newThr == 0) {
float ft = (float)newCap * loadFactor;
newThr = (newCap < MAXIMUM_CAPACITY && ft < (float)MAXIMUM_CAPACITY ?
(int)ft : Integer.MAX_VALUE);
}
threshold = newThr;
@SuppressWarnings({"rawtypes","unchecked"})
Node<K,V>[] newTab = (Node<K,V>[])new Node[newCap];
table = newTab;
if (oldTab != null) {
// 把每个 bucket 都移动到新的 buckets 中
for (int j = 0; j < oldCap; ++j) {
Node<K,V> e;
if ((e = oldTab[j]) != null) {
oldTab[j] = null;
if (e.next == null)
newTab[e.hash & (newCap - 1)] = e;
else if (e instanceof TreeNode)
((TreeNode<K,V>)e).split(this, newTab, j, oldCap);
else { // preserve order
Node<K,V> loHead = null, loTail = null;
Node<K,V> hiHead = null, hiTail = null;
Node<K,V> next;
do {
next = e.next;
// 原索引
if ((e.hash & oldCap) == 0) {
if (loTail == null)
loHead = e;
else
loTail.next = e;
loTail = e;
}
// 原索引+oldCap
else {
if (hiTail == null)
hiHead = e;
else
hiTail.next = e;
hiTail = e;
}
} while ((e = next) != null);
// 原索引放到bucket里
if (loTail != null) {
loTail.next = null;
newTab[j] = loHead;
}
// 原索引+oldCap放到bucket里
if (hiTail != null) {
hiTail.next = null;
newTab[j + oldCap] = hiHead;
}
}
}
}
}
return newTab;
}

LinkedHashMap 类

LinkedHashMap 要点

LinkedHashMap 通过维护一个保存所有条目(Entry)的双向链表,保证了元素迭代的顺序(即插入顺序)

关注点 结论
是否允许键值对为 null Key 和 Value 都允许 null
是否允许重复数据 Key 重复会覆盖、Value 允许重复
是否有序 按照元素插入顺序存储
是否线程安全 非线程安全

LinkedHashMap 数据结构

LinkedHashMap 通过维护一对 LinkedHashMap.Entry<K,V> 类型的头尾指针,以双链表形式,保存所有数据

学习过数据结构的双链表,就能理解其元素存储以及访问必然是有序的。

1
2
3
4
5
6
7
8
9
10
11
public class LinkedHashMap<K,V>
extends HashMap<K,V>
implements Map<K,V> {

// 双链表的头指针
transient LinkedHashMap.Entry<K,V> head;
// 双链表的尾指针
transient LinkedHashMap.Entry<K,V> tail;
// 迭代排序方法:true 表示访问顺序;false 表示插入顺序
final boolean accessOrder;
}

LinkedHashMap 继承了 HashMapput 方法,本身没有实现 put 方法。

TreeMap 类

TreeMap 要点

TreeMap 基于红黑树实现。

TreeMap 是有序的。它的排序规则是:根据 map 中的 key 的自然语义顺序或提供的比较器(Comparator)的自定义比较顺序。

TreeMap 不是线程安全的。

TreeMap 原理

put 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
public V put(K key, V value) {
Entry<K,V> t = root;
// 如果根节点为 null,插入第一个节点
if (t == null) {
compare(key, key); // type (and possibly null) check

root = new Entry<>(key, value, null);
size = 1;
modCount++;
return null;
}
int cmp;
Entry<K,V> parent;
// split comparator and comparable paths
Comparator<? super K> cpr = comparator;
// 每个节点的左孩子节点的值小于它;右孩子节点的值大于它
// 如果有比较器,使用比较器进行比较
if (cpr != null) {
do {
parent = t;
cmp = cpr.compare(key, t.key);
if (cmp < 0)
t = t.left;
else if (cmp > 0)
t = t.right;
else
return t.setValue(value);
} while (t != null);
}
// 没有比较器,使用 key 的自然顺序进行比较
else {
if (key == null)
throw new NullPointerException();
@SuppressWarnings("unchecked")
Comparable<? super K> k = (Comparable<? super K>) key;
do {
parent = t;
cmp = k.compareTo(t.key);
if (cmp < 0)
t = t.left;
else if (cmp > 0)
t = t.right;
else
return t.setValue(value);
} while (t != null);
}
// 通过上面的遍历未找到 key 值,则新插入节点
Entry<K,V> e = new Entry<>(key, value, parent);
if (cmp < 0)
parent.left = e;
else
parent.right = e;
// 插入后,为了维持红黑树的平衡需要调整
fixAfterInsertion(e);
size++;
modCount++;
return null;
}

get 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public V get(Object key) {
Entry<K,V> p = getEntry(key);
return (p==null ? null : p.value);
}

final Entry<K,V> getEntry(Object key) {
// Offload comparator-based version for sake of performance
if (comparator != null)
return getEntryUsingComparator(key);
if (key == null)
throw new NullPointerException();
@SuppressWarnings("unchecked")
Comparable<? super K> k = (Comparable<? super K>) key;
Entry<K,V> p = root;
// 按照二叉树搜索的方式进行搜索,搜到返回
while (p != null) {
int cmp = k.compareTo(p.key);
if (cmp < 0)
p = p.left;
else if (cmp > 0)
p = p.right;
else
return p;
}
return null;
}

remove 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
public V remove(Object key) {
Entry<K,V> p = getEntry(key);
if (p == null)
return null;

V oldValue = p.value;
deleteEntry(p);
return oldValue;
}
private void deleteEntry(Entry<K,V> p) {
modCount++;
size--;

// 如果当前节点有左右孩子节点,使用后继节点替换要删除的节点
// If strictly internal, copy successor's element to p and then make p
// point to successor.
if (p.left != null && p.right != null) {
Entry<K,V> s = successor(p);
p.key = s.key;
p.value = s.value;
p = s;
} // p has 2 children

// Start fixup at replacement node, if it exists.
Entry<K,V> replacement = (p.left != null ? p.left : p.right);

if (replacement != null) { // 要删除的节点有一个孩子节点
// Link replacement to parent
replacement.parent = p.parent;
if (p.parent == null)
root = replacement;
else if (p == p.parent.left)
p.parent.left = replacement;
else
p.parent.right = replacement;

// Null out links so they are OK to use by fixAfterDeletion.
p.left = p.right = p.parent = null;

// Fix replacement
if (p.color == BLACK)
fixAfterDeletion(replacement);
} else if (p.parent == null) { // return if we are the only node.
root = null;
} else { // No children. Use self as phantom replacement and unlink.
if (p.color == BLACK)
fixAfterDeletion(p);

if (p.parent != null) {
if (p == p.parent.left)
p.parent.left = null;
else if (p == p.parent.right)
p.parent.right = null;
p.parent = null;
}
}
}

TreeMap 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class TreeMapDemo {

private static final String[] chars = "A B C D E F G H I J K L M N O P Q R S T U V W X Y Z".split(" ");

public static void main(String[] args) {
TreeMap<Integer, String> treeMap = new TreeMap<>();
for (int i = 0; i < chars.length; i++) {
treeMap.put(i, chars[i]);
}
System.out.println(treeMap);
Integer low = treeMap.firstKey();
Integer high = treeMap.lastKey();
System.out.println(low);
System.out.println(high);
Iterator<Integer> it = treeMap.keySet().iterator();
for (int i = 0; i <= 6; i++) {
if (i == 3) { low = it.next(); }
if (i == 6) { high = it.next(); } else { it.next(); }
}
System.out.println(low);
System.out.println(high);
System.out.println(treeMap.subMap(low, high));
System.out.println(treeMap.headMap(high));
System.out.println(treeMap.tailMap(low));
}
}

WeakHashMap

WeakHashMap 的定义如下:

1
2
3
public class WeakHashMap<K,V>
extends AbstractMap<K,V>
implements Map<K,V> {}

WeakHashMap 继承了 AbstractMap,实现了 Map 接口。

和 HashMap 一样,WeakHashMap 也是一个散列表,它存储的内容也是键值对(key-value)映射,而且键和值都可以是 null。

不过 WeakHashMap 的键是弱键。在 WeakHashMap 中,当某个键不再被其它对象引用,会被从 WeakHashMap 中被自动移除。更精确地说,对于一个给定的键,其映射的存在并不阻止垃圾回收器对该键的丢弃,这就使该键成为可终止的,被终止,然后被回收。某个键被终止时,它对应的键值对也就从映射中有效地移除了。

这个弱键的原理呢?大致上就是,通过 WeakReference 和 ReferenceQueue 实现的。

WeakHashMap 的 key 是弱键,即是 WeakReference 类型的;ReferenceQueue 是一个队列,它会保存被 GC 回收的弱键。实现步骤是:

  1. 新建 WeakHashMap,将键值对添加到 WeakHashMap 中。实际上,WeakHashMap 是通过数组 table 保存 Entry(键值对);每一个 Entry 实际上是一个单向链表,即 Entry 是键值对链表。
  2. 当某弱键不再被其它对象引用,并被 GC 回收时。在 GC 回收该弱键时,这个弱键也同时会被添加到 ReferenceQueue(queue)队列中。
  3. 当下一次我们需要操作 WeakHashMap 时,会先同步 table 和 queue。table 中保存了全部的键值对,而 queue 中保存被 GC 回收的键值对;同步它们,就是删除 table 中被 GC 回收的键值对。

这就是弱键如何被自动从 WeakHashMap 中删除的步骤了。

和 HashMap 一样,WeakHashMap 是不同步的。可以使用 Collections.synchronizedMap 方法来构造同步的 WeakHashMap。

总结

Map 简介

img

HashMap

img

其他 Map

img

参考资料

Java 容器之 Set

Set 简介

Set 家族成员简介:

  • Set 继承了 Collection 的接口。实际上 Set 就是 Collection,只是行为略有不同:Set 集合不允许有重复元素。
  • SortedSet 继承了 Set 的接口。SortedSet 中的内容是排序的唯一值,排序的方法是通过比较器(Comparator)。
  • NavigableSet 继承了 SortedSet 的接口。它提供了丰富的查找方法:如”获取大于/等于某值的元素”、“获取小于/等于某值的元素”等等。
  • AbstractSet 是一个抽象类,它继承于 AbstractCollectionAbstractCollection 实现了 Set 中的绝大部分方法,为实现 Set 的实例类提供了便利。
  • HashSet 类依赖于 HashMap,它实际上是通过 HashMap 实现的。HashSet 中的元素是无序的、散列的。
  • TreeSet 类依赖于 TreeMap,它实际上是通过 TreeMap 实现的。TreeSet 中的元素是有序的,它是按自然排序或者用户指定比较器排序的 Set。
  • LinkedHashSet 是按插入顺序排序的 Set。
  • EnumSet 是只能存放 Emum 枚举类型的 Set。

Set 接口

Set 继承了 Collection 的接口。实际上,Set 就是 Collection,二者提供的方法完全相同。

Set 接口定义如下:

1
public interface Set<E> extends Collection<E> {}

SortedSet 接口

继承了 Set 的接口。SortedSet 中的内容是排序的唯一值,排序的方法是通过比较器(Comparator)。

SortedSet 接口定义如下:

1
public interface SortedSet<E> extends Set<E> {}

SortedSet 接口新扩展的方法:

  • comparator - 返回 Comparator
  • subSet - 返回指定区间的子集
  • headSet - 返回小于指定元素的子集
  • tailSet - 返回大于指定元素的子集
  • first - 返回第一个元素
  • last - 返回最后一个元素
  • spliterator

NavigableSet 继承了 SortedSet。它提供了丰富的查找方法。

NavigableSet 接口定义如下:

1
public interface NavigableSet<E> extends SortedSet<E> {}

NavigableSet 接口新扩展的方法:

  • lower - 返回小于指定值的元素中最接近的元素
  • higher - 返回大于指定值的元素中最接近的元素
  • floor - 返回小于或等于指定值的元素中最接近的元素
  • ceiling - 返回大于或等于指定值的元素中最接近的元素
  • pollFirst - 检索并移除第一个(最小的)元素
  • pollLast - 检索并移除最后一个(最大的)元素
  • descendingSet - 返回反序排列的 Set
  • descendingIterator - 返回反序排列的 Set 的迭代器
  • subSet - 返回指定区间的子集
  • headSet - 返回小于指定元素的子集
  • tailSet - 返回大于指定元素的子集

AbstractSet 抽象类

AbstractSet 类提供 Set 接口的核心实现,以最大限度地减少实现 Set 接口所需的工作。

AbstractSet 抽象类定义如下:

1
public abstract class AbstractSet<E> extends AbstractCollection<E> implements Set<E> {}

事实上,主要的实现已经在 AbstractCollection 中完成。

HashSet 类

HashSet 类依赖于 HashMap,它实际上是通过 HashMap 实现的。HashSet 中的元素是无序的、散列的。

HashSet 类定义如下:

1
2
3
public class HashSet<E>
extends AbstractSet<E>
implements Set<E>, Cloneable, java.io.Serializable {}

HashSet 要点

  • HashSet 通过继承 AbstractSet 实现了 Set 接口中的骨干方法。
  • HashSet 实现了 Cloneable,所以支持克隆。
  • HashSet 实现了 Serializable,所以支持序列化。
  • HashSet 中存储的元素是无序的。
  • HashSet 允许 null 值的元素。
  • HashSet 不是线程安全的。

HashSet 原理

HashSet 是基于 HashMap 实现的。

1
2
3
4
5
6
// HashSet 的核心,通过维护一个 HashMap 实体来实现 HashSet 方法
private transient HashMap<E,Object> map;

// PRESENT 是用于关联 map 中当前操作元素的一个虚拟值
private static final Object PRESENT = new Object();
}
  • HashSet 中维护了一个 HashMap 对象 map,HashSet 的重要方法,如 addremoveiteratorclearsize 等都是围绕 map 实现的。
    • HashSet 类中通过定义 writeObject()readObject() 方法确定了其序列化和反序列化的机制。
  • PRESENT 是用于关联 map 中当前操作元素的一个虚拟值。

TreeSet 类

TreeSet 类依赖于 TreeMap,它实际上是通过 TreeMap 实现的。TreeSet 中的元素是有序的,它是按自然排序或者用户指定比较器排序的 Set。

TreeSet 类定义如下:

1
2
public class TreeSet<E> extends AbstractSet<E>
implements NavigableSet<E>, Cloneable, java.io.Serializable {}

TreeSet 要点

  • TreeSet 通过继承 AbstractSet 实现了 NavigableSet 接口中的骨干方法。
  • TreeSet 实现了 Cloneable,所以支持克隆。
  • TreeSet 实现了 Serializable,所以支持序列化。
  • TreeSet 中存储的元素是有序的。排序规则是自然顺序或比较器(Comparator)中提供的顺序规则。
  • TreeSet 不是线程安全的。

TreeSet 源码

TreeSet 是基于 TreeMap 实现的。

1
2
3
4
5
// TreeSet 的核心,通过维护一个 NavigableMap 实体来实现 TreeSet 方法
private transient NavigableMap<E,Object> m;

// PRESENT 是用于关联 map 中当前操作元素的一个虚拟值
private static final Object PRESENT = new Object();
  • TreeSet 中维护了一个 NavigableMap 对象 map(实际上是一个 TreeMap 实例),TreeSet 的重要方法,如 addremoveiteratorclearsize 等都是围绕 map 实现的。
  • PRESENT 是用于关联 map 中当前操作元素的一个虚拟值。TreeSet 中的元素都被当成 TreeMap 的 key 存储,而 value 都填的是 PRESENT

LinkedHashSet 类

LinkedHashSet 是按插入顺序排序的 Set。

LinkedHashSet 类定义如下:

1
2
3
public class LinkedHashSet<E>
extends HashSet<E>
implements Set<E>, Cloneable, java.io.Serializable {}

LinkedHashSet 要点

  • LinkedHashSet 通过继承 HashSet 实现了 Set 接口中的骨干方法。
  • LinkedHashSet 实现了 Cloneable,所以支持克隆。
  • LinkedHashSet 实现了 Serializable,所以支持序列化。
  • LinkedHashSet 中存储的元素是按照插入顺序保存的。
  • LinkedHashSet 不是线程安全的。

LinkedHashSet 原理

LinkedHashSet 有三个构造方法,无一例外,都是调用父类 HashSet 的构造方法。

1
2
3
4
5
6
7
8
9
public LinkedHashSet(int initialCapacity, float loadFactor) {
super(initialCapacity, loadFactor, true);
}
public LinkedHashSet(int initialCapacity) {
super(initialCapacity, .75f, true);
}
public LinkedHashSet() {
super(16, .75f, true);
}

需要强调的是:LinkedHashSet 构造方法实际上调用的是父类 HashSet 的非 public 构造方法。

1
2
3
HashSet(int initialCapacity, float loadFactor, boolean dummy) {
map = new LinkedHashMap<>(initialCapacity, loadFactor);
}

不同于 HashSet public 构造方法中初始化的 HashMap 实例,这个构造方法中,初始化了 LinkedHashMap 实例。

也就是说,实际上,LinkedHashSet 维护了一个双链表。由双链表的特性可以知道,它是按照元素的插入顺序保存的。所以,这就是 LinkedHashSet 中存储的元素是按照插入顺序保存的原理。

EnumSet 类

EnumSet 类定义如下:

1
2
public abstract class EnumSet<E extends Enum<E>> extends AbstractSet<E>
implements Cloneable, java.io.Serializable {}

EnumSet 要点

  • EnumSet 继承了 AbstractSet,所以有 Set 接口中的骨干方法。
  • EnumSet 实现了 Cloneable,所以支持克隆。
  • EnumSet 实现了 Serializable,所以支持序列化。
  • EnumSet 通过 <E extends Enum<E>> 限定了存储元素必须是枚举值。
  • EnumSet 没有构造方法,只能通过类中的 static 方法来创建 EnumSet 对象。
  • EnumSet 是有序的。以枚举值在 EnumSet 类中的定义顺序来决定集合元素的顺序。
  • EnumSet 不是线程安全的。

HashSet vs. LinkedHashSet vs. TreeSet

HashSetLinkedHashSetTreeSet 都是 Set 接口的实现类,都能保证元素唯一,并且都不是线程安全的。

HashSetLinkedHashSetTreeSet 的主要区别在于底层数据结构不同。HashSet 的底层数据结构是哈希表(基于 HashMap 实现)。LinkedHashSet 的底层数据结构是链表和哈希表,元素的插入和取出顺序满足 FIFO。TreeSet 底层数据结构是红黑树,元素是有序的,排序的方式有自然排序和定制排序。

底层数据结构不同又导致这三者的应用场景不同。HashSet 用于不需要保证元素插入和取出顺序的场景,LinkedHashSet 用于保证元素的插入和取出顺序满足 FIFO 的场景,TreeSet 用于支持对元素自定义排序规则的场景。

参考资料

Java 容器简介

容器简介

数组与容器

Java 中常用的存储容器就是数组和容器,二者有以下区别:

  • 存储大小是否固定
    • 数组的长度固定
    • 容器的长度可变
  • 数据类型
    • 数组可以存储基本数据类型,也可以存储引用数据类型
    • 容器只能存储引用数据类型,基本数据类型的变量要转换成对应的包装类才能放入容器类中。

:bulb: 不了解什么是基本数据类型、引用数据类型、包装类这些概念,可以参考:Java 基本数据类型

容器框架

img

Java 容器框架主要分为 CollectionMap 两种。其中,Collection 又分为 ListSet 以及 Queue

  • Collection - 一个独立元素的序列,这些元素都服从一条或者多条规则。
    • List - 必须按照插入的顺序保存元素。常见 List 容器有:
      • ArrayList - Object[] 数组。
      • LinkedList - 双链表 (JDK1.6 之前为循环链表,JDK1.7 取消了循环)。
      • Vector - 通过 synchronized 修饰读写方法来保证并发安全。
      • Vector - Object[] 数组,通过 synchronized 修饰读写方法来保证并发安全。
    • Set - 不能有重复的元素。常见 Set 容器有:
      • HashSet - 无序,内部基于 HashMap 来实现的。
      • LinkedHashSet - 保证插入顺序,内部基于 LinkedHashMap 来实现的。
      • TreeSet - 保证自然序或用户指定的比较器顺序,内部基于红黑树实现。
    • Queue - 按照排队规则来确定对象产生的顺序。
      • PriorityQueue - 基于 Object[] 数组来实现小顶堆
      • DelayQueue - 延迟队列。
      • ArrayQueue - ArrayDequeDeque 的顺序表实现。基于动态数组实现了栈和队列所需的所有操作。
      • LinkedList - LinkedListDeque 的链表实现。
  • Map - 一组成对的“键值对”对象,允许你使用键来查找值。常见的 Map 容器有:
    • HashMap:JDK1.8 之前 HashMap 由数组+链表组成的,数组是 HashMap 的主体,链表则是主要为了解决哈希冲突而存在的(“拉链法”解决冲突)。JDK1.8 以后在解决哈希冲突时有了较大的变化,当链表长度大于阈值(默认为 8)(将链表转换成红黑树前会判断,如果当前数组的长度小于 64,那么会选择先进行数组扩容,而不是转换为红黑树)时,将链表转化为红黑树,以减少搜索时间。
    • LinkedHashMapLinkedHashMap 继承自 HashMap,所以它的底层仍然是基于拉链式散列结构即由数组和链表或红黑树组成。另外,LinkedHashMap 在上面结构的基础上,增加了一条双向链表,使得上面的结构可以保持键值对的插入顺序。同时通过对链表进行相应的操作,实现了访问顺序相关逻辑。
    • Hashtable:数组+链表组成的,数组是 Hashtable 的主体,链表则是主要为了解决哈希冲突而存在的。
    • TreeMap:红黑树(自平衡的排序二叉树)。

容器的基本机制

Java 的容器具有一定的共性,它们或全部或部分依赖以下技术。所以,学习以下技术点,对于理解 Java 容器的特性和原理有很大的帮助。

泛型

Java 1.5 引入了泛型技术。

Java 容器通过泛型技术来保证其数据的类型安全。什么是类型安全呢?

举例来说:如果有一个 List<Object> 容器,Java 编译器在编译时不会对原始类型进行类型安全检查,却会对带参数的类型进行检查,通过使用 Object 作为类型,可以告知编译器该方法可以接受任何类型的对象,比如 String 或 Integer。

1
2
3
List<Object> list = new ArrayList<Object>();
list.add("123");
list.add(123);

如果没有泛型技术,如示例中的代码那样,容器中就可能存储任意数据类型,这是很危险的行为。

1
2
3
List<String> list = new ArrayList<String>();
list.add("123");
list.add(123);

:bulb: 想深入了解 Java 泛型技术的用法和原理可以参考:深入理解 Java 泛型

Iterable 和 Iterator

Iterable 和 Iterator 目的在于遍历访问容器中的元素。

Iterator 接口定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public interface Iterator<E> {

boolean hasNext();

E next();

default void remove() {
throw new UnsupportedOperationException("remove");
}

default void forEachRemaining(Consumer<? super E> action) {
Objects.requireNonNull(action);
while (hasNext())
action.accept(next());
}
}

Iterable 接口定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public interface Iterable<T> {

Iterator<T> iterator();

default void forEach(Consumer<? super T> action) {
Objects.requireNonNull(action);
for (T t : this) {
action.accept(t);
}
}

default Spliterator<T> spliterator() {
return Spliterators.spliteratorUnknownSize(iterator(), 0);
}
}

Collection 接口扩展了 Iterable 接口。

迭代其实我们可以简单地理解为遍历,是一个标准化遍历各类容器里面的所有对象的接口。它是一个经典的设计模式——迭代器模式(Iterator)。

迭代器模式 - 提供一种方法顺序访问一个聚合对象中各个元素,而又无须暴露该对象的内部表示

示例:迭代器遍历

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class IteratorDemo {

public static void main(String[] args) {
List<Integer> list = new ArrayList<>();
list.add(1);
list.add(2);
list.add(3);
Iterator it = list.iterator();
while (it.hasNext()) {
System.out.println(it.next());
}
}

}

《阿里巴巴 Java 开发手册》的描述如下:

不要在 foreach 循环里进行元素的 remove/add 操作。remove 元素请使用 Iterator 方式,如果并发操作,需要对 Iterator 对象加锁。

通过反编译你会发现 foreach 语法底层其实还是依赖 Iterator 。不过, remove/add 操作直接调用的是集合自己的方法,而不是 Iteratorremove/add方法

这就导致 Iterator 莫名其妙地发现自己有元素被 remove/add ,然后,它就会抛出一个 ConcurrentModificationException 来提示用户发生了并发修改异常。这就是单线程状态下产生的 fail-fast 机制

fail-fast 机制:多个线程对 fail-fast 集合进行修改的时候,可能会抛出ConcurrentModificationException。 即使是单线程下也有可能会出现这种情况,上面已经提到过。

相关阅读:什么是 fail-fastopen in new window

Java8 开始,可以使用 Collection#removeIf()方法删除满足特定条件的元素,如:

1
2
3
4
5
6
List<Integer> list = new ArrayList<>();
for (int i = 1; i <= 10; ++i) {
list.add(i);
}
list.removeIf(filter -> filter % 2 == 0); /* 删除list中的所有偶数 */
System.out.println(list); /* [1, 3, 5, 7, 9] */

除了上面介绍的直接使用 Iterator 进行遍历操作之外,你还可以:

  • 使用普通的 for 循环
  • 使用 fail-safe 的集合类。java.util包下面的所有的集合类都是 fail-fast 的,而java.util.concurrent包下面的所有的类都是 fail-safe 的。
  • … …

Comparable 和 Comparator

Comparable 接口和 Comparator 接口一般用于实现容器中元素的比较及排序:

  • Comparable 接口实际上是出自 java.lang 包 它有一个 compareTo(Object obj) 方法用来排序
  • Comparator 接口实际上是出自 java.util 包它有一个 compare(Object obj1, Object obj2) 方法用来排序

::: tabs#Comparable和Comparator接口定义

@tab Comparable 接口定义

Comparable 接口定义

1
2
3
public interface Comparable<T> {
public int compareTo(T o);
}

@tab Comparator 接口定义

Comparator 接口定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@FunctionalInterface
public interface Comparator<T> {

int compare(T o1, T o2);

boolean equals(Object obj);

// 反转
default Comparator<T> reversed() {
return Collections.reverseOrder(this);
}

default Comparator<T> thenComparing(Comparator<? super T> other) {
Objects.requireNonNull(other);
return (Comparator<T> & Serializable) (c1, c2) -> {
int res = compare(c1, c2);
return (res != 0) ? res : other.compare(c1, c2);
};
}

// thenComparingXXX 方法略

// 静态方法略
}

:::

假设,有一个 List 容器,存储的是 User 类型对象。现在要根据 User 中的 age 属性进行排序。

User 定义如下:

1
2
3
4
5
6
7
8
9
10
11
public class User {

private String name;
private int age;

public User(String name, int age) {
this.age = age;
this.name = name;
}
// getter、setter 略
}

我们分别通过 ComparableComparator 来实现比较、排序,体会一下有何差异。

::: tabs#Comparable和Comparator使用示例

@tab Comparable 接口使用示例

Comparable 接口使用示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class ComparableDemo {

public static void main(String[] args) {
User a = new User("A", 18);
User b = new User("B", 17);
User c = new User("C", 20);
List<User> list = new ArrayList<>(Arrays.asList(a, b, c));
Collections.sort(list);
list.forEach(System.out::println);
}
// 输出:
// User{age=17, name='B'}
// User{age=18, name='A'}
// User{age=20, name='C'}

// 需要对被比较、排序的类进行改造,实现 Comparable 接口
static class User implements Comparable<User> {

private String name;
private int age;

public User(String name, int age) {
this.age = age;
this.name = name;
}

// getter、setter 略

@Override
public int compareTo(User o) {
return this.age - o.age;
}

@Override
public String toString() {
return "User{" + "age=" + age + ", name='" + name + '\'' + '}';
}
}
}

从上例可以看出,使用 Comparable 接口,被排序对象类必须实现 Comparable 接口;并在类中定义 compareTo 方法的实现,即排序逻辑必须置于被排序对象类中。

@tab Comparator 接口使用示例

Comparator 接口使用示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class ComparatorDemo {

public static void main(String[] args) {
User a = new User("A", 18);
User b = new User("B", 17);
User c = new User("C", 20);
List<User> list = new ArrayList<>(Arrays.asList(a, b, c));
Collections.sort(list, new Comparator<User>() {
@Override
public int compare(User o1, User o2) {
return o1.age - o2.age;
}
});
list.forEach(System.out::println);
}
// 输出:
// User{age=17, name='B'}
// User{age=18, name='A'}
// User{age=20, name='C'}

static class User {

private String name;
private int age;

public User(String name, int age) {
this.age = age;
this.name = name;
}

// getter、setter 略

@Override
public String toString() {
return "User{" + "age=" + age + ", name='" + name + '\'' + '}';
}
}
}

从上例可以看出,使用 Comparator 接口和 Comparable 接口的不同点在于:被排序的对象类无需实现 Comparator 接口,排序逻辑置于被排序对象类的外部。

:::

Cloneable

Java 中 一个类要实现 clone 功能 必须实现 Cloneable 接口,否则在调用 clone() 时会报 CloneNotSupportedException 异常。

Java 中所有类都默认继承 java.lang.Object 类,在 java.lang.Object 类中有一个方法 clone(),这个方法将返回 Object 对象的一个拷贝。Object 类里的 clone() 方法仅仅用于浅拷贝(拷贝基本成员属性,对于引用类型仅返回指向改地址的引用)。

如果 Java 类需要深拷贝,需要覆写 clone() 方法。

fail-fast

fail-fast 的要点

Java 容器(如:ArrayList、HashMap、TreeSet 等待)的 javadoc 中常常提到类似的描述:

注意,迭代器的快速失败行为无法得到保证,因为一般来说,不可能对是否出现不同步并发修改做出任何硬性保证。快速失败(fail-fast)迭代器会尽最大努力抛出 ConcurrentModificationException。因此,为提高这类迭代器的正确性而编写一个依赖于此异常的程序是错误的做法:迭代器的快速失败行为应该仅用于检测 bug。

那么,我们不禁要问,什么是 fail-fast,为什么要有 fail-fast 机制?

fail-fast 是 Java 容器的一种错误检测机制。当多个线程对容器进行结构上的改变的操作时,就可能触发 fail-fast 机制。记住是有可能,而不是一定。

例如:假设存在两个线程(线程 1、线程 2),线程 1 通过 Iterator 在遍历容器 A 中的元素,在某个时候线程 2 修改了容器 A 的结构(是结构上面的修改,而不是简单的修改容器元素的内容),那么这个时候程序就会抛出 ConcurrentModificationException 异常,从而产生 fail-fast 机制。

容器在迭代操作中改变元素个数(添加、删除元素)都可能会导致 fail-fast

示例:fail-fast 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
public class FailFastDemo {

private static int MAX = 100;

private static List<Integer> list = new ArrayList<>();

public static void main(String[] args) {
for (int i = 0; i < MAX; i++) {
list.add(i);
}
new Thread(new MyThreadA()).start();
new Thread(new MyThreadB()).start();
}

/** 迭代遍历容器所有元素 */
static class MyThreadA implements Runnable {

@Override
public void run() {
Iterator<Integer> iterator = list.iterator();
while (iterator.hasNext()) {
int i = iterator.next();
System.out.println("MyThreadA 访问元素:" + i);
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

}

/** 遍历删除指定范围内的所有偶数 */
static class MyThreadB implements Runnable {

@Override
public void run() {
int i = 0;
while (i < MAX) {
if (i % 2 == 0) {
System.out.println("MyThreadB 删除元素" + i);
list.remove(i);
}
i++;
}
}

}

}

执行后,会抛出 java.util.ConcurrentModificationException 异常。

解决 fail-fast

fail-fast 有两种解决方案:

  • 在遍历过程中所有涉及到改变容器个数的地方全部加上 synchronized 或者直接使用 Collections.synchronizedXXX 容器,这样就可以解决。但是不推荐,因为增删造成的同步锁可能会阻塞遍历操作,影响吞吐。
  • 使用并发容器,如:CopyOnWriterArrayList

容器和线程安全

为了在并发环境下安全地使用容器,Java 提供了同步容器和并发容器。

同步容器和并发容器详情请参考:Java 并发之容器

参考资料

Java 并发之 AQS

AQS 简介

AQSAbstractQueuedSynchronizer 的缩写,即 队列同步器,顾名思义,其主要作用是处理同步。它是并发锁和很多同步工具类的实现基石(如 ReentrantLockReentrantReadWriteLockCountDownLatchSemaphoreFutureTask 等)。

**AQS 提供了对锁和同步器的通用能力支持 **。在 java.util.concurrent.locks 包中的相关锁(常用的有 ReentrantLockThreadPoolExecutor)都是基于 AQS 来实现。这些锁都没有直接继承 AQS,而是定义了一个 Sync 类去继承 AQS。为什么要这样呢?因为锁面向的是使用用户,而同步器面向的则是线程控制,那么在锁的实现中聚合同步器而不是直接继承 AQS 就可以很好的隔离二者所关注的事情。

AQS 的应用

AQS 定义两种资源共享方式:Exclusive(独占,只有一个线程能执行,如 ReentrantLock)和 Share(共享,多个线程可同时执行,如 Semaphore / CountDownLatch)。

独占锁 API

获取、释放独占锁的主要 API 如下:

1
2
3
4
public final void acquire(int arg)
public final void acquireInterruptibly(int arg)
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
public final boolean release(int arg)
  • acquire - 获取独占锁。
  • acquireInterruptibly - 获取可中断的独占锁。
  • tryAcquireNanos - 尝试在指定时间内获取可中断的独占锁。在以下三种情况下回返回:
    • 在超时时间内,当前线程成功获取了锁;
    • 当前线程在超时时间内被中断;
    • 超时时间结束,仍未获得锁返回 false。
  • release - 释放独占锁。

共享锁 API

获取、释放共享锁的主要 API 如下:

1
2
3
4
public final void acquireShared(int arg)
public final void acquireSharedInterruptibly(int arg)
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
public final boolean releaseShared(int arg)
  • acquireShared - 获取共享锁。
  • acquireSharedInterruptibly - 获取可中断的共享锁。
  • tryAcquireSharedNanos - 尝试在指定时间内获取可中断的共享锁。
  • release - 释放共享锁。

AQS 的原理

AQS 核心思想是,如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态;如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制。这个机制是基于 CLH 锁 (Craig, Landin, and Hagersten locks) 的变体实现的,将暂时获取不到锁的线程加入到队列中。

CLH 本是一个单向队列,AQS 中的队列采用了 CLH 的变体,是一个虚拟的 FIFO 双向队列(虚拟的双向队列,是指不存在结点实例,仅存在结点之间的关联关系),暂时获取不到锁的线程将被加入到该队列中。AQS 将每条请求共享资源的线程封装成一个 CLH 队列锁的一个结点(Node)来实现锁的分配。在 CLH 队列锁中,一个节点表示一个线程,它保存着线程的引用(thread)、 当前节点在队列中的状态(waitStatus)、前驱节点(prev)、后继节点(next)。

AQS 的核心原理图:

AQS 的数据结构

先看一下 AbstractQueuedSynchronizer 的定义:

1
2
3
4
5
6
7
8
9
10
11
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {

/** 等待队列的队头,懒加载。只能通过 setHead 方法修改。 */
private transient volatile Node head;
/** 等待队列的队尾,懒加载。只能通过 enq 方法添加新的等待节点。*/
private transient volatile Node tail;
/** 同步状态 */
private volatile int state;
}

阅读 AQS 的源码,可以发现:AQS 继承自 AbstractOwnableSynchronize,它有以下核心属性:

  • state - AQS 使用一个整型的 volatile 变量来 维护同步状态。这个整数状态的意义由子类来赋予,如 ReentrantLock 中该状态值表示所有者线程已经重复获取该锁的次数;Semaphore 中该状态值表示剩余的许可数量。
  • headtail - AQS **维护了一个 Node 类型(AQS 的内部类)的双向队列来完成同步状态的管理 **。这个双向队列是一个双向的 FIFO 队列,通过 headtail 指针进行访问。当 **有线程获取锁失败后,就被添加到队列末尾 **。

再来看一下 Node 的源码,很显然,Node 是一个双向队列结构:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
static final class Node {
/** 该等待同步的节点处于共享模式 */
static final Node SHARED = new Node();
/** 该等待同步的节点处于独占模式 */
static final Node EXCLUSIVE = null;

/** 线程等待状态,状态值有:0、1、-1、-2、-3 */
volatile int waitStatus;
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;

/** 前驱节点 */
volatile Node prev;
/** 后继节点 */
volatile Node next;
/** 等待锁的线程 */
volatile Thread thread;

/** 和节点是否共享有关 */
Node nextWaiter;
}

属性说明:

方法和属性值 含义
waitStatus 当前节点在队列中的状态
thread 表示处于该节点的线程
prev 前驱指针
next 后继指针

waitStatus 是一个整型的 volatile 变量,用来维护 AQS 同步队列中线程节点的状态。waitStatus 有五个状态值:

  • 0 - 一个 Node 被初始化的时候的默认值
  • CANCELLED(1) - 表示线程获取锁的请求已经取消了
  • SIGNAL(-1) - 表示线程已经准备好了,就等资源释放了
  • CONDITION(-2) - 表示节点在等待队列中,节点线程等待唤醒
  • PROPAGATE(-3) - 当前线程处在 SHARED 情况下,该字段才会使用

独占锁的获取和释放

获取独占锁

AQS 中使用 acquire(int arg) 方法获取独占锁的相关源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

// 利用 CAS 操作将当前线程加入等待队列队尾
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}

// 自旋尝试为等待队列中的线程节点获取独占锁
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
// 获取锁成功,退出
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 线程中断
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

其大致流程如下:

  1. 先通过 tryAcquire 尝试获取同步状态,如果获取同步状态成功,则结束方法,直接返回。
  2. 若不成功,调用 addWaiter 方法,利用 CAS 操作将当前线程加入等待队列队尾。
  3. 接着,自旋尝试为等待队列中的线程节点获取独占锁,直到获取成功或线程中断。

释放独占锁

AQS 中使用 acquire(int arg) 方法获取独占锁的相关源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public final boolean release(int arg) {
// 尝试释放锁
if (tryRelease(arg)) {
Node h = head;
// 如果队列不为空,唤醒下一个节点中的线程
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

private void unparkSuccessor(Node node) {

int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);

Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
  1. 先尝试获取解锁线程的同步状态,如果获取同步状态不成功,则结束方法,直接返回。
  2. 如果获取同步状态成功且队列不为空,AQS 会尝试唤醒下一个节点中的线程。

获取可中断的独占锁

AQS 中使用 acquireInterruptibly(int arg) 方法获取可中断的独占锁。

acquireInterruptibly(int arg) 实现方式 相较于获取独占锁方法( acquire)非常相似,区别仅在于它会 通过 Thread.interrupted 检测当前线程是否被中断,如果是,则立即抛出中断异常(InterruptedException)。

限时获取独占锁

AQS 中使用 tryAcquireNanos(int arg) 方法获取超时等待的独占锁。

doAcquireNanos 的实现方式 相较于获取独占锁方法( acquire)非常相似,区别在于它会根据超时时间和当前时间计算出截止时间。在获取锁的流程中,会不断判断是否超时,如果超时,直接返回 false;如果没超时,则用 LockSupport.parkNanos 来阻塞当前线程。

共享锁的获取和释放

获取共享锁

AQS 中使用 acquireShared(int arg) 方法获取共享锁。

acquireShared 方法和 acquire 方法的逻辑很相似,区别仅在于自旋的条件以及节点出队的操作有所不同。

成功获得共享锁的条件如下:

  • tryAcquireShared(arg) 返回值大于等于 0 (这意味着共享锁的 permit 还没有用完)。
  • 当前节点的前驱节点是头结点。

释放共享锁

AQS 中使用 releaseShared(int arg) 方法释放共享锁。

releaseShared 首先会尝试释放同步状态,如果成功,则解锁一个或多个后继线程节点。释放共享锁和释放独占锁流程大体相似,区别在于:

对于独占模式,如果需要 SIGNAL,释放仅相当于调用头节点的 unparkSuccessor

获取可中断的共享锁

AQS 中使用 acquireSharedInterruptibly(int arg) 方法获取可中断的共享锁。

acquireSharedInterruptibly 方法与 acquireInterruptibly 几乎一致,不再赘述。

限时获取共享锁

AQS 中使用 tryAcquireSharedNanos(int arg) 方法获取超时等待式的共享锁。

tryAcquireSharedNanos 方法与 tryAcquireNanos 几乎一致,不再赘述。

自定义同步器

同步器的设计是基于模板方法模式的,如果需要自定义同步器一般的方式是这样(模板方法模式很经典的一个应用):

  1. 使用者继承 AbstractQueuedSynchronizer 并重写指定的方法。
  2. 将 AQS 组合在自定义同步组件的实现中,并调用其模板方法,而这些模板方法会调用使用者重写的方法。

这和我们以往通过实现接口的方式有很大区别,这是模板方法模式很经典的一个运用。

AQS 使用了模板方法模式,自定义同步器时需要重写下面几个 AQS 提供的钩子方法:

1
2
3
4
5
6
7
8
9
10
// 独占方式。尝试获取资源,成功则返回 true,失败则返回 false。
protected boolean tryAcquire(int)
// 独占方式。尝试释放资源,成功则返回 true,失败则返回 false。
protected boolean tryRelease(int)
// 共享方式。尝试获取资源。负数表示失败;0 表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
protected int tryAcquireShared(int)
// 共享方式。尝试释放资源,成功则返回 true,失败则返回 false。
protected boolean tryReleaseShared(int)
// 该线程是否正在独占资源。只有用到 condition 才需要去实现它。
protected boolean isHeldExclusively()

什么是钩子方法呢? 钩子方法是一种被声明在抽象类中的方法,一般使用 protected 关键字修饰,它可以是空方法(由子类实现),也可以是默认实现的方法。模板设计模式通过钩子方法控制固定步骤的实现。

参考资料