Dunwu Blog

大道至简,知易行难

Java 容器简介

img

容器简介

数组与容器

Java 中常用的存储容器就是数组和容器,二者有以下区别:

  • 存储大小是否固定
    • 数组的长度固定
    • 容器的长度可变
  • 数据类型
    • 数组可以存储基本数据类型,也可以存储引用数据类型
    • 容器只能存储引用数据类型,基本数据类型的变量要转换成对应的包装类才能放入容器类中。

:bulb: 不了解什么是基本数据类型、引用数据类型、包装类这些概念,可以参考:Java 基本数据类型

容器框架

img

Java 容器框架主要分为 CollectionMap 两种。其中,Collection 又分为 ListSet 以及 Queue

  • Collection - 一个独立元素的序列,这些元素都服从一条或者多条规则。
    • List - 必须按照插入的顺序保存元素。
    • Set - 不能有重复的元素。
    • Queue - 按照排队规则来确定对象产生的顺序(通常与它们被插入的顺序相同)。
  • Map - 一组成对的“键值对”对象,允许你使用键来查找值。

容器的基本机制

Java 的容器具有一定的共性,它们或全部或部分依赖以下技术。所以,学习以下技术点,对于理解 Java 容器的特性和原理有很大的帮助。

泛型

Java 1.5 引入了泛型技术。

Java 容器通过泛型技术来保证其数据的类型安全。什么是类型安全呢?

举例来说:如果有一个 List<Object> 容器,Java 编译器在编译时不会对原始类型进行类型安全检查,却会对带参数的类型进行检查,通过使用 Object 作为类型,可以告知编译器该方法可以接受任何类型的对象,比如 String 或 Integer。

1
2
3
List<Object> list = new ArrayList<Object>();
list.add("123");
list.add(123);

如果没有泛型技术,如示例中的代码那样,容器中就可能存储任意数据类型,这是很危险的行为。

1
2
3
List<String> list = new ArrayList<String>();
list.add("123");
list.add(123);

:bulb: 想深入了解 Java 泛型技术的用法和原理可以参考:深入理解 Java 泛型

Iterable 和 Iterator

Iterable 和 Iterator 目的在于遍历访问容器中的元素。

Iterator 接口定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public interface Iterator<E> {

boolean hasNext();

E next();

default void remove() {
throw new UnsupportedOperationException("remove");
}

default void forEachRemaining(Consumer<? super E> action) {
Objects.requireNonNull(action);
while (hasNext())
action.accept(next());
}
}

Iterable 接口定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public interface Iterable<T> {

Iterator<T> iterator();

default void forEach(Consumer<? super T> action) {
Objects.requireNonNull(action);
for (T t : this) {
action.accept(t);
}
}

default Spliterator<T> spliterator() {
return Spliterators.spliteratorUnknownSize(iterator(), 0);
}
}

Collection 接口扩展了 Iterable 接口。

迭代其实我们可以简单地理解为遍历,是一个标准化遍历各类容器里面的所有对象的接口。它是一个经典的设计模式——迭代器模式(Iterator)。

迭代器模式 - 提供一种方法顺序访问一个聚合对象中各个元素,而又无须暴露该对象的内部表示

示例:迭代器遍历

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class IteratorDemo {

public static void main(String[] args) {
List<Integer> list = new ArrayList<>();
list.add(1);
list.add(2);
list.add(3);
Iterator it = list.iterator();
while (it.hasNext()) {
System.out.println(it.next());
}
}

}

Comparable 和 Comparator

Comparable 是排序接口。若一个类实现了 Comparable 接口,表示该类的实例可以比较,也就意味着支持排序。实现了 Comparable 接口的类的对象的列表或数组可以通过 Collections.sortArrays.sort 进行自动排序。

Comparable 接口定义:

1
2
3
public interface Comparable<T> {
public int compareTo(T o);
}

Comparator 是比较接口,我们如果需要控制某个类的次序,而该类本身不支持排序(即没有实现 Comparable 接口),那么我们就可以建立一个“该类的比较器”来进行排序,这个“比较器”只需要实现 Comparator 接口即可。也就是说,我们可以通过实现 Comparator 来新建一个比较器,然后通过这个比较器对类进行排序。

Comparator 接口定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@FunctionalInterface
public interface Comparator<T> {

int compare(T o1, T o2);

boolean equals(Object obj);

// 反转
default Comparator<T> reversed() {
return Collections.reverseOrder(this);
}

default Comparator<T> thenComparing(Comparator<? super T> other) {
Objects.requireNonNull(other);
return (Comparator<T> & Serializable) (c1, c2) -> {
int res = compare(c1, c2);
return (res != 0) ? res : other.compare(c1, c2);
};
}

// thenComparingXXX 方法略

// 静态方法略
}

在 Java 容器中,一些可以排序的容器,如 TreeMapTreeSet,都可以通过传入 Comparator,来定义内部元素的排序规则。

Cloneable

Java 中 一个类要实现 clone 功能 必须实现 Cloneable 接口,否则在调用 clone() 时会报 CloneNotSupportedException 异常。

Java 中所有类都默认继承 java.lang.Object 类,在 java.lang.Object 类中有一个方法 clone(),这个方法将返回 Object 对象的一个拷贝。Object 类里的 clone() 方法仅仅用于浅拷贝(拷贝基本成员属性,对于引用类型仅返回指向改地址的引用)。

如果 Java 类需要深拷贝,需要覆写 clone() 方法。

fail-fast

fail-fast 的要点

Java 容器(如:ArrayList、HashMap、TreeSet 等待)的 javadoc 中常常提到类似的描述:

注意,迭代器的快速失败行为无法得到保证,因为一般来说,不可能对是否出现不同步并发修改做出任何硬性保证。快速失败(fail-fast)迭代器会尽最大努力抛出 ConcurrentModificationException。因此,为提高这类迭代器的正确性而编写一个依赖于此异常的程序是错误的做法:迭代器的快速失败行为应该仅用于检测 bug。

那么,我们不禁要问,什么是 fail-fast,为什么要有 fail-fast 机制?

fail-fast 是 Java 容器的一种错误检测机制。当多个线程对容器进行结构上的改变的操作时,就可能触发 fail-fast 机制。记住是有可能,而不是一定。

例如:假设存在两个线程(线程 1、线程 2),线程 1 通过 Iterator 在遍历容器 A 中的元素,在某个时候线程 2 修改了容器 A 的结构(是结构上面的修改,而不是简单的修改容器元素的内容),那么这个时候程序就会抛出 ConcurrentModificationException 异常,从而产生 fail-fast 机制。

容器在迭代操作中改变元素个数(添加、删除元素)都可能会导致 fail-fast

示例:fail-fast 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
public class FailFastDemo {

private static int MAX = 100;

private static List<Integer> list = new ArrayList<>();

public static void main(String[] args) {
for (int i = 0; i < MAX; i++) {
list.add(i);
}
new Thread(new MyThreadA()).start();
new Thread(new MyThreadB()).start();
}

/** 迭代遍历容器所有元素 */
static class MyThreadA implements Runnable {

@Override
public void run() {
Iterator<Integer> iterator = list.iterator();
while (iterator.hasNext()) {
int i = iterator.next();
System.out.println("MyThreadA 访问元素:" + i);
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

}

/** 遍历删除指定范围内的所有偶数 */
static class MyThreadB implements Runnable {

@Override
public void run() {
int i = 0;
while (i < MAX) {
if (i % 2 == 0) {
System.out.println("MyThreadB 删除元素" + i);
list.remove(i);
}
i++;
}
}

}

}

执行后,会抛出 java.util.ConcurrentModificationException 异常。

解决 fail-fast

fail-fast 有两种解决方案:

  • 在遍历过程中所有涉及到改变容器个数的地方全部加上 synchronized 或者直接使用 Collections.synchronizedXXX 容器,这样就可以解决。但是不推荐,因为增删造成的同步锁可能会阻塞遍历操作,影响吞吐。
  • 使用并发容器,如:CopyOnWriterArrayList

容器和线程安全

为了在并发环境下安全地使用容器,Java 提供了同步容器和并发容器。

同步容器和并发容器详情请参考:Java 并发容器

参考资料

Java 容器之 Map

Map 简介

Map 架构

Map 家族主要成员功能如下:

  • Map 是 Map 容器家族的祖先,Map 是一个用于保存键值对(key-value)的接口。Map 中不能包含重复的键;每个键最多只能映射到一个值。
  • AbstractMap 继承了 Map 的抽象类,它实现了 Map 中的核心 API。其它 Map 的实现类可以通过继承 AbstractMap 来减少重复编码。
  • SortedMap 继承了 Map 的接口。SortedMap 中的内容是排序的键值对,排序的方法是通过实现比较器(Comparator)完成的。
  • NavigableMap 继承了 SortedMap 的接口。相比于 SortedMapNavigableMap 有一系列的“导航”方法;如”获取大于/等于某对象的键值对”、“获取小于/等于某对象的键值对”等等。
  • HashMap 继承了 AbstractMap,但没实现 NavigableMap 接口。HashMap 的主要作用是储存无序的键值对,而 Hash 也体现了它的查找效率很高。HashMap 是使用最广泛的 Map
  • Hashtable 虽然没有继承 AbstractMap,但它继承了 DictionaryDictionary 也是键值对的接口),而且也实现 Map 接口。因此,Hashtable 的主要作用是储存无序的键值对。和 HashMap 相比,Hashtable 在它的主要方法中使用 synchronized 关键字修饰,来保证线程安全。但是,由于它的锁粒度太大,非常影响读写速度,所以,现代 Java 程序几乎不会使用 Hashtable ,如果需要保证线程安全,一般会用 ConcurrentHashMap 来替代。
  • TreeMap 继承了 AbstractMap,且实现了 NavigableMap 接口。TreeMap 的主要作用是储存有序的键值对,排序依据根据元素类型的 Comparator 而定。
  • WeakHashMap 继承了 AbstractMapWeakHashMap 的键是弱引用 (即 WeakReference),它的主要作用是当 GC 内存不足时,会自动将 WeakHashMap 中的 key 回收,这避免了 WeakHashMap 的内存空间无限膨胀。很明显,WeakHashMap 适用于作为缓存。

Map 接口

Map 的定义如下:

1
public interface Map<K,V> { }

Map 是一个用于保存键值对(key-value)的接口。Map 中不能包含重复的键;每个键最多只能映射到一个值。

Map 接口提供三种 Collection 视图,允许以键集值集键-值映射关系集的形式访问数据。

Map 有些实现类,可以有序的保存元素,如 TreeMap;另一些实现类则不保证顺序,如 HashMap 类。

Map 的实现类应该提供 2 个“标准的”构造方法:

  • void(无参数)构造方法,用于创建空 Map;
  • 带有单个 Map 类型参数的构造方法,用于创建一个与其参数具有相同键-值映射关系的新 Map。

实际上,后一个构造方法允许用户复制任意 Map,生成所需类的一个等价 Map。尽管无法强制执行此建议(因为接口不能包含构造方法),但是 JDK 中所有通用的 Map 实现都遵从它。

Map.Entry 接口

Map.Entry 一般用于通过迭代器(Iterator)访问问 Map

Map.Entry 是 Map 中内部的一个接口,Map.Entry 代表了 键值对 实体,Map 通过 entrySet() 获取 Map.Entry 集合,从而通过该集合实现对键值对的操作。

AbstractMap 抽象类

AbstractMap 的定义如下:

1
public abstract class AbstractMap<K,V> implements Map<K,V> {}

AbstractMap 抽象类提供了 Map 接口的核心实现,以最大限度地减少实现 Map 接口所需的工作。

要实现不可修改的 Map,编程人员只需扩展此类并提供 entrySet() 方法的实现即可,该方法将返回 Map 的映射关系 Set 视图。通常,返回的 set 将依次在 AbstractSet 上实现。此 Set 不支持 add()remove() 方法,其迭代器也不支持 remove() 方法。

要实现可修改的 Map,编程人员必须另外重写此类的 put 方法(否则将抛出 UnsupportedOperationException),entrySet().iterator() 返回的迭代器也必须另外实现其 remove() 方法。

SortedMap 接口

SortedMap 的定义如下:

1
public interface SortedMap<K,V> extends Map<K,V> { }

SortedMap 继承了 Map ,它是一个有序的 Map

SortedMap 的排序方式有两种:自然排序或者用户指定比较器插入有序 SortedMap 的所有元素都必须实现 Comparable 接口(或者被指定的比较器所接受)

另外,所有 SortedMap 实现类都应该提供 4 个“标准”构造方法:

  1. void(无参数)构造方法,它创建一个空的有序 Map,按照键的自然顺序进行排序。
  2. 带有一个 Comparator 类型参数的构造方法,它创建一个空的有序 Map,根据指定的比较器进行排序。
  3. 带有一个 Map 类型参数的构造方法,它创建一个新的有序 Map,其键-值映射关系与参数相同,按照键的自然顺序进行排序。
  4. 带有一个 SortedMap 类型参数的构造方法,它创建一个新的有序 Map,其键-值映射关系和排序方法与输入的有序 Map 相同。无法保证强制实施此建议,因为接口不能包含构造方法。

NavigableMap 的定义如下:

1
public interface NavigableMap<K,V> extends SortedMap<K,V> { }

NavigableMap 继承了 SortedMap ,它提供了丰富的查找方法。

NavigableMap 分别提供了获取“键”、“键-值对”、“键集”、“键-值对集”的相关方法。

NavigableMap 提供的功能可以分为 4 类:

  • 获取键-值对
    • lowerEntryfloorEntryceilingEntryhigherEntry 方法,它们分别返回与小于、小于等于、大于等于、大于给定键的键关联的 Map.Entry 对象。
    • firstEntrypollFirstEntrylastEntrypollLastEntry 方法,它们返回和/或移除最小和最大的映射关系(如果存在),否则返回 null。
  • 获取键。这个和第 1 类比较类似。
    • lowerKeyfloorKeyceilingKeyhigherKey 方法,它们分别返回与小于、小于等于、大于等于、大于给定键的键。
  • 获取键的集合
    • navigableKeySetdescendingKeySet 分别获取正序/反序的键集。
  • 获取键-值对的子集

Dictionary 抽象类

Dictionary 的定义如下:

1
public abstract class Dictionary<K,V> {}

Dictionary 是 JDK 1.0 定义的操作键值对的抽象类,它包括了操作键值对的基本方法。

HashMap 类

HashMap 类是最常用的 Map

HashMap 要点

HashMap 的命名,也可以看出:**HashMap 以散列方式存储键值对**。

HashMap 允许使用空值和空键。(HashMap 类大致等同于 Hashtable,除了它是不同步的并且允许为空值。)这个类不保序;特别是,它的元素顺序可能会随着时间的推移变化。

HashMap 有两个影响其性能的参数:初始容量和负载因子

  • 容量是哈希表中桶的数量,初始容量就是哈希表创建时的容量。
  • 加载因子是散列表在其容量自动扩容之前被允许的最大饱和量。当哈希表中的 entry 数量超过负载因子和当前容量的乘积时,散列表就会被重新映射(即重建内部数据结构),一般散列表大约是存储桶数量的两倍。

通常,默认加载因子(0.75)在时间和空间成本之间提供了良好的平衡。较高的值会减少空间开销,但会增加查找成本(反映在大部分 HashMap 类的操作中,包括 getput)。在设置初始容量时,应考虑映射中的条目数量及其负载因子,以尽量减少重新运行操作的次数。如果初始容量大于最大入口数除以负载因子,则不会发生重新刷新操作。

如果许多映射要存储在 HashMap 实例中,使用足够大的容量创建映射将允许映射存储的效率高于根据需要执行自动重新散列以增长表。请注意,使用多个具有相同 hashCode() 的密钥是降低任何散列表性能的一个可靠方法。为了改善影响,当键是 Comparable 时,该类可以使用键之间的比较顺序来帮助断开关系。

HashMap 不是线程安全的。

HashMap 原理

HashMap 数据结构

HashMap 的核心字段:

  • table - HashMap 使用一个 Node<K,V>[] 类型的数组 table 来储存元素。
  • size - 初始容量。 初始为 16,每次容量不够自动扩容
  • loadFactor - 负载因子。自动扩容之前被允许的最大饱和量,默认 0.75。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class HashMap<K,V> extends AbstractMap<K,V>
implements Map<K,V>, Cloneable, Serializable {

// 该表在初次使用时初始化,并根据需要调整大小。分配时,长度总是2的幂。
transient Node<K,V>[] table;
// 保存缓存的 entrySet()。请注意,AbstractMap 字段用于 keySet() 和 values()。
transient Set<Map.Entry<K,V>> entrySet;
// map 中的键值对数
transient int size;
// 这个HashMap被结构修改的次数结构修改是那些改变HashMap中的映射数量或者修改其内部结构(例如,重新散列)的修改。
transient int modCount;
// 下一个调整大小的值(容量*加载因子)。
int threshold;
// 散列表的加载因子
final float loadFactor;
}

HashMap 构造方法

1
2
3
4
public HashMap(); // 默认加载因子0.75
public HashMap(int initialCapacity); // 默认加载因子0.75;以 initialCapacity 初始化容量
public HashMap(int initialCapacity, float loadFactor); // 以 initialCapacity 初始化容量;以 loadFactor 初始化加载因子
public HashMap(Map<? extends K, ? extends V> m) // 默认加载因子0.75

put 方法的实现

put 方法大致的思路为:

  • 对 key 的 hashCode() 做 hash 计算,然后根据 hash 值再计算 Node 的存储位置;
  • 如果没有哈希碰撞,直接放到桶里;如果有哈希碰撞,以链表的形式存在桶后。
  • 如果哈希碰撞导致链表过长(大于等于 TREEIFY_THRESHOLD,数值为 8),就把链表转换成红黑树;
  • 如果节点已经存在就替换旧值
  • 桶数量超过容量*负载因子(即 load factor * current capacity),HashMap 调用 resize 自动扩容一倍

具体代码的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
public V put(K key, V value) {
return putVal(hash(key), key, value, false, true);
}

// hashcode 无符号位移 16 位
static final int hash(Object key) {
int h;
return (key == null) ? 0 : (h = key.hashCode()) ^ (h >>> 16);
}

final V putVal(int hash, K key, V value, boolean onlyIfAbsent,
boolean evict) {
Node<K,V>[] tab; Node<K,V> p; int n, i;
// tab 为空则创建
if ((tab = table) == null || (n = tab.length) == 0)
n = (tab = resize()).length;
// 计算 index,并对 null 做处理
if ((p = tab[i = (n - 1) & hash]) == null)
tab[i] = newNode(hash, key, value, null);
else {
Node<K,V> e; K k;
// 节点存在
if (p.hash == hash &&
((k = p.key) == key || (key != null && key.equals(k))))
e = p;
// 该链为树
else if (p instanceof TreeNode)
e = ((TreeNode<K,V>)p).putTreeVal(this, tab, hash, key, value);
// 该链为链表
else {
for (int binCount = 0; ; ++binCount) {
if ((e = p.next) == null) {
p.next = newNode(hash, key, value, null);
if (binCount >= TREEIFY_THRESHOLD - 1) // -1 for 1st
treeifyBin(tab, hash);
break;
}
if (e.hash == hash &&
((k = e.key) == key || (key != null && key.equals(k))))
break;
p = e;
}
}
// 写入
if (e != null) { // existing mapping for key
V oldValue = e.value;
if (!onlyIfAbsent || oldValue == null)
e.value = value;
afterNodeAccess(e);
return oldValue;
}
}
++modCount;
if (++size > threshold)
resize();
afterNodeInsertion(evict);
return null;
}

为什么计算 hash 使用 hashcode 无符号位移 16 位。

假设要添加两个对象 a 和 b,如果数组长度是 16,这时对象 a 和 b 通过公式 (n - 1) & hash 运算,也就是 (16-1)&a.hashCode 和 (16-1)&b.hashCode,15 的二进制为 0000000000000000000000000001111,假设对象 A 的 hashCode 为 1000010001110001000001111000000,对象 B 的 hashCode 为 0111011100111000101000010100000,你会发现上述与运算结果都是 0。这样的哈希结果就太让人失望了,很明显不是一个好的哈希算法。

但如果我们将 hashCode 值右移 16 位(h >>> 16 代表无符号右移 16 位),也就是取 int 类型的一半,刚好可以将该二进制数对半切开,并且使用位异或运算(如果两个数对应的位置相反,则结果为 1,反之为 0),这样的话,就能避免上面的情况发生。这就是 hash() 方法的具体实现方式。简而言之,就是尽量打乱 hashCode 真正参与运算的低 16 位。

get 方法的实现

在理解了 put 之后,get 就很简单了。大致思路如下:

  • 对 key 的 hashCode() 做 hash 计算,然后根据 hash 值再计算桶的 index

  • 如果桶中的第一个节点命中,直接返回;

  • 如果有冲突,则通过 key.equals(k) 去查找对应的 entry

    • 若为树,则在红黑树中通过 key.equals(k) 查找,O(logn);

    • 若为链表,则在链表中通过 key.equals(k) 查找,O(n)。

具体代码的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public V get(Object key) {
Node<K,V> e;
return (e = getNode(hash(key), key)) == null ? null : e.value;
}

final Node<K,V> getNode(int hash, Object key) {
Node<K,V>[] tab; Node<K,V> first, e; int n; K k;
if ((tab = table) != null && (n = tab.length) > 0 &&
(first = tab[(n - 1) & hash]) != null) {
// 直接命中
if (first.hash == hash && // always check first node
((k = first.key) == key || (key != null && key.equals(k))))
return first;
// 未命中
if ((e = first.next) != null) {
// 在树中 get
if (first instanceof TreeNode)
return ((TreeNode<K,V>)first).getTreeNode(hash, key);
// 在链表中 get
do {
if (e.hash == hash &&
((k = e.key) == key || (key != null && key.equals(k))))
return e;
} while ((e = e.next) != null);
}
}
return null;
}

hash 方法的实现

HashMap **计算桶下标(index)公式:key.hashCode() ^ (h >>> 16)**。

下面针对这个公式来详细讲解。

getput 的过程中,计算下标时,先对 hashCode 进行 hash 操作,然后再通过 hash 值进一步计算下标,如下图所示:

在对 hashCode() 计算 hash 时具体实现是这样的:

1
2
3
4
static final int hash(Object key) {
int h;
return (key == null) ? 0 : (h = key.hashCode()) ^ (h >>> 16);
}

可以看到这个方法大概的作用就是:高 16bit 不变,低 16bit 和高 16bit 做了一个异或。

在设计 hash 方法时,因为目前的 table 长度 n 为 2 的幂,而计算下标的时候,是这样实现的(使用 & 位操作,而非 % 求余):

1
(n - 1) & hash

设计者认为这方法很容易发生碰撞。为什么这么说呢?不妨思考一下,在 n - 1 为 15(0x1111) 时,其实散列真正生效的只是低 4bit 的有效位,当然容易碰撞了。

因此,设计者想了一个顾全大局的方法(综合考虑了速度、作用、质量),就是把高 16bit 和低 16bit 异或了一下。设计者还解释到因为现在大多数的 hashCode 的分布已经很不错了,就算是发生了碰撞也用 O(logn)的 tree 去做了。仅仅异或一下,既减少了系统的开销,也不会造成的因为高位没有参与下标的计算(table 长度比较小时),从而引起的碰撞。

如果还是产生了频繁的碰撞,会发生什么问题呢?作者注释说,他们使用树来处理频繁的碰撞(we use trees to handle large sets of collisions in bins),在 JEP-180 中,描述了这个问题:

Improve the performance of java.util.HashMap under high hash-collision conditions by using balanced trees rather than linked lists to store map entries. Implement the same improvement in the LinkedHashMap class.

之前已经提过,在获取 HashMap 的元素时,基本分两步:

  1. 首先根据 hashCode()做 hash,然后确定 bucket 的 index;

  2. 如果 bucket 的节点的 key 不是我们需要的,则通过 keys.equals()在链中找。

在 JDK8 之前的实现中是用链表解决冲突的,在产生碰撞的情况下,进行 get 时,两步的时间复杂度是 O(1)+O(n)。因此,当碰撞很厉害的时候 n 很大,O(n)的速度显然是影响速度的。

因此在 JDK8 中,利用红黑树替换链表,这样复杂度就变成了 O(1)+O(logn)了,这样在 n 很大的时候,能够比较理想的解决这个问题,在 JDK8:HashMap 的性能提升一文中有性能测试的结果。

resize 的实现

put 时,如果发现目前的 bucket 占用程度已经超过了 Load Factor 所希望的比例,那么就会发生 resize。在 resize 的过程,简单的说就是把 bucket 扩充为 2 倍,之后重新计算 index,把节点再放到新的 bucket 中。

当超过限制的时候会 resize,然而又因为我们使用的是 2 次幂的扩展(指长度扩为原来 2 倍),所以,元素的位置要么是在原位置,要么是在原位置再移动 2 次幂的位置。

怎么理解呢?例如我们从 16 扩展为 32 时,具体的变化如下所示:

因此元素在重新计算 hash 之后,因为 n 变为 2 倍,那么 n-1 的 mask 范围在高位多 1bit(红色),因此新的 index 就会发生这样的变化:

因此,我们在扩充 HashMap 的时候,不需要重新计算 hash,只需要看看原来的 hash 值新增的那个 bit 是 1 还是 0 就好了,是 0 的话索引没变,是 1 的话索引变成“原索引+oldCap”。可以看看下图为 16 扩充为 32 的 resize 示意图:

这个设计确实非常的巧妙,既省去了重新计算 hash 值的时间,而且同时,由于新增的 1bit 是 0 还是 1 可以认为是随机的,因此 resize 的过程,均匀的把之前的冲突的节点分散到新的 bucket 了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
final Node<K,V>[] resize() {
Node<K,V>[] oldTab = table;
int oldCap = (oldTab == null) ? 0 : oldTab.length;
int oldThr = threshold;
int newCap, newThr = 0;
if (oldCap > 0) {
// 超过最大值就不再扩充了,就只好随你碰撞去吧
if (oldCap >= MAXIMUM_CAPACITY) {
threshold = Integer.MAX_VALUE;
return oldTab;
}
// 没超过最大值,就扩充为原来的 2 倍
else if ((newCap = oldCap << 1) < MAXIMUM_CAPACITY &&
oldCap >= DEFAULT_INITIAL_CAPACITY)
newThr = oldThr << 1; // double threshold
}
else if (oldThr > 0) // initial capacity was placed in threshold
newCap = oldThr;
else { // zero initial threshold signifies using defaults
newCap = DEFAULT_INITIAL_CAPACITY;
newThr = (int)(DEFAULT_LOAD_FACTOR * DEFAULT_INITIAL_CAPACITY);
}

// 计算新的 resize 上限
if (newThr == 0) {
float ft = (float)newCap * loadFactor;
newThr = (newCap < MAXIMUM_CAPACITY && ft < (float)MAXIMUM_CAPACITY ?
(int)ft : Integer.MAX_VALUE);
}
threshold = newThr;
@SuppressWarnings({"rawtypes","unchecked"})
Node<K,V>[] newTab = (Node<K,V>[])new Node[newCap];
table = newTab;
if (oldTab != null) {
// 把每个 bucket 都移动到新的 buckets 中
for (int j = 0; j < oldCap; ++j) {
Node<K,V> e;
if ((e = oldTab[j]) != null) {
oldTab[j] = null;
if (e.next == null)
newTab[e.hash & (newCap - 1)] = e;
else if (e instanceof TreeNode)
((TreeNode<K,V>)e).split(this, newTab, j, oldCap);
else { // preserve order
Node<K,V> loHead = null, loTail = null;
Node<K,V> hiHead = null, hiTail = null;
Node<K,V> next;
do {
next = e.next;
// 原索引
if ((e.hash & oldCap) == 0) {
if (loTail == null)
loHead = e;
else
loTail.next = e;
loTail = e;
}
// 原索引+oldCap
else {
if (hiTail == null)
hiHead = e;
else
hiTail.next = e;
hiTail = e;
}
} while ((e = next) != null);
// 原索引放到bucket里
if (loTail != null) {
loTail.next = null;
newTab[j] = loHead;
}
// 原索引+oldCap放到bucket里
if (hiTail != null) {
hiTail.next = null;
newTab[j + oldCap] = hiHead;
}
}
}
}
}
return newTab;
}

LinkedHashMap 类

LinkedHashMap 要点

LinkedHashMap 通过维护一个保存所有条目(Entry)的双向链表,保证了元素迭代的顺序(即插入顺序)

关注点 结论
是否允许键值对为 null Key 和 Value 都允许 null
是否允许重复数据 Key 重复会覆盖、Value 允许重复
是否有序 按照元素插入顺序存储
是否线程安全 非线程安全

LinkedHashMap 要点

LinkedHashMap 数据结构

LinkedHashMap 通过维护一对 LinkedHashMap.Entry<K,V> 类型的头尾指针,以双链表形式,保存所有数据

学习过数据结构的双链表,就能理解其元素存储以及访问必然是有序的。

1
2
3
4
5
6
7
8
9
10
11
public class LinkedHashMap<K,V>
extends HashMap<K,V>
implements Map<K,V> {

// 双链表的头指针
transient LinkedHashMap.Entry<K,V> head;
// 双链表的尾指针
transient LinkedHashMap.Entry<K,V> tail;
// 迭代排序方法:true 表示访问顺序;false 表示插入顺序
final boolean accessOrder;
}

LinkedHashMap 继承了 HashMapput 方法,本身没有实现 put 方法。

TreeMap 类

TreeMap 要点

TreeMap 基于红黑树实现。

TreeMap 是有序的。它的排序规则是:根据 map 中的 key 的自然语义顺序或提供的比较器(Comparator)的自定义比较顺序。

TreeMap 不是线程安全的。

TreeMap 原理

put 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
public V put(K key, V value) {
Entry<K,V> t = root;
// 如果根节点为 null,插入第一个节点
if (t == null) {
compare(key, key); // type (and possibly null) check

root = new Entry<>(key, value, null);
size = 1;
modCount++;
return null;
}
int cmp;
Entry<K,V> parent;
// split comparator and comparable paths
Comparator<? super K> cpr = comparator;
// 每个节点的左孩子节点的值小于它;右孩子节点的值大于它
// 如果有比较器,使用比较器进行比较
if (cpr != null) {
do {
parent = t;
cmp = cpr.compare(key, t.key);
if (cmp < 0)
t = t.left;
else if (cmp > 0)
t = t.right;
else
return t.setValue(value);
} while (t != null);
}
// 没有比较器,使用 key 的自然顺序进行比较
else {
if (key == null)
throw new NullPointerException();
@SuppressWarnings("unchecked")
Comparable<? super K> k = (Comparable<? super K>) key;
do {
parent = t;
cmp = k.compareTo(t.key);
if (cmp < 0)
t = t.left;
else if (cmp > 0)
t = t.right;
else
return t.setValue(value);
} while (t != null);
}
// 通过上面的遍历未找到 key 值,则新插入节点
Entry<K,V> e = new Entry<>(key, value, parent);
if (cmp < 0)
parent.left = e;
else
parent.right = e;
// 插入后,为了维持红黑树的平衡需要调整
fixAfterInsertion(e);
size++;
modCount++;
return null;
}

get 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public V get(Object key) {
Entry<K,V> p = getEntry(key);
return (p==null ? null : p.value);
}

final Entry<K,V> getEntry(Object key) {
// Offload comparator-based version for sake of performance
if (comparator != null)
return getEntryUsingComparator(key);
if (key == null)
throw new NullPointerException();
@SuppressWarnings("unchecked")
Comparable<? super K> k = (Comparable<? super K>) key;
Entry<K,V> p = root;
// 按照二叉树搜索的方式进行搜索,搜到返回
while (p != null) {
int cmp = k.compareTo(p.key);
if (cmp < 0)
p = p.left;
else if (cmp > 0)
p = p.right;
else
return p;
}
return null;
}

remove 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
public V remove(Object key) {
Entry<K,V> p = getEntry(key);
if (p == null)
return null;

V oldValue = p.value;
deleteEntry(p);
return oldValue;
}
private void deleteEntry(Entry<K,V> p) {
modCount++;
size--;

// 如果当前节点有左右孩子节点,使用后继节点替换要删除的节点
// If strictly internal, copy successor's element to p and then make p
// point to successor.
if (p.left != null && p.right != null) {
Entry<K,V> s = successor(p);
p.key = s.key;
p.value = s.value;
p = s;
} // p has 2 children

// Start fixup at replacement node, if it exists.
Entry<K,V> replacement = (p.left != null ? p.left : p.right);

if (replacement != null) { // 要删除的节点有一个孩子节点
// Link replacement to parent
replacement.parent = p.parent;
if (p.parent == null)
root = replacement;
else if (p == p.parent.left)
p.parent.left = replacement;
else
p.parent.right = replacement;

// Null out links so they are OK to use by fixAfterDeletion.
p.left = p.right = p.parent = null;

// Fix replacement
if (p.color == BLACK)
fixAfterDeletion(replacement);
} else if (p.parent == null) { // return if we are the only node.
root = null;
} else { // No children. Use self as phantom replacement and unlink.
if (p.color == BLACK)
fixAfterDeletion(p);

if (p.parent != null) {
if (p == p.parent.left)
p.parent.left = null;
else if (p == p.parent.right)
p.parent.right = null;
p.parent = null;
}
}
}

TreeMap 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class TreeMapDemo {

private static final String[] chars = "A B C D E F G H I J K L M N O P Q R S T U V W X Y Z".split(" ");

public static void main(String[] args) {
TreeMap<Integer, String> treeMap = new TreeMap<>();
for (int i = 0; i < chars.length; i++) {
treeMap.put(i, chars[i]);
}
System.out.println(treeMap);
Integer low = treeMap.firstKey();
Integer high = treeMap.lastKey();
System.out.println(low);
System.out.println(high);
Iterator<Integer> it = treeMap.keySet().iterator();
for (int i = 0; i <= 6; i++) {
if (i == 3) { low = it.next(); }
if (i == 6) { high = it.next(); } else { it.next(); }
}
System.out.println(low);
System.out.println(high);
System.out.println(treeMap.subMap(low, high));
System.out.println(treeMap.headMap(high));
System.out.println(treeMap.tailMap(low));
}
}

WeakHashMap

WeakHashMap 的定义如下:

1
2
3
public class WeakHashMap<K,V>
extends AbstractMap<K,V>
implements Map<K,V> {}

WeakHashMap 继承了 AbstractMap,实现了 Map 接口。

和 HashMap 一样,WeakHashMap 也是一个散列表,它存储的内容也是键值对(key-value)映射,而且键和值都可以是 null。

不过 WeakHashMap 的键是弱键。在 WeakHashMap 中,当某个键不再被其它对象引用,会被从 WeakHashMap 中被自动移除。更精确地说,对于一个给定的键,其映射的存在并不阻止垃圾回收器对该键的丢弃,这就使该键成为可终止的,被终止,然后被回收。某个键被终止时,它对应的键值对也就从映射中有效地移除了。

这个弱键的原理呢?大致上就是,通过 WeakReference 和 ReferenceQueue 实现的。

WeakHashMap 的 key 是弱键,即是 WeakReference 类型的;ReferenceQueue 是一个队列,它会保存被 GC 回收的弱键。实现步骤是:

  1. 新建 WeakHashMap,将键值对添加到 WeakHashMap 中。实际上,WeakHashMap 是通过数组 table 保存 Entry(键值对);每一个 Entry 实际上是一个单向链表,即 Entry 是键值对链表。
  2. 当某弱键不再被其它对象引用,并被 GC 回收时。在 GC 回收该弱键时,这个弱键也同时会被添加到 ReferenceQueue(queue)队列中。
  3. 当下一次我们需要操作 WeakHashMap 时,会先同步 table 和 queue。table 中保存了全部的键值对,而 queue 中保存被 GC 回收的键值对;同步它们,就是删除 table 中被 GC 回收的键值对。

这就是弱键如何被自动从 WeakHashMap 中删除的步骤了。

和 HashMap 一样,WeakHashMap 是不同步的。可以使用 Collections.synchronizedMap 方法来构造同步的 WeakHashMap。

总结

Map 简介

img

HashMap

img

其他 Map

img

参考资料

Java 容器之 Set

Set 简介

Set 家族成员简介:

  • Set 继承了 Collection 的接口。实际上 Set 就是 Collection,只是行为略有不同:Set 集合不允许有重复元素。
  • SortedSet 继承了 Set 的接口。SortedSet 中的内容是排序的唯一值,排序的方法是通过比较器(Comparator)。
  • NavigableSet 继承了 SortedSet 的接口。它提供了丰富的查找方法:如”获取大于/等于某值的元素”、“获取小于/等于某值的元素”等等。
  • AbstractSet 是一个抽象类,它继承于 AbstractCollectionAbstractCollection 实现了 Set 中的绝大部分方法,为实现 Set 的实例类提供了便利。
  • HashSet 类依赖于 HashMap,它实际上是通过 HashMap 实现的。HashSet 中的元素是无序的、散列的。
  • TreeSet 类依赖于 TreeMap,它实际上是通过 TreeMap 实现的。TreeSet 中的元素是有序的,它是按自然排序或者用户指定比较器排序的 Set。
  • LinkedHashSet 是按插入顺序排序的 Set。
  • EnumSet 是只能存放 Emum 枚举类型的 Set。

Set 接口

Set 继承了 Collection 的接口。实际上,Set 就是 Collection,二者提供的方法完全相同。

Set 接口定义如下:

1
public interface Set<E> extends Collection<E> {}

SortedSet 接口

继承了 Set 的接口。SortedSet 中的内容是排序的唯一值,排序的方法是通过比较器(Comparator)。

SortedSet 接口定义如下:

1
public interface SortedSet<E> extends Set<E> {}

SortedSet 接口新扩展的方法:

  • comparator - 返回 Comparator
  • subSet - 返回指定区间的子集
  • headSet - 返回小于指定元素的子集
  • tailSet - 返回大于指定元素的子集
  • first - 返回第一个元素
  • last - 返回最后一个元素
  • spliterator

NavigableSet 继承了 SortedSet。它提供了丰富的查找方法。

NavigableSet 接口定义如下:

1
public interface NavigableSet<E> extends SortedSet<E> {}

NavigableSet 接口新扩展的方法:

  • lower - 返回小于指定值的元素中最接近的元素
  • higher - 返回大于指定值的元素中最接近的元素
  • floor - 返回小于或等于指定值的元素中最接近的元素
  • ceiling - 返回大于或等于指定值的元素中最接近的元素
  • pollFirst - 检索并移除第一个(最小的)元素
  • pollLast - 检索并移除最后一个(最大的)元素
  • descendingSet - 返回反序排列的 Set
  • descendingIterator - 返回反序排列的 Set 的迭代器
  • subSet - 返回指定区间的子集
  • headSet - 返回小于指定元素的子集
  • tailSet - 返回大于指定元素的子集

AbstractSet 抽象类

AbstractSet 类提供 Set 接口的核心实现,以最大限度地减少实现 Set 接口所需的工作。

AbstractSet 抽象类定义如下:

1
public abstract class AbstractSet<E> extends AbstractCollection<E> implements Set<E> {}

事实上,主要的实现已经在 AbstractCollection 中完成。

HashSet 类

HashSet 类依赖于 HashMap,它实际上是通过 HashMap 实现的。HashSet 中的元素是无序的、散列的。

HashSet 类定义如下:

1
2
3
public class HashSet<E>
extends AbstractSet<E>
implements Set<E>, Cloneable, java.io.Serializable {}

HashSet 要点

  • HashSet 通过继承 AbstractSet 实现了 Set 接口中的骨干方法。
  • HashSet 实现了 Cloneable,所以支持克隆。
  • HashSet 实现了 Serializable,所以支持序列化。
  • HashSet 中存储的元素是无序的。
  • HashSet 允许 null 值的元素。
  • HashSet 不是线程安全的。

HashSet 原理

HashSet 是基于 HashMap 实现的。

1
2
3
4
5
6
// HashSet 的核心,通过维护一个 HashMap 实体来实现 HashSet 方法
private transient HashMap<E,Object> map;

// PRESENT 是用于关联 map 中当前操作元素的一个虚拟值
private static final Object PRESENT = new Object();
}
  • HashSet 中维护了一个 HashMap 对象 map,HashSet 的重要方法,如 addremoveiteratorclearsize 等都是围绕 map 实现的。
    • HashSet 类中通过定义 writeObject()readObject() 方法确定了其序列化和反序列化的机制。
  • PRESENT 是用于关联 map 中当前操作元素的一个虚拟值。

TreeSet 类

TreeSet 类依赖于 TreeMap,它实际上是通过 TreeMap 实现的。TreeSet 中的元素是有序的,它是按自然排序或者用户指定比较器排序的 Set。

TreeSet 类定义如下:

1
2
public class TreeSet<E> extends AbstractSet<E>
implements NavigableSet<E>, Cloneable, java.io.Serializable {}

TreeSet 要点

  • TreeSet 通过继承 AbstractSet 实现了 NavigableSet 接口中的骨干方法。
  • TreeSet 实现了 Cloneable,所以支持克隆。
  • TreeSet 实现了 Serializable,所以支持序列化。
  • TreeSet 中存储的元素是有序的。排序规则是自然顺序或比较器(Comparator)中提供的顺序规则。
  • TreeSet 不是线程安全的。

TreeSet 源码

TreeSet 是基于 TreeMap 实现的。

1
2
3
4
5
// TreeSet 的核心,通过维护一个 NavigableMap 实体来实现 TreeSet 方法
private transient NavigableMap<E,Object> m;

// PRESENT 是用于关联 map 中当前操作元素的一个虚拟值
private static final Object PRESENT = new Object();
  • TreeSet 中维护了一个 NavigableMap 对象 map(实际上是一个 TreeMap 实例),TreeSet 的重要方法,如 addremoveiteratorclearsize 等都是围绕 map 实现的。
  • PRESENT 是用于关联 map 中当前操作元素的一个虚拟值。TreeSet 中的元素都被当成 TreeMap 的 key 存储,而 value 都填的是 PRESENT

LinkedHashSet 类

LinkedHashSet 是按插入顺序排序的 Set。

LinkedHashSet 类定义如下:

1
2
3
public class LinkedHashSet<E>
extends HashSet<E>
implements Set<E>, Cloneable, java.io.Serializable {}

LinkedHashSet 要点

  • LinkedHashSet 通过继承 HashSet 实现了 Set 接口中的骨干方法。
  • LinkedHashSet 实现了 Cloneable,所以支持克隆。
  • LinkedHashSet 实现了 Serializable,所以支持序列化。
  • LinkedHashSet 中存储的元素是按照插入顺序保存的。
  • LinkedHashSet 不是线程安全的。

LinkedHashSet 原理

LinkedHashSet 有三个构造方法,无一例外,都是调用父类 HashSet 的构造方法。

1
2
3
4
5
6
7
8
9
public LinkedHashSet(int initialCapacity, float loadFactor) {
super(initialCapacity, loadFactor, true);
}
public LinkedHashSet(int initialCapacity) {
super(initialCapacity, .75f, true);
}
public LinkedHashSet() {
super(16, .75f, true);
}

需要强调的是:LinkedHashSet 构造方法实际上调用的是父类 HashSet 的非 public 构造方法。

1
2
3
HashSet(int initialCapacity, float loadFactor, boolean dummy) {
map = new LinkedHashMap<>(initialCapacity, loadFactor);
}

不同于 HashSet public 构造方法中初始化的 HashMap 实例,这个构造方法中,初始化了 LinkedHashMap 实例。

也就是说,实际上,LinkedHashSet 维护了一个双链表。由双链表的特性可以知道,它是按照元素的插入顺序保存的。所以,这就是 LinkedHashSet 中存储的元素是按照插入顺序保存的原理。

EnumSet 类

EnumSet 类定义如下:

1
2
public abstract class EnumSet<E extends Enum<E>> extends AbstractSet<E>
implements Cloneable, java.io.Serializable {}

EnumSet 要点

  • EnumSet 继承了 AbstractSet,所以有 Set 接口中的骨干方法。
  • EnumSet 实现了 Cloneable,所以支持克隆。
  • EnumSet 实现了 Serializable,所以支持序列化。
  • EnumSet 通过 <E extends Enum<E>> 限定了存储元素必须是枚举值。
  • EnumSet 没有构造方法,只能通过类中的 static 方法来创建 EnumSet 对象。
  • EnumSet 是有序的。以枚举值在 EnumSet 类中的定义顺序来决定集合元素的顺序。
  • EnumSet 不是线程安全的。

要点总结

img

参考资料

Java 原子变量类

原子变量类简介

为何需要原子变量类

保证线程安全是 Java 并发编程必须要解决的重要问题。Java 从原子性、可见性、有序性这三大特性入手,确保多线程的数据一致性。

  • 确保线程安全最常见的做法是利用锁机制(Locksychronized)来对共享数据做互斥同步,这样在同一个时刻,只有一个线程可以执行某个方法或者某个代码块,那么操作必然是原子性的,线程安全的。互斥同步最主要的问题是线程阻塞和唤醒所带来的性能问题。
  • volatile 是轻量级的锁(自然比普通锁性能要好),它保证了共享变量在多线程中的可见性,但无法保证原子性。所以,它只能在一些特定场景下使用。
  • 为了兼顾原子性以及锁带来的性能问题,Java 引入了 CAS (主要体现在 Unsafe 类)来实现非阻塞同步(也叫乐观锁)。并基于 CAS ,提供了一套原子工具类。

原子变量类的作用

原子变量类 比锁的粒度更细,更轻量级,并且对于在多处理器系统上实现高性能的并发代码来说是非常关键的。原子变量将发生竞争的范围缩小到单个变量上。

原子变量类相当于一种泛化的 volatile 变量,能够支持原子的、有条件的读/改/写操作。

原子类在内部使用 CAS 指令(基于硬件的支持)来实现同步。这些指令通常比锁更快。

原子变量类可以分为 4 组:

  • 基本类型
    • AtomicBoolean - 布尔类型原子类
    • AtomicInteger - 整型原子类
    • AtomicLong - 长整型原子类
  • 引用类型
    • AtomicReference - 引用类型原子类
    • AtomicMarkableReference - 带有标记位的引用类型原子类
    • AtomicStampedReference - 带有版本号的引用类型原子类
  • 数组类型
    • AtomicIntegerArray - 整形数组原子类
    • AtomicLongArray - 长整型数组原子类
    • AtomicReferenceArray - 引用类型数组原子类
  • 属性更新器类型
    • AtomicIntegerFieldUpdater - 整型字段的原子更新器。
    • AtomicLongFieldUpdater - 长整型字段的原子更新器。
    • AtomicReferenceFieldUpdater - 原子更新引用类型里的字段。

这里不对 CAS、volatile、互斥同步做深入探讨。如果想了解更多细节,不妨参考:Java 并发核心机制

基本类型

这一类型的原子类是针对 Java 基本类型进行操作。

  • AtomicBoolean - 布尔类型原子类
  • AtomicInteger - 整型原子类
  • AtomicLong - 长整型原子类

以上类都支持 CAS(compare-and-swap)技术,此外,AtomicIntegerAtomicLong 还支持算术运算。

:bulb: 提示:

虽然 Java 只提供了 AtomicBooleanAtomicIntegerAtomicLong,但是可以模拟其他基本类型的原子变量。要想模拟其他基本类型的原子变量,可以将 shortbyte 等类型与 int 类型进行转换,以及使用 Float.floatToIntBitsDouble.doubleToLongBits 来转换浮点数。

由于 AtomicBooleanAtomicIntegerAtomicLong 实现方式、使用方式都相近,所以本文仅针对 AtomicInteger 进行介绍。

AtomicInteger 用法

1
2
3
4
5
6
7
public final int get() // 获取当前值
public final int getAndSet(int newValue) // 获取当前值,并设置新值
public final int getAndIncrement()// 获取当前值,并自增
public final int getAndDecrement() // 获取当前值,并自减
public final int getAndAdd(int delta) // 获取当前值,并加上预期值
boolean compareAndSet(int expect, int update) // 如果输入值(update)等于预期值,将该值设置为输入值
public final void lazySet(int newValue) // 最终设置为 newValue,使用 lazySet 设置之后可能导致其他线程在之后的一小段时间内还是可以读到旧的值。

AtomicInteger 使用示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class AtomicIntegerDemo {

public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(5);
AtomicInteger count = new AtomicInteger(0);
for (int i = 0; i < 1000; i++) {
executorService.submit((Runnable) () -> {
System.out.println(Thread.currentThread().getName() + " count=" + count.get());
count.incrementAndGet();
});
}

executorService.shutdown();
executorService.awaitTermination(30, TimeUnit.SECONDS);
System.out.println("Final Count is : " + count.get());
}
}

AtomicInteger 实现

阅读 AtomicInteger 源码,可以看到如下定义:

1
2
3
4
5
6
7
8
9
10
11
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;

static {
try {
valueOffset = unsafe.objectFieldOffset
(AtomicInteger.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}

private volatile int value;

说明:

  • value - value 属性使用 volatile 修饰,使得对 value 的修改在并发环境下对所有线程可见。
  • valueOffset - value 属性的偏移量,通过这个偏移量可以快速定位到 value 字段,这个是实现 AtomicInteger 的关键。
  • unsafe - Unsafe 类型的属性,它为 AtomicInteger 提供了 CAS 操作。

引用类型

Java 数据类型分为 基本数据类型引用数据类型 两大类(不了解 Java 数据类型划分可以参考: Java 基本数据类型 )。

上一节中提到了针对基本数据类型的原子类,那么如果想针对引用类型做原子操作怎么办?Java 也提供了相关的原子类:

  • AtomicReference - 引用类型原子类
  • AtomicMarkableReference - 带有标记位的引用类型原子类
  • AtomicStampedReference - 带有版本号的引用类型原子类

AtomicStampedReference 类在引用类型原子类中,彻底地解决了 ABA 问题,其它的 CAS 能力与另外两个类相近,所以最具代表性。因此,本节只针对 AtomicStampedReference 进行说明。

示例:基于 AtomicReference 实现一个简单的自旋锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
public class AtomicReferenceDemo2 {

private static int ticket = 10;

public static void main(String[] args) {
threadSafeDemo();
}

private static void threadSafeDemo() {
SpinLock lock = new SpinLock();
ExecutorService executorService = Executors.newFixedThreadPool(3);
for (int i = 0; i < 5; i++) {
executorService.execute(new MyThread(lock));
}
executorService.shutdown();
}

/**
* 基于 {@link AtomicReference} 实现的简单自旋锁
*/
static class SpinLock {

private AtomicReference<Thread> atomicReference = new AtomicReference<>();

public void lock() {
Thread current = Thread.currentThread();
while (!atomicReference.compareAndSet(null, current)) {}
}

public void unlock() {
Thread current = Thread.currentThread();
atomicReference.compareAndSet(current, null);
}

}

/**
* 利用自旋锁 {@link SpinLock} 并发处理数据
*/
static class MyThread implements Runnable {

private SpinLock lock;

public MyThread(SpinLock lock) {
this.lock = lock;
}

@Override
public void run() {
while (ticket > 0) {
lock.lock();
if (ticket > 0) {
System.out.println(Thread.currentThread().getName() + " 卖出了第 " + ticket + " 张票");
ticket--;
}
lock.unlock();
}
}

}

}

原子类的实现基于 CAS 机制,而 CAS 存在 ABA 问题(不了解 ABA 问题,可以参考:Java 并发基础机制 - CAS 的问题)。正是为了解决 ABA 问题,才有了 AtomicMarkableReferenceAtomicStampedReference

AtomicMarkableReference 使用一个布尔值作为标记,修改时在 true / false 之间切换。这种策略不能根本上解决 ABA 问题,但是可以降低 ABA 发生的几率。常用于缓存或者状态描述这样的场景。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class AtomicMarkableReferenceDemo {

private final static String INIT_TEXT = "abc";

public static void main(String[] args) throws InterruptedException {

final AtomicMarkableReference<String> amr = new AtomicMarkableReference<>(INIT_TEXT, false);

ExecutorService executorService = Executors.newFixedThreadPool(3);
for (int i = 0; i < 10; i++) {
executorService.submit(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(Math.abs((int) (Math.random() * 100)));
} catch (InterruptedException e) {
e.printStackTrace();
}

String name = Thread.currentThread().getName();
if (amr.compareAndSet(INIT_TEXT, name, amr.isMarked(), !amr.isMarked())) {
System.out.println(Thread.currentThread().getName() + " 修改了对象!");
System.out.println("新的对象为:" + amr.getReference());
}
}
});
}

executorService.shutdown();
executorService.awaitTermination(3, TimeUnit.SECONDS);
}

}

AtomicStampedReference 使用一个整型值做为版本号,每次更新前先比较版本号,如果一致,才进行修改。通过这种策略,可以根本上解决 ABA 问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class AtomicStampedReferenceDemo {

private final static String INIT_REF = "pool-1-thread-3";

private final static AtomicStampedReference<String> asr = new AtomicStampedReference<>(INIT_REF, 0);

public static void main(String[] args) throws InterruptedException {

System.out.println("初始对象为:" + asr.getReference());

ExecutorService executorService = Executors.newFixedThreadPool(3);
for (int i = 0; i < 3; i++) {
executorService.execute(new MyThread());
}

executorService.shutdown();
executorService.awaitTermination(3, TimeUnit.SECONDS);
}

static class MyThread implements Runnable {

@Override
public void run() {
try {
Thread.sleep(Math.abs((int) (Math.random() * 100)));
} catch (InterruptedException e) {
e.printStackTrace();
}

final int stamp = asr.getStamp();
if (asr.compareAndSet(INIT_REF, Thread.currentThread().getName(), stamp, stamp + 1)) {
System.out.println(Thread.currentThread().getName() + " 修改了对象!");
System.out.println("新的对象为:" + asr.getReference());
}
}

}

}

数组类型

Java 提供了以下针对数组的原子类:

  • AtomicIntegerArray - 整形数组原子类
  • AtomicLongArray - 长整型数组原子类
  • AtomicReferenceArray - 引用类型数组原子类

已经有了针对基本类型和引用类型的原子类,为什么还要提供针对数组的原子类呢?

数组类型的原子类为 数组元素 提供了 volatile 类型的访问语义,这是普通数组所不具备的特性——**volatile 类型的数组仅在数组引用上具有 volatile 语义**。

示例:AtomicIntegerArray 使用示例(AtomicLongArrayAtomicReferenceArray 使用方式也类似)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
public class AtomicIntegerArrayDemo {

private static AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(10);

public static void main(final String[] arguments) throws InterruptedException {

System.out.println("Init Values: ");
for (int i = 0; i < atomicIntegerArray.length(); i++) {
atomicIntegerArray.set(i, i);
System.out.print(atomicIntegerArray.get(i) + " ");
}
System.out.println();

Thread t1 = new Thread(new Increment());
Thread t2 = new Thread(new Compare());
t1.start();
t2.start();

t1.join();
t2.join();

System.out.println("Final Values: ");
for (int i = 0; i < atomicIntegerArray.length(); i++) {
System.out.print(atomicIntegerArray.get(i) + " ");
}
System.out.println();
}

static class Increment implements Runnable {

@Override
public void run() {

for (int i = 0; i < atomicIntegerArray.length(); i++) {
int value = atomicIntegerArray.incrementAndGet(i);
System.out.println(Thread.currentThread().getName() + ", index = " + i + ", value = " + value);
}
}

}

static class Compare implements Runnable {

@Override
public void run() {
for (int i = 0; i < atomicIntegerArray.length(); i++) {
boolean swapped = atomicIntegerArray.compareAndSet(i, 2, 3);
if (swapped) {
System.out.println(Thread.currentThread().getName() + " swapped, index = " + i + ", value = 3");
}
}
}

}

}

属性更新器类型

更新器类支持基于反射机制的更新字段值的原子操作。

  • AtomicIntegerFieldUpdater - 整型字段的原子更新器。
  • AtomicLongFieldUpdater - 长整型字段的原子更新器。
  • AtomicReferenceFieldUpdater - 原子更新引用类型里的字段。

这些类的使用有一定限制:

  • 因为对象的属性修改类型原子类都是抽象类,所以每次使用都必须使用静态方法 newUpdater() 创建一个更新器,并且需要设置想要更新的类和属性。
  • 字段必须是 volatile 类型的;
  • 不能作用于静态变量(static);
  • 不能作用于常量(final);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
public class AtomicReferenceFieldUpdaterDemo {

static User user = new User("begin");

static AtomicReferenceFieldUpdater<User, String> updater =
AtomicReferenceFieldUpdater.newUpdater(User.class, String.class, "name");

public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(3);
for (int i = 0; i < 5; i++) {
executorService.execute(new MyThread());
}
executorService.shutdown();
}

static class MyThread implements Runnable {

@Override
public void run() {
if (updater.compareAndSet(user, "begin", "end")) {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " 已修改 name = " + user.getName());
} else {
System.out.println(Thread.currentThread().getName() + " 已被其他线程修改");
}
}

}

static class User {

volatile String name;

public User(String name) {
this.name = name;
}

public String getName() {
return name;
}

public User setName(String name) {
this.name = name;
return this;
}

}

}

原子化的累加器

DoubleAccumulatorDoubleAdderLongAccumulatorLongAdder,这四个类仅仅用来执行累加操作,相比原子化的基本数据类型,速度更快,但是不支持 compareAndSet() 方法。如果你仅仅需要累加操作,使用原子化的累加器性能会更好,代价就是会消耗更多的内存空间。

LongAdder 内部由一个 base 变量和一个 cell[] 数组组成。

  • 当只有一个写线程,没有竞争的情况下,LongAdder 会直接使用 base 变量作为原子操作变量,通过 CAS 操作修改变量;
  • 当有多个写线程竞争的情况下,除了占用 base 变量的一个写线程之外,其它各个线程会将修改的变量写入到自己的槽 cell[] 数组中。

我们可以发现,LongAdder 在操作后的返回值只是一个近似准确的数值,但是 LongAdder 最终返回的是一个准确的数值, 所以在一些对实时性要求比较高的场景下,LongAdder 并不能取代 AtomicIntegerAtomicLong

参考资料

深入理解 Java 并发锁

本文先阐述 Java 中各种锁的概念。

然后,介绍锁的核心实现 AQS。

然后,重点介绍 Lock 和 Condition 两个接口及其实现。并发编程有两个核心问题:同步和互斥。

互斥,即同一时刻只允许一个线程访问共享资源;

同步,即线程之间如何通信、协作。

这两大问题,管程(sychronized)都是能够解决的。J.U.C 包还提供了 Lock 和 Condition 两个接口来实现管程,其中 Lock 用于解决互斥问题,Condition 用于解决同步问题

并发锁简介

确保线程安全最常见的做法是利用锁机制(Locksychronized)来对共享数据做互斥同步,这样在同一个时刻,只有一个线程可以执行某个方法或者某个代码块,那么操作必然是原子性的,线程安全的。

在工作、面试中,经常会听到各种五花八门的锁,听的人云里雾里。锁的概念术语很多,它们是针对不同的问题所提出的,通过简单的梳理,也不难理解。

可重入锁

可重入锁,顾名思义,指的是线程可以重复获取同一把锁。即同一个线程在外层方法获取了锁,在进入内层方法会自动获取锁。

可重入锁可以在一定程度上避免死锁

  • ReentrantLockReentrantReadWriteLock 是可重入锁。这点,从其命名也不难看出。
  • synchronized 也是一个可重入锁

【示例】synchronized 的可重入示例

1
2
3
4
5
6
7
8
synchronized void setA() throws Exception{
Thread.sleep(1000);
setB();
}

synchronized void setB() throws Exception{
Thread.sleep(1000);
}

上面的代码就是一个典型场景:如果使用的锁不是可重入锁的话,setB 可能不会被当前线程执行,从而造成死锁。

【示例】ReentrantLock 的可重入示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
class Task {

private int value;
private final Lock lock = new ReentrantLock();

public Task() {
this.value = 0;
}

public int get() {
// 获取锁
lock.lock();
try {
return value;
} finally {
// 保证锁能释放
lock.unlock();
}
}

public void addOne() {
// 获取锁
lock.lock();
try {
// 注意:此处已经成功获取锁,进入 get 方法后,又尝试获取锁,
// 如果锁不是可重入的,会导致死锁
value = 1 + get();
} finally {
// 保证锁能释放
lock.unlock();
}
}

}

公平锁与非公平锁

  • 公平锁 - 公平锁是指 多线程按照申请锁的顺序来获取锁
  • 非公平锁 - 非公平锁是指 多线程不按照申请锁的顺序来获取锁 。这就可能会出现优先级反转(后来者居上)或者饥饿现象(某线程总是抢不过别的线程,导致始终无法执行)。

公平锁为了保证线程申请顺序,势必要付出一定的性能代价,因此其吞吐量一般低于非公平锁。

公平锁与非公平锁 在 Java 中的典型实现:

  • synchronized 只支持非公平锁
  • ReentrantLockReentrantReadWriteLock,默认是非公平锁,但支持公平锁

独享锁与共享锁

独享锁与共享锁是一种广义上的说法,从实际用途上来看,也常被称为互斥锁与读写锁。

  • 独享锁 - 独享锁是指 锁一次只能被一个线程所持有
  • 共享锁 - 共享锁是指 锁可被多个线程所持有

独享锁与共享锁在 Java 中的典型实现:

  • synchronizedReentrantLock 只支持独享锁
  • ReentrantReadWriteLock 其写锁是独享锁,其读锁是共享锁。读锁是共享锁使得并发读是非常高效的,读写,写读 ,写写的过程是互斥的。

悲观锁与乐观锁

乐观锁与悲观锁不是指具体的什么类型的锁,而是处理并发同步的策略

  • 悲观锁 - 悲观锁对于并发采取悲观的态度,认为:不加锁的并发操作一定会出问题悲观锁适合写操作频繁的场景
  • 乐观锁 - 乐观锁对于并发采取乐观的态度,认为:不加锁的并发操作也没什么问题。对于同一个数据的并发操作,是不会发生修改的。在更新数据的时候,会采用不断尝试更新的方式更新数据。乐观锁适合读多写少的场景

悲观锁与乐观锁在 Java 中的典型实现:

  • 悲观锁在 Java 中的应用就是通过使用 synchronizedLock 显示加锁来进行互斥同步,这是一种阻塞同步。

  • 乐观锁在 Java 中的应用就是采用 CAS 机制(CAS 操作通过 Unsafe 类提供,但这个类不直接暴露为 API,所以都是间接使用,如各种原子类)。

偏向锁、轻量级锁、重量级锁

所谓轻量级锁与重量级锁,指的是锁控制粒度的粗细。显然,控制粒度越细,阻塞开销越小,并发性也就越高。

Java 1.6 以前,重量级锁一般指的是 synchronized ,而轻量级锁指的是 volatile

Java 1.6 以后,针对 synchronized 做了大量优化,引入 4 种锁状态: 无锁状态、偏向锁、轻量级锁和重量级锁。锁可以单向的从偏向锁升级到轻量级锁,再从轻量级锁升级到重量级锁 。

  • 偏向锁 - 偏向锁是指一段同步代码一直被一个线程所访问,那么该线程会自动获取锁。降低获取锁的代价。

  • 轻量级锁 - 是指当锁是偏向锁的时候,被另一个线程所访问,偏向锁就会升级为轻量级锁,其他线程会通过自旋的形式尝试获取锁,不会阻塞,提高性能。

  • 重量级锁 - 是指当锁为轻量级锁的时候,另一个线程虽然是自旋,但自旋不会一直持续下去,当自旋一定次数的时候,还没有获取到锁,就会进入阻塞,该锁膨胀为重量级锁。重量级锁会让其他申请的线程进入阻塞,性能降低。

分段锁

分段锁其实是一种锁的设计,并不是具体的一种锁。所谓分段锁,就是把锁的对象分成多段,每段独立控制,使得锁粒度更细,减少阻塞开销,从而提高并发性。这其实很好理解,就像高速公路上的收费站,如果只有一个收费口,那所有的车只能排成一条队缴费;如果有多个收费口,就可以分流了。

Hashtable 使用 synchronized 修饰方法来保证线程安全性,那么面对线程的访问,Hashtable 就会锁住整个对象,所有的其它线程只能等待,这种阻塞方式的吞吐量显然很低。

Java 1.7 以前的 ConcurrentHashMap 就是分段锁的典型案例。ConcurrentHashMap 维护了一个 Segment 数组,一般称为分段桶。

1
final Segment<K,V>[] segments;

当有线程访问 ConcurrentHashMap 的数据时,ConcurrentHashMap 会先根据 hashCode 计算出数据在哪个桶(即哪个 Segment),然后锁住这个 Segment

显示锁和内置锁

Java 1.5 之前,协调对共享对象的访问时可以使用的机制只有 synchronizedvolatile。这两个都属于内置锁,即锁的申请和释放都是由 JVM 所控制。

Java 1.5 之后,增加了新的机制:ReentrantLockReentrantReadWriteLock ,这类锁的申请和释放都可以由程序所控制,所以常被称为显示锁。

💡 synchronized 的用法和原理可以参考:Java 并发基础机制 - synchronized

:bell: 注意:如果不需要 ReentrantLockReentrantReadWriteLock 所提供的高级同步特性,应该优先考虑使用 synchronized 。理由如下:

  • Java 1.6 以后,synchronized 做了大量的优化,其性能已经与 ReentrantLockReentrantReadWriteLock 基本上持平。
  • 从趋势来看,Java 未来更可能会优化 synchronized ,而不是 ReentrantLockReentrantReadWriteLock ,因为 synchronized 是 JVM 内置属性,它能执行一些优化。
  • ReentrantLockReentrantReadWriteLock 申请和释放锁都是由程序控制,如果使用不当,可能造成死锁,这是很危险的。

以下对比一下显示锁和内置锁的差异:

  • 主动获取锁和释放锁
    • synchronized 不能主动获取锁和释放锁。获取锁和释放锁都是 JVM 控制的。
    • ReentrantLock 可以主动获取锁和释放锁。(如果忘记释放锁,就可能产生死锁)。
  • 响应中断
    • synchronized 不能响应中断。
    • ReentrantLock 可以响应中断。
  • 超时机制
    • synchronized 没有超时机制。
    • ReentrantLock 有超时机制。ReentrantLock 可以设置超时时间,超时后自动释放锁,避免一直等待。
  • 支持公平锁
    • synchronized 只支持非公平锁。
    • ReentrantLock 支持非公平锁和公平锁。
  • 是否支持共享
    • synchronized 修饰的方法或代码块,只能被一个线程访问(独享)。如果这个线程被阻塞,其他线程也只能等待
    • ReentrantLock 可以基于 Condition 灵活的控制同步条件。
  • 是否支持读写分离
    • synchronized 不支持读写锁分离;
    • ReentrantReadWriteLock 支持读写锁,从而使阻塞读写的操作分开,有效提高并发性。

Lock 和 Condition

为何引入 Lock 和 Condition

并发编程领域,有两大核心问题:一个是互斥,即同一时刻只允许一个线程访问共享资源;另一个是同步,即线程之间如何通信、协作。这两大问题,管程都是能够解决的。Java SDK 并发包通过 Lock 和 Condition 两个接口来实现管程,其中 Lock 用于解决互斥问题,Condition 用于解决同步问题

synchronized 是管程的一种实现,既然如此,何必再提供 Lock 和 Condition。

JDK 1.6 以前,synchronized 还没有做优化,性能远低于 Lock。但是,性能不是引入 Lock 的最重要因素。真正关键在于:synchronized 使用不当,可能会出现死锁。

synchronized 无法通过破坏不可抢占条件来避免死锁。原因是 synchronized 申请资源的时候,如果申请不到,线程直接进入阻塞状态了,而线程进入阻塞状态,啥都干不了,也释放不了线程已经占有的资源。

与内置锁 synchronized 不同的是,**Lock 提供了一组无条件的、可轮询的、定时的以及可中断的锁操作**,所有获取锁、释放锁的操作都是显式的操作。

  • 能够响应中断。synchronized 的问题是,持有锁 A 后,如果尝试获取锁 B 失败,那么线程就进入阻塞状态,一旦发生死锁,就没有任何机会来唤醒阻塞的线程。但如果阻塞状态的线程能够响应中断信号,也就是说当我们给阻塞的线程发送中断信号的时候,能够唤醒它,那它就有机会释放曾经持有的锁 A。这样就破坏了不可抢占条件了。
  • 支持超时。如果线程在一段时间之内没有获取到锁,不是进入阻塞状态,而是返回一个错误,那这个线程也有机会释放曾经持有的锁。这样也能破坏不可抢占条件。
  • 非阻塞地获取锁。如果尝试获取锁失败,并不进入阻塞状态,而是直接返回,那这个线程也有机会释放曾经持有的锁。这样也能破坏不可抢占条件。

Lock 接口

Lock 的接口定义如下:

1
2
3
4
5
6
7
8
public interface Lock {
void lock();
void lockInterruptibly() throws InterruptedException;
boolean tryLock();
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
void unlock();
Condition newCondition();
}
  • lock() - 获取锁。
  • unlock() - 释放锁。
  • tryLock() - 尝试获取锁,仅在调用时锁未被另一个线程持有的情况下,才获取该锁。
  • tryLock(long time, TimeUnit unit) - 和 tryLock() 类似,区别仅在于限定时间,如果限定时间内未获取到锁,视为失败。
  • lockInterruptibly() - 锁未被另一个线程持有,且线程没有被中断的情况下,才能获取锁。
  • newCondition() - 返回一个绑定到 Lock 对象上的 Condition 实例。

Condition

Condition 实现了管程模型里面的条件变量

前文中提过 Lock 接口中 有一个 newCondition() 方法用于返回一个绑定到 Lock 对象上的 Condition 实例。Condition 是什么?有什么作用?本节将一一讲解。

在单线程中,一段代码的执行可能依赖于某个状态,如果不满足状态条件,代码就不会被执行(典型的场景,如:if ... else ...)。在并发环境中,当一个线程判断某个状态条件时,其状态可能是由于其他线程的操作而改变,这时就需要有一定的协调机制来确保在同一时刻,数据只能被一个线程锁修改,且修改的数据状态被所有线程所感知。

Java 1.5 之前,主要是利用 Object 类中的 waitnotifynotifyAll 配合 synchronized 来进行线程间通信(如果不了解其特性,可以参考:Java 线程基础 - wait/notify/notifyAll)。

waitnotifynotifyAll 需要配合 synchronized 使用,不适用于 Lock。而使用 Lock 的线程,彼此间通信应该使用 Condition 。这可以理解为,什么样的锁配什么样的钥匙。内置锁(synchronized)配合内置条件队列(waitnotifynotifyAll ),显式锁(Lock)配合显式条件队列(Condition

Condition 的特性

Condition 接口定义如下:

1
2
3
4
5
6
7
8
9
public interface Condition {
void await() throws InterruptedException;
void awaitUninterruptibly();
long awaitNanos(long nanosTimeout) throws InterruptedException;
boolean await(long time, TimeUnit unit) throws InterruptedException;
boolean awaitUntil(Date deadline) throws InterruptedException;
void signal();
void signalAll();
}

其中,awaitsignalsignalAllwaitnotifynotifyAll 相对应,功能也相似。除此以外,Condition 相比内置条件队列( waitnotifynotifyAll ),提供了更为丰富的功能:

  • 每个锁(Lock)上可以存在多个 Condition,这意味着锁的状态条件可以有多个。
  • 支持公平的或非公平的队列操作。
  • 支持可中断的条件等待,相关方法:awaitUninterruptibly()
  • 支持可定时的等待,相关方法:awaitNanos(long)await(long, TimeUnit)awaitUntil(Date)

Condition 的用法

这里以 Condition 来实现一个消费者、生产者模式。

:bell: 注意:事实上,解决此类问题使用 CountDownLatchSemaphore 等工具更为便捷、安全。想了解详情,可以参考 Java 并发工具类

产品类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
class Message {

private final Lock lock = new ReentrantLock();

private final Condition producedMsg = lock.newCondition();

private final Condition consumedMsg = lock.newCondition();

private String message;

private boolean state;

private boolean end;

public void consume() {
//lock
lock.lock();
try {
// no new message wait for new message
while (!state) { producedMsg.await(); }

System.out.println("consume message : " + message);
state = false;
// message consumed, notify waiting thread
consumedMsg.signal();
} catch (InterruptedException ie) {
System.out.println("Thread interrupted - viewMessage");
} finally {
lock.unlock();
}
}

public void produce(String message) {
lock.lock();
try {
// last message not consumed, wait for it be consumed
while (state) { consumedMsg.await(); }

System.out.println("produce msg: " + message);
this.message = message;
state = true;
// new message added, notify waiting thread
producedMsg.signal();
} catch (InterruptedException ie) {
System.out.println("Thread interrupted - publishMessage");
} finally {
lock.unlock();
}
}

public boolean isEnd() {
return end;
}

public void setEnd(boolean end) {
this.end = end;
}

}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
class MessageConsumer implements Runnable {

private Message message;

public MessageConsumer(Message msg) {
message = msg;
}

@Override
public void run() {
while (!message.isEnd()) { message.consume(); }
}

}

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
class MessageProducer implements Runnable {

private Message message;

public MessageProducer(Message msg) {
message = msg;
}

@Override
public void run() {
produce();
}

public void produce() {
List<String> msgs = new ArrayList<>();
msgs.add("Begin");
msgs.add("Msg1");
msgs.add("Msg2");

for (String msg : msgs) {
message.produce(msg);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

message.produce("End");
message.setEnd(true);
}

}

测试

1
2
3
4
5
6
7
8
9
10
public class LockConditionDemo {

public static void main(String[] args) {
Message msg = new Message();
Thread producer = new Thread(new MessageProducer(msg));
Thread consumer = new Thread(new MessageConsumer(msg));
producer.start();
consumer.start();
}
}

ReentrantLock

ReentrantLock 类是 Lock 接口的具体实现,与内置锁 synchronized 相同的是,它是一个可重入锁

ReentrantLock 的特性

ReentrantLock 的特性如下:

  • ReentrantLock 提供了与 synchronized 相同的互斥性、内存可见性和可重入性
  • ReentrantLock 支持公平锁和非公平锁(默认)两种模式。
  • ReentrantLock 实现了 Lock 接口,支持了 synchronized 所不具备的灵活性
    • synchronized 无法中断一个正在等待获取锁的线程
    • synchronized 无法在请求获取一个锁时无休止地等待

ReentrantLock 的用法

前文了解了 ReentrantLock 的特性,接下来,我们要讲述其具体用法。

ReentrantLock 的构造方法

ReentrantLock 有两个构造方法:

1
2
public ReentrantLock() {}
public ReentrantLock(boolean fair) {}
  • ReentrantLock() - 默认构造方法会初始化一个非公平锁(NonfairSync)
  • ReentrantLock(boolean) - new ReentrantLock(true) 会初始化一个公平锁(FairSync)

lock 和 unlock 方法

  • lock() - 无条件获取锁。如果当前线程无法获取锁,则当前线程进入休眠状态不可用,直至当前线程获取到锁。如果该锁没有被另一个线程持有,则获取该锁并立即返回,将锁的持有计数设置为 1。
  • unlock() - 用于释放锁

:bell: 注意:请务必牢记,获取锁操作 lock() 必须在 try catch 块中进行,并且将释放锁操作 unlock() 放在 finally 块中进行,以保证锁一定被被释放,防止死锁的发生

示例:ReentrantLock 的基本操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
public class ReentrantLockDemo {

public static void main(String[] args) {
Task task = new Task();
MyThread tA = new MyThread("Thread-A", task);
MyThread tB = new MyThread("Thread-B", task);
MyThread tC = new MyThread("Thread-C", task);
tA.start();
tB.start();
tC.start();
}

static class MyThread extends Thread {

private Task task;

public MyThread(String name, Task task) {
super(name);
this.task = task;
}

@Override
public void run() {
task.execute();
}

}

static class Task {

private ReentrantLock lock = new ReentrantLock();

public void execute() {
lock.lock();
try {
for (int i = 0; i < 3; i++) {
System.out.println(lock.toString());

// 查询当前线程 hold 住此锁的次数
System.out.println("\t holdCount: " + lock.getHoldCount());

// 查询正等待获取此锁的线程数
System.out.println("\t queuedLength: " + lock.getQueueLength());

// 是否为公平锁
System.out.println("\t isFair: " + lock.isFair());

// 是否被锁住
System.out.println("\t isLocked: " + lock.isLocked());

// 是否被当前线程持有锁
System.out.println("\t isHeldByCurrentThread: " + lock.isHeldByCurrentThread());

try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} finally {
lock.unlock();
}
}

}

}

输出结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
java.util.concurrent.locks.ReentrantLock@64fcd88a[Locked by thread Thread-A]
holdCount: 1
queuedLength: 2
isFair: false
isLocked: true
isHeldByCurrentThread: true
java.util.concurrent.locks.ReentrantLock@64fcd88a[Locked by thread Thread-C]
holdCount: 1
queuedLength: 1
isFair: false
isLocked: true
isHeldByCurrentThread: true
// ...

tryLock 方法

与无条件获取锁相比,tryLock 有更完善的容错机制。

  • tryLock() - 可轮询获取锁。如果成功,则返回 true;如果失败,则返回 false。也就是说,这个方法无论成败都会立即返回,获取不到锁(锁已被其他线程获取)时不会一直等待。
  • tryLock(long, TimeUnit) - 可定时获取锁。和 tryLock() 类似,区别仅在于这个方法在获取不到锁时会等待一定的时间,在时间期限之内如果还获取不到锁,就返回 false。如果如果一开始拿到锁或者在等待期间内拿到了锁,则返回 true。

示例:ReentrantLocktryLock() 操作

修改上个示例中的 execute() 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
public void execute() {
if (lock.tryLock()) {
try {
for (int i = 0; i < 3; i++) {
// 略...
}
} finally {
lock.unlock();
}
} else {
System.out.println(Thread.currentThread().getName() + " 获取锁失败");
}
}

示例:ReentrantLocktryLock(long, TimeUnit) 操作

修改上个示例中的 execute() 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public void execute() {
try {
if (lock.tryLock(2, TimeUnit.SECONDS)) {
try {
for (int i = 0; i < 3; i++) {
// 略...
}
} finally {
lock.unlock();
}
} else {
System.out.println(Thread.currentThread().getName() + " 获取锁失败");
}
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + " 获取锁超时");
e.printStackTrace();
}
}

lockInterruptibly 方法

  • lockInterruptibly() - 可中断获取锁。可中断获取锁可以在获得锁的同时保持对中断的响应。可中断获取锁比其它获取锁的方式稍微复杂一些,需要两个 try-catch 块(如果在获取锁的操作中抛出了 InterruptedException ,那么可以使用标准的 try-finally 加锁模式)。
    • 举例来说:假设有两个线程同时通过 lock.lockInterruptibly() 获取某个锁时,若线程 A 获取到了锁,则线程 B 只能等待。若此时对线程 B 调用 threadB.interrupt() 方法能够中断线程 B 的等待过程。由于 lockInterruptibly() 的声明中抛出了异常,所以 lock.lockInterruptibly() 必须放在 try 块中或者在调用 lockInterruptibly() 的方法外声明抛出 InterruptedException

:bell: 注意:当一个线程获取了锁之后,是不会被 interrupt() 方法中断的。单独调用 interrupt() 方法不能中断正在运行状态中的线程,只能中断阻塞状态中的线程。因此当通过 lockInterruptibly() 方法获取某个锁时,如果未获取到锁,只有在等待的状态下,才可以响应中断。

示例:ReentrantLocklockInterruptibly() 操作

修改上个示例中的 execute() 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public void execute() {
try {
lock.lockInterruptibly();

for (int i = 0; i < 3; i++) {
// 略...
}
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + "被中断");
e.printStackTrace();
} finally {
lock.unlock();
}
}

newCondition 方法

newCondition() - 返回一个绑定到 Lock 对象上的 Condition 实例。Condition 的特性和具体方法请阅读下文 Condition

ReentrantLock 的原理

ReentrantLock 的可见性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class X {
private final Lock rtl =
new ReentrantLock();
int value;
public void addOne() {
// 获取锁
rtl.lock();
try {
value+=1;
} finally {
// 保证锁能释放
rtl.unlock();
}
}
}

ReentrantLock,内部持有一个 volatile 的成员变量 state,获取锁的时候,会读写 state 的值;解锁的时候,也会读写 state 的值(简化后的代码如下面所示)。也就是说,在执行 value+=1 之前,程序先读写了一次 volatile 变量 state,在执行 value+=1 之后,又读写了一次 volatile 变量 state。根据相关的 Happens-Before 规则:

  1. 顺序性规则:对于线程 T1,value+=1 Happens-Before 释放锁的操作 unlock();
  2. volatile 变量规则:由于 state = 1 会先读取 state,所以线程 T1 的 unlock() 操作 Happens-Before 线程 T2 的 lock() 操作;
  3. 传递性规则:线程 T1 的 value+=1 Happens-Before 线程 T2 的 lock() 操作。

ReentrantLock 的数据结构

阅读 ReentrantLock 的源码,可以发现它有一个核心字段:

1
private final Sync sync;
  • sync - 内部抽象类 ReentrantLock.Sync 对象,Sync 继承自 AQS。它有两个子类:
  • ReentrantLock.FairSync - 公平锁。
  • ReentrantLock.NonfairSync - 非公平锁。

查看源码可以发现,ReentrantLock 实现 Lock 接口其实是调用 ReentrantLock.FairSyncReentrantLock.NonfairSync 中各自的实现,这里不一一列举。

ReentrantLock 的获取锁和释放锁

ReentrantLock 获取锁和释放锁的接口,从表象看,是调用 ReentrantLock.FairSyncReentrantLock.NonfairSync 中各自的实现;从本质上看,是基于 AQS 的实现。

仔细阅读源码很容易发现:

  • void lock() 调用 Sync 的 lock() 方法。

  • void lockInterruptibly() 直接调用 AQS 的 获取可中断的独占锁 方法 lockInterruptibly()

  • boolean tryLock() 调用 Sync 的 nonfairTryAcquire()

  • boolean tryLock(long time, TimeUnit unit) 直接调用 AQS 的 获取超时等待式的独占锁 方法 tryAcquireNanos(int arg, long nanosTimeout)

  • void unlock() 直接调用 AQS 的 释放独占锁 方法 release(int arg)

直接调用 AQS 接口的方法就不再赘述了,其原理在 [AQS 的原理](#AQS 的原理) 中已经用很大篇幅进行过讲解。

nonfairTryAcquire 方法源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 公平锁和非公平锁都会用这个方法区尝试获取锁
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
// 如果同步状态为0,将其设为 acquires,并设置当前线程为排它线程
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

处理流程很简单:

  • 如果同步状态为 0,设置同步状态设为 acquires,并设置当前线程为排它线程,然后返回 true,获取锁成功。
  • 如果同步状态不为 0 且当前线程为排它线程,设置同步状态为当前状态值+acquires 值,然后返回 true,获取锁成功。
  • 否则,返回 false,获取锁失败。

公平锁和非公平锁

ReentrantLock 这个类有两个构造函数,一个是无参构造函数,一个是传入 fair 参数的构造函数。fair 参数代表的是锁的公平策略,如果传入 true 就表示需要构造一个公平锁,反之则表示要构造一个非公平锁。

锁都对应着一个等待队列,如果一个线程没有获得锁,就会进入等待队列,当有线程释放锁的时候,就需要从等待队列中唤醒一个等待的线程。如果是公平锁,唤醒的策略就是谁等待的时间长,就唤醒谁,很公平;如果是非公平锁,则不提供这个公平保证,有可能等待时间短的线程反而先被唤醒。

lock 方法在公平锁和非公平锁中的实现:

二者的区别仅在于申请非公平锁时,如果同步状态为 0,尝试将其设为 1,如果成功,直接将当前线程置为排它线程;否则和公平锁一样,调用 AQS 获取独占锁方法 acquire

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 非公平锁实现
final void lock() {
if (compareAndSetState(0, 1))
// 如果同步状态为0,将其设为1,并设置当前线程为排它线程
setExclusiveOwnerThread(Thread.currentThread());
else
// 调用 AQS 获取独占锁方法 acquire
acquire(1);
}

// 公平锁实现
final void lock() {
// 调用 AQS 获取独占锁方法 acquire
acquire(1);
}

ReentrantReadWriteLock

ReadWriteLock 适用于读多写少的场景

ReentrantReadWriteLock 类是 ReadWriteLock 接口的具体实现,它是一个可重入的读写锁。ReentrantReadWriteLock 维护了一对读写锁,将读写锁分开,有利于提高并发效率。

读写锁,并不是 Java 语言特有的,而是一个广为使用的通用技术,所有的读写锁都遵守以下三条基本原则:

  • 允许多个线程同时读共享变量;
  • 只允许一个线程写共享变量;
  • 如果一个写线程正在执行写操作,此时禁止读线程读共享变量。

读写锁与互斥锁的一个重要区别就是读写锁允许多个线程同时读共享变量,而互斥锁是不允许的,这是读写锁在读多写少场景下性能优于互斥锁的关键。但读写锁的写操作是互斥的,当一个线程在写共享变量的时候,是不允许其他线程执行写操作和读操作。

ReentrantReadWriteLock 的特性

ReentrantReadWriteLock 的特性如下:

  • ReentrantReadWriteLock 适用于读多写少的场景。如果是写多读少的场景,由于 ReentrantReadWriteLock 其内部实现比 ReentrantLock 复杂,性能可能反而要差一些。如果存在这样的问题,需要具体问题具体分析。由于 ReentrantReadWriteLock 的读写锁(ReadLockWriteLock)都实现了 Lock 接口,所以要替换为 ReentrantLock 也较为容易。
  • ReentrantReadWriteLock 实现了 ReadWriteLock 接口,支持了 ReentrantLock 所不具备的读写锁分离。ReentrantReadWriteLock 维护了一对读写锁(ReadLockWriteLock)。将读写锁分开,有利于提高并发效率。ReentrantReadWriteLock 的加锁策略是:允许多个读操作并发执行,但每次只允许一个写操作
  • ReentrantReadWriteLock 为读写锁都提供了可重入的加锁语义。
  • ReentrantReadWriteLock 支持公平锁和非公平锁(默认)两种模式。

ReadWriteLock 接口定义如下:

1
2
3
4
public interface ReadWriteLock {
Lock readLock();
Lock writeLock();
}
  • readLock - 返回用于读操作的锁(ReadLock)。
  • writeLock - 返回用于写操作的锁(WriteLock)。

在读写锁和写入锁之间的交互可以采用多种实现方式,ReadWriteLock 的一些可选实现包括:

  • 释放优先 - 当一个写入操作释放写锁,并且队列中同时存在读线程和写线程,那么应该优先选择读线程、写线程,还是最先发出请求的线程?
  • 读线程插队 - 如果锁是由读线程持有,但有写线程正在等待,那么新到达的读线程能否立即获得访问权,还是应该在写线程后面等待?如果允许读线程插队到写线程之前,那么将提高并发性,但可能造成线程饥饿问题。
  • 重入性 - 读锁和写锁是否是可重入的?
  • 降级 - 如果一个线程持有写入锁,那么它能否在不释放该锁的情况下获得读锁?这可能会使得写锁被降级为读锁,同时不允许其他写线程修改被保护的资源。
  • 升级 - 读锁能否优先于其他正在等待的读线程和写线程而升级为一个写锁?在大多数的读写锁实现中并不支持升级,因为如果没有显式的升级操作,那么很容易造成死锁。

ReentrantReadWriteLock 的用法

前文了解了 ReentrantReadWriteLock 的特性,接下来,我们要讲述其具体用法。

ReentrantReadWriteLock 的构造方法

ReentrantReadWriteLockReentrantLock 一样,也有两个构造方法,且用法相似。

1
2
public ReentrantReadWriteLock() {}
public ReentrantReadWriteLock(boolean fair) {}
  • ReentrantReadWriteLock() - 默认构造方法会初始化一个非公平锁(NonfairSync)。在非公平的锁中,线程获得锁的顺序是不确定的。写线程降级为读线程是可以的,但读线程升级为写线程是不可以的(这样会导致死锁)。
  • ReentrantReadWriteLock(boolean) - new ReentrantLock(true) 会初始化一个公平锁(FairSync)。对于公平锁,等待时间最长的线程将优先获得锁。如果这个锁是读线程持有,则另一个线程请求写锁,那么其他读线程都不能获得读锁,直到写线程释放写锁。

ReentrantReadWriteLock 的使用实例

ReentrantReadWriteLock 的特性 中已经介绍过,ReentrantReadWriteLock 的读写锁(ReadLockWriteLock)都实现了 Lock 接口,所以其各自独立的使用方式与 ReentrantLock 一样,这里不再赘述。

ReentrantReadWriteLockReentrantLock 用法上的差异,主要在于读写锁的配合使用。本文以一个典型使用场景来进行讲解。

【示例】基于 ReadWriteLock 实现一个简单的泛型无界缓存

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
/**
* 简单的无界缓存实现
* <p>
* 使用 WeakHashMap 存储键值对。WeakHashMap 中存储的对象是弱引用,JVM GC 时会自动清除没有被引用的弱引用对象。
*/
static class UnboundedCache<K, V> {

private final Map<K, V> cacheMap = new WeakHashMap<>();

private final ReadWriteLock cacheLock = new ReentrantReadWriteLock();

public V get(K key) {
cacheLock.readLock().lock();
V value;
try {
value = cacheMap.get(key);
String log = String.format("%s 读数据 %s:%s", Thread.currentThread().getName(), key, value);
System.out.println(log);
} finally {
cacheLock.readLock().unlock();
}
return value;
}

public V put(K key, V value) {
cacheLock.writeLock().lock();
try {
cacheMap.put(key, value);
String log = String.format("%s 写入数据 %s:%s", Thread.currentThread().getName(), key, value);
System.out.println(log);
} finally {
cacheLock.writeLock().unlock();
}
return value;
}

public V remove(K key) {
cacheLock.writeLock().lock();
try {
return cacheMap.remove(key);
} finally {
cacheLock.writeLock().unlock();
}
}

public void clear() {
cacheLock.writeLock().lock();
try {
this.cacheMap.clear();
} finally {
cacheLock.writeLock().unlock();
}
}

}

说明:

  • 使用 WeakHashMap 而不是 HashMap 来存储键值对。WeakHashMap 中存储的对象是弱引用,JVM GC 时会自动清除没有被引用的弱引用对象。
  • Map 写数据前加写锁,写完后,释放写锁。
  • Map 读数据前加读锁,读完后,释放读锁。

测试其线程安全性:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
* @author <a href="mailto:forbreak@163.com">Zhang Peng</a>
* @since 2020-01-01
*/
public class ReentrantReadWriteLockDemo {

static UnboundedCache<Integer, Integer> cache = new UnboundedCache<>();

public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < 20; i++) {
executorService.execute(new MyThread());
cache.get(0);
}
executorService.shutdown();
}

/** 线程任务每次向缓存中写入 3 个随机值,key 固定 */
static class MyThread implements Runnable {

@Override
public void run() {
Random random = new Random();
for (int i = 0; i < 3; i++) {
cache.put(i, random.nextInt(100));
}
}

}

}

说明:示例中,通过线程池启动 20 个并发任务。任务每次向缓存中写入 3 个随机值,key 固定;然后主线程每次固定读取缓存中第一个 key 的值。

输出结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
main 读数据 0:null
pool-1-thread-1 写入数据 0:16
pool-1-thread-1 写入数据 1:58
pool-1-thread-1 写入数据 2:50
main 读数据 0:16
pool-1-thread-1 写入数据 0:85
pool-1-thread-1 写入数据 1:76
pool-1-thread-1 写入数据 2:46
pool-1-thread-2 写入数据 0:21
pool-1-thread-2 写入数据 1:41
pool-1-thread-2 写入数据 2:63
main 读数据 0:21
main 读数据 0:21
// ...

ReentrantReadWriteLock 的原理

前面了解了 ReentrantLock 的原理,理解 ReentrantReadWriteLock 就容易多了。

ReentrantReadWriteLock 的数据结构

阅读 ReentrantReadWriteLock 的源码,可以发现它有三个核心字段:

1
2
3
4
5
6
7
8
9
/** Inner class providing readlock */
private final ReentrantReadWriteLock.ReadLock readerLock;
/** Inner class providing writelock */
private final ReentrantReadWriteLock.WriteLock writerLock;
/** Performs all synchronization mechanics */
final Sync sync;

public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; }
  • sync - 内部类 ReentrantReadWriteLock.Sync 对象。与 ReentrantLock 类似,它有两个子类:ReentrantReadWriteLock.FairSyncReentrantReadWriteLock.NonfairSync ,分别表示公平锁和非公平锁的实现。
  • readerLock - 内部类 ReentrantReadWriteLock.ReadLock 对象,这是一把读锁。
  • writerLock - 内部类 ReentrantReadWriteLock.WriteLock 对象,这是一把写锁。

ReentrantReadWriteLock 的获取锁和释放锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public static class ReadLock implements Lock, java.io.Serializable {

// 调用 AQS 获取共享锁方法
public void lock() {
sync.acquireShared(1);
}

// 调用 AQS 释放共享锁方法
public void unlock() {
sync.releaseShared(1);
}
}

public static class WriteLock implements Lock, java.io.Serializable {

// 调用 AQS 获取独占锁方法
public void lock() {
sync.acquire(1);
}

// 调用 AQS 释放独占锁方法
public void unlock() {
sync.release(1);
}
}

StampedLock

ReadWriteLock 支持两种模式:一种是读锁,一种是写锁。而 StampedLock 支持三种模式,分别是:写锁悲观读锁乐观读。其中,写锁、悲观读锁的语义和 ReadWriteLock 的写锁、读锁的语义非常类似,允许多个线程同时获取悲观读锁,但是只允许一个线程获取写锁,写锁和悲观读锁是互斥的。不同的是:StampedLock 里的写锁和悲观读锁加锁成功之后,都会返回一个 stamp;然后解锁的时候,需要传入这个 stamp。

注意这里,用的是“乐观读”这个词,而不是“乐观读锁”,是要提醒你,乐观读这个操作是无锁的,所以相比较 ReadWriteLock 的读锁,乐观读的性能更好一些。

StampedLock 的性能之所以比 ReadWriteLock 还要好,其关键是 StampedLock 支持乐观读的方式。

  • ReadWriteLock 支持多个线程同时读,但是当多个线程同时读的时候,所有的写操作会被阻塞;
  • 而 StampedLock 提供的乐观读,是允许一个线程获取写锁的,也就是说不是所有的写操作都被阻塞。

对于读多写少的场景 StampedLock 性能很好,简单的应用场景基本上可以替代 ReadWriteLock,但是StampedLock 的功能仅仅是 ReadWriteLock 的子集,在使用的时候,还是有几个地方需要注意一下。

  • StampedLock 不支持重入
  • StampedLock 的悲观读锁、写锁都不支持条件变量。
  • 如果线程阻塞在 StampedLock 的 readLock() 或者 writeLock() 上时,此时调用该阻塞线程的 interrupt() 方法,会导致 CPU 飙升。**使用 StampedLock 一定不要调用中断操作,如果需要支持中断功能,一定使用可中断的悲观读锁 readLockInterruptibly() 和写锁 writeLockInterruptibly()**。

【示例】StampedLock 阻塞时,调用 interrupt() 导致 CPU 飙升

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
final StampedLock lock
= new StampedLock();
Thread T1 = new Thread(()->{
// 获取写锁
lock.writeLock();
// 永远阻塞在此处,不释放写锁
LockSupport.park();
});
T1.start();
// 保证 T1 获取写锁
Thread.sleep(100);
Thread T2 = new Thread(()->
// 阻塞在悲观读锁
lock.readLock()
);
T2.start();
// 保证 T2 阻塞在读锁
Thread.sleep(100);
// 中断线程 T2
// 会导致线程 T2 所在 CPU 飙升
T2.interrupt();
T2.join();

【示例】StampedLock 读模板:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
final StampedLock sl =
new StampedLock();

// 乐观读
long stamp =
sl.tryOptimisticRead();
// 读入方法局部变量
......
// 校验 stamp
if (!sl.validate(stamp)){
// 升级为悲观读锁
stamp = sl.readLock();
try {
// 读入方法局部变量
.....
} finally {
// 释放悲观读锁
sl.unlockRead(stamp);
}
}
// 使用方法局部变量执行业务操作
......

【示例】StampedLock 写模板:

1
2
3
4
5
6
7
long stamp = sl.writeLock();
try {
// 写共享变量
......
} finally {
sl.unlockWrite(stamp);
}

AQS

AbstractQueuedSynchronizer(简称 AQS)是队列同步器,顾名思义,其主要作用是处理同步。它是并发锁和很多同步工具类的实现基石(如 ReentrantLockReentrantReadWriteLockCountDownLatchSemaphoreFutureTask 等)。

AQS 的要点

AQS 提供了对独享锁与共享锁的支持

java.util.concurrent.locks 包中的相关锁(常用的有 ReentrantLockReadWriteLock)都是基于 AQS 来实现。这些锁都没有直接继承 AQS,而是定义了一个 Sync 类去继承 AQS。为什么要这样呢?因为锁面向的是使用用户,而同步器面向的则是线程控制,那么在锁的实现中聚合同步器而不是直接继承 AQS 就可以很好的隔离二者所关注的事情。

AQS 的应用

AQS 提供了对独享锁与共享锁的支持

独享锁 API

获取、释放独享锁的主要 API 如下:

1
2
3
4
public final void acquire(int arg)
public final void acquireInterruptibly(int arg)
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
public final boolean release(int arg)
  • acquire - 获取独占锁。
  • acquireInterruptibly - 获取可中断的独占锁。
  • tryAcquireNanos - 尝试在指定时间内获取可中断的独占锁。在以下三种情况下回返回:
    • 在超时时间内,当前线程成功获取了锁;
    • 当前线程在超时时间内被中断;
    • 超时时间结束,仍未获得锁返回 false。
  • release - 释放独占锁。

共享锁 API

获取、释放共享锁的主要 API 如下:

1
2
3
4
public final void acquireShared(int arg)
public final void acquireSharedInterruptibly(int arg)
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
public final boolean releaseShared(int arg)
  • acquireShared - 获取共享锁。
  • acquireSharedInterruptibly - 获取可中断的共享锁。
  • tryAcquireSharedNanos - 尝试在指定时间内获取可中断的共享锁。
  • release - 释放共享锁。

AQS 的原理

ASQ 原理要点:

  • AQS 使用一个整型的 volatile 变量来 维护同步状态。状态的意义由子类赋予。
  • AQS 维护了一个 FIFO 的双链表,用来存储获取锁失败的线程。

AQS 围绕同步状态提供两种基本操作“获取”和“释放”,并提供一系列判断和处理方法,简单说几点:

  • state 是独占的,还是共享的;
  • state 被获取后,其他线程需要等待;
  • state 被释放后,唤醒等待线程;
  • 线程等不及时,如何退出等待。

至于线程是否可以获得 state,如何释放 state,就不是 AQS 关心的了,要由子类具体实现。

AQS 的数据结构

阅读 AQS 的源码,可以发现:AQS 继承自 AbstractOwnableSynchronize

1
2
3
4
5
6
7
8
9
10
11
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {

/** 等待队列的队头,懒加载。只能通过 setHead 方法修改。 */
private transient volatile Node head;
/** 等待队列的队尾,懒加载。只能通过 enq 方法添加新的等待节点。*/
private transient volatile Node tail;
/** 同步状态 */
private volatile int state;
}
  • state - AQS 使用一个整型的 volatile 变量来 维护同步状态
    • 这个整数状态的意义由子类来赋予,如ReentrantLock 中该状态值表示所有者线程已经重复获取该锁的次数,Semaphore 中该状态值表示剩余的许可数量。
  • headtail - AQS 维护了一个 Node 类型(AQS 的内部类)的双链表来完成同步状态的管理。这个双链表是一个双向的 FIFO 队列,通过 headtail 指针进行访问。当 有线程获取锁失败后,就被添加到队列末尾

img

再来看一下 Node 的源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
static final class Node {
/** 该等待同步的节点处于共享模式 */
static final Node SHARED = new Node();
/** 该等待同步的节点处于独占模式 */
static final Node EXCLUSIVE = null;

/** 线程等待状态,状态值有: 0、1、-1、-2、-3 */
volatile int waitStatus;
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;

/** 前驱节点 */
volatile Node prev;
/** 后继节点 */
volatile Node next;
/** 等待锁的线程 */
volatile Thread thread;

/** 和节点是否共享有关 */
Node nextWaiter;
}

很显然,Node 是一个双链表结构。

  • waitStatus - Node 使用一个整型的 volatile 变量来 维护 AQS 同步队列中线程节点的状态。waitStatus 有五个状态值:
    • CANCELLED(1) - 此状态表示:该节点的线程可能由于超时或被中断而 处于被取消(作废)状态,一旦处于这个状态,表示这个节点应该从等待队列中移除。
    • SIGNAL(-1) - 此状态表示:后继节点会被挂起,因此在当前节点释放锁或被取消之后,必须唤醒(unparking)其后继结点。
    • CONDITION(-2) - 此状态表示:该节点的线程 处于等待条件状态,不会被当作是同步队列上的节点,直到被唤醒(signal),设置其值为 0,再重新进入阻塞状态。
    • PROPAGATE(-3) - 此状态表示:下一个 acquireShared 应无条件传播。
    • 0 - 非以上状态。

独占锁的获取和释放

获取独占锁

AQS 中使用 acquire(int arg) 方法获取独占锁,其大致流程如下:

  1. 先尝试获取同步状态,如果获取同步状态成功,则结束方法,直接返回。
  2. 如果获取同步状态不成功,AQS 会不断尝试利用 CAS 操作将当前线程插入等待同步队列的队尾,直到成功为止。
  3. 接着,不断尝试为等待队列中的线程节点获取独占锁。

img

img

详细流程可以用下图来表示,请结合源码来理解(一图胜千言):

img

释放独占锁

AQS 中使用 release(int arg) 方法释放独占锁,其大致流程如下:

  1. 先尝试获取解锁线程的同步状态,如果获取同步状态不成功,则结束方法,直接返回。
  2. 如果获取同步状态成功,AQS 会尝试唤醒当前线程节点的后继节点。
获取可中断的独占锁

AQS 中使用 acquireInterruptibly(int arg) 方法获取可中断的独占锁。

acquireInterruptibly(int arg) 实现方式相较于获取独占锁方法( acquire)非常相似,区别仅在于它会通过 Thread.interrupted 检测当前线程是否被中断,如果是,则立即抛出中断异常(InterruptedException)。

获取超时等待式的独占锁

AQS 中使用 tryAcquireNanos(int arg) 方法获取超时等待的独占锁。

doAcquireNanos 的实现方式 相较于获取独占锁方法( acquire)非常相似,区别在于它会根据超时时间和当前时间计算出截止时间。在获取锁的流程中,会不断判断是否超时,如果超时,直接返回 false;如果没超时,则用 LockSupport.parkNanos 来阻塞当前线程。

共享锁的获取和释放

获取共享锁

AQS 中使用 acquireShared(int arg) 方法获取共享锁。

acquireShared 方法和 acquire 方法的逻辑很相似,区别仅在于自旋的条件以及节点出队的操作有所不同。

成功获得共享锁的条件如下:

  • tryAcquireShared(arg) 返回值大于等于 0 (这意味着共享锁的 permit 还没有用完)。
  • 当前节点的前驱节点是头结点。
释放共享锁

AQS 中使用 releaseShared(int arg) 方法释放共享锁。

releaseShared 首先会尝试释放同步状态,如果成功,则解锁一个或多个后继线程节点。释放共享锁和释放独享锁流程大体相似,区别在于:

对于独享模式,如果需要 SIGNAL,释放仅相当于调用头节点的 unparkSuccessor

获取可中断的共享锁

AQS 中使用 acquireSharedInterruptibly(int arg) 方法获取可中断的共享锁。

acquireSharedInterruptibly 方法与 acquireInterruptibly 几乎一致,不再赘述。

获取超时等待式的共享锁

AQS 中使用 tryAcquireSharedNanos(int arg) 方法获取超时等待式的共享锁。

tryAcquireSharedNanos 方法与 tryAcquireNanos 几乎一致,不再赘述。

死锁

什么是死锁

死锁是一种特定的程序状态,在实体之间,由于循环依赖导致彼此一直处于等待之中,没有任何个体可以继续前进。死锁不仅仅是在线程之间会发生,存在资源独占的进程之间同样也
可能出现死锁。通常来说,我们大多是聚焦在多线程场景中的死锁,指两个或多个线程之间,由于互相持有对方需要的锁,而永久处于阻塞的状态。

如何定位死锁

定位死锁最常见的方式就是利用 jstack 等工具获取线程栈,然后定位互相之间的依赖关系,进而找到死锁。如果是比较明显的死锁,往往 jstack 等就能直接定位,类似 JConsole 甚至可以在图形界面进行有限的死锁检测。

如果我们是开发自己的管理工具,需要用更加程序化的方式扫描服务进程、定位死锁,可以考虑使用 Java 提供的标准管理 API,ThreadMXBean,其直接就提供了 findDeadlockedThreads() 方法用于定位。

如何避免死锁

基本上死锁的发生是因为:

  • 互斥,类似 Java 中 Monitor 都是独占的。
  • 长期保持互斥,在使用结束之前,不会释放,也不能被其他线程抢占。
  • 循环依赖,多个个体之间出现了锁的循环依赖,彼此依赖上一环释放锁。

由此,我们可以分析出避免死锁的思路和方法。

(1)避免一个线程同时获取多个锁。

避免一个线程在锁内同时占用多个资源,尽量保证每个锁只占用一个资源。

尝试使用定时锁 lock.tryLock(timeout),避免锁一直不能释放。

对于数据库锁,加锁和解锁必须在一个数据库连接中里,否则会出现解锁失败的情况。

参考资料

Java 并发核心机制

Java 对于并发的支持主要汇聚在 java.util.concurrent,即 J.U.C。而 J.U.C 的核心是 AQS

J.U.C 简介

Java 的 java.util.concurrent 包(简称 J.U.C)中提供了大量并发工具类,是 Java 并发能力的主要体现(注意,不是全部,有部分并发能力的支持在其他包中)。从功能上,大致可以分为:

  • 原子类 - 如:AtomicIntegerAtomicIntegerArrayAtomicReferenceAtomicStampedReference 等。
  • 锁 - 如:ReentrantLockReentrantReadWriteLock 等。
  • 并发容器 - 如:ConcurrentHashMapCopyOnWriteArrayListCopyOnWriteArraySet 等。
  • 阻塞队列 - 如:ArrayBlockingQueueLinkedBlockingQueue 等。
  • 非阻塞队列 - 如: ConcurrentLinkedQueueLinkedTransferQueue 等。
  • Executor 框架(线程池)- 如:ThreadPoolExecutorExecutors 等。

我个人理解,Java 并发框架可以分为以下层次。

img

由 Java 并发框架图不难看出,J.U.C 包中的工具类是基于 synchronizedvolatileCASThreadLocal 这样的并发核心机制打造的。所以,要想深入理解 J.U.C 工具类的特性、为什么具有这样那样的特性,就必须先理解这些核心机制。

synchronized

synchronized 是 Java 中的关键字,是 利用锁的机制来实现互斥同步的

synchronized 可以保证在同一个时刻,只有一个线程可以执行某个方法或者某个代码块

如果不需要 LockReadWriteLock 所提供的高级同步特性,应该优先考虑使用 synchronized ,理由如下:

  • Java 1.6 以后,synchronized 做了大量的优化,其性能已经与 LockReadWriteLock 基本上持平。从趋势来看,Java 未来仍将继续优化 synchronized ,而不是 ReentrantLock
  • ReentrantLock 是 Oracle JDK 的 API,在其他版本的 JDK 中不一定支持;而 synchronized 是 JVM 的内置特性,所有 JDK 版本都提供支持。

synchronized 的应用

synchronized 有 3 种应用方式:

  • 同步实例方法 - 对于普通同步方法,锁是当前实例对象
  • 同步静态方法 - 对于静态同步方法,锁是当前类的 Class 对象
  • 同步代码块 - 对于同步方法块,锁是 synchonized 括号里配置的对象

说明:

类似 VectorHashtable 这类同步类,就是使用 synchonized 修饰其重要方法,来保证其线程安全。

事实上,这类同步容器也非绝对的线程安全,当执行迭代器遍历,根据条件删除元素这种场景下,就可能出现线程不安全的情况。此外,Java 1.6 针对 synchonized 进行优化前,由于阻塞,其性能不高。

综上,这类同步容器,在现代 Java 程序中,已经渐渐不用了。

同步实例方法

❌ 错误示例 - 未同步的示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class NoSynchronizedDemo implements Runnable {

public static final int MAX = 100000;

private static int count = 0;

public static void main(String[] args) throws InterruptedException {
NoSynchronizedDemo instance = new NoSynchronizedDemo();
Thread t1 = new Thread(instance);
Thread t2 = new Thread(instance);
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println(count);
}

@Override
public void run() {
for (int i = 0; i < MAX; i++) {
increase();
}
}

public void increase() {
count++;
}

}
// 输出结果: 小于 200000 的随机数字

Java 实例方法同步是同步在拥有该方法的对象上。这样,每个实例其方法同步都同步在不同的对象上,即该方法所属的实例。只有一个线程能够在实例方法同步块中运行。如果有多个实例存在,那么一个线程一次可以在一个实例同步块中执行操作。一个实例一个线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class SynchronizedDemo implements Runnable {

private static final int MAX = 100000;

private static int count = 0;

public static void main(String[] args) throws InterruptedException {
SynchronizedDemo instance = new SynchronizedDemo();
Thread t1 = new Thread(instance);
Thread t2 = new Thread(instance);
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println(count);
}

@Override
public void run() {
for (int i = 0; i < MAX; i++) {
increase();
}
}

/**
* synchronized 修饰普通方法
*/
public synchronized void increase() {
count++;
}

}

【示例】错误示例

1
2
3
4
5
6
7
8
9
10
11
class Account {
private int balance;
// 转账
synchronized void transfer(
Account target, int amt){
if (this.balance > amt) {
this.balance -= amt;
target.balance += amt;
}
}
}

在这段代码中,临界区内有两个资源,分别是转出账户的余额 this.balance 和转入账户的余额 target.balance,并且用的是一把锁 this,符合我们前面提到的,多个资源可以用一把锁来保护,这看上去完全正确呀。真的是这样吗?可惜,这个方案仅仅是看似正确,为什么呢?

问题就出在 this 这把锁上,this 这把锁可以保护自己的余额 this.balance,却保护不了别人的余额 target.balance,就像你不能用自家的锁来保护别人家的资产,也不能用自己的票来保护别人的座位一样。

img

应该保证使用的锁能覆盖所有受保护资源

【示例】正确姿势

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
class Account {
private Object lock;
private int balance;
private Account();
// 创建 Account 时传入同一个 lock 对象
public Account(Object lock) {
this.lock = lock;
}
// 转账
void transfer(Account target, int amt){
// 此处检查所有对象共享的锁
synchronized(lock) {
if (this.balance > amt) {
this.balance -= amt;
target.balance += amt;
}
}
}
}

这个办法确实能解决问题,但是有点小瑕疵,它要求在创建 Account 对象的时候必须传入同一个对象,如果创建 Account 对象时,传入的 lock 不是同一个对象,那可就惨了,会出现锁自家门来保护他家资产的荒唐事。在真实的项目场景中,创建 Account 对象的代码很可能分散在多个工程中,传入共享的 lock 真的很难。

上面的方案缺乏实践的可行性,我们需要更好的方案。还真有,就是用 Account.class 作为共享的锁。Account.class 是所有 Account 对象共享的,而且这个对象是 Java 虚拟机在加载 Account 类的时候创建的,所以我们不用担心它的唯一性。使用 Account.class 作为共享的锁,我们就无需在创建 Account 对象时传入了,代码更简单。

【示例】正确姿势

1
2
3
4
5
6
7
8
9
10
11
12
class Account {
private int balance;
// 转账
void transfer(Account target, int amt){
synchronized(Account.class) {
if (this.balance > amt) {
this.balance -= amt;
target.balance += amt;
}
}
}
}

同步静态方法

静态方法的同步是指同步在该方法所在的类对象上。因为在 JVM 中一个类只能对应一个类对象,所以同时只允许一个线程执行同一个类中的静态同步方法。

对于不同类中的静态同步方法,一个线程可以执行每个类中的静态同步方法而无需等待。不管类中的哪个静态同步方法被调用,一个类只能由一个线程同时执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class SynchronizedDemo2 implements Runnable {

private static final int MAX = 100000;

private static int count = 0;

public static void main(String[] args) throws InterruptedException {
SynchronizedDemo2 instance = new SynchronizedDemo2();
Thread t1 = new Thread(instance);
Thread t2 = new Thread(instance);
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println(count);
}

@Override
public void run() {
for (int i = 0; i < MAX; i++) {
increase();
}
}

/**
* synchronized 修饰静态方法
*/
public synchronized static void increase() {
count++;
}

}

同步代码块

有时你不需要同步整个方法,而是同步方法中的一部分。Java 可以对方法的一部分进行同步。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
@ThreadSafe
public class SynchronizedDemo05 implements Runnable {

private static final int MAX = 100000;

private static int count = 0;

public static void main(String[] args) throws InterruptedException {
SynchronizedDemo05 instance = new SynchronizedDemo05();
Thread t1 = new Thread(instance);
Thread t2 = new Thread(instance);
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println(count);
}

@Override
public void run() {
for (int i = 0; i < MAX; i++) {
increase();
}
}

/**
* synchronized 修饰代码块
*/
public void increase() {
synchronized (this) {
count++;
}
}

}

注意 Java 同步块构造器用括号将对象括起来。在上例中,使用了 this,即为调用 increase 方法的实例本身。用括号括起来的对象叫做监视器对象。一次只有一个线程能够在同步于同一个监视器对象的 Java 方法内执行。

如果是静态方法,就不能用 this 对象作为监视器对象了,而是使用 Class 对象,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class SynchronizedDemo3 implements Runnable {

private static final int MAX = 100000;

private static int count = 0;

public static void main(String[] args) throws InterruptedException {
SynchronizedDemo3 instance = new SynchronizedDemo3();
Thread t1 = new Thread(instance);
Thread t2 = new Thread(instance);
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println(count);
}

@Override
public void run() {
for (int i = 0; i < MAX; i++) {
increase();
}
}

/**
* synchronized 修饰代码块
*/
public static void increase() {
synchronized (SynchronizedDemo3.class) {
count++;
}
}

}

synchronized 的原理

synchronized 代码块是由一对 monitorentermonitorexit 指令实现的,Monitor 对象是同步的基本实现单元。在 Java 6 之前,Monitor 的实现完全是依靠操作系统内部的互斥锁,因为需要进行用户态到内核态的切换,所以同步操作是一个无差别的重量级操作。

如果 synchronized 明确制定了对象参数,那就是这个对象的引用;如果没有明确指定,那就根据 synchronized 修饰的是实例方法还是静态方法,去对对应的对象实例或 Class 对象来作为锁对象。

synchronized 同步块对同一线程来说是可重入的,不会出现锁死问题。

synchronized 同步块是互斥的,即已进入的线程执行完成前,会阻塞其他试图进入的线程。

【示例】

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public void foo(Object lock) {
synchronized (lock) {
lock.hashCode();
}
}
// 上面的 Java 代码将编译为下面的字节码
public void foo(java.lang.Object);
Code:
0: aload_1
1: dup
2: astore_2
3: monitorenter
4: aload_1
5: invokevirtual java/lang/Object.hashCode:()I
8: pop
9: aload_2
10: monitorexit
11: goto 19
14: astore_3
15: aload_2
16: monitorexit
17: aload_3
18: athrow
19: return
Exception table:
from to target type
4 11 14 any
14 17 14 any

同步代码块

synchronized 在修饰同步代码块时,是由 monitorentermonitorexit 指令来实现同步的。进入 monitorenter 指令后,线程将持有 Monitor 对象,退出 monitorenter 指令后,线程将释放该 Monitor 对象。

同步方法

synchronized 修饰同步方法时,会设置一个 ACC_SYNCHRONIZED 标志。当方法调用时,调用指令将会检查该方法是否被设置 ACC_SYNCHRONIZED 访问标志。如果设置了该标志,执行线程将先持有 Monitor 对象,然后再执行方法。在该方法运行期间,其它线程将无法获取到该 Mointor 对象,当方法执行完成后,再释放该 Monitor 对象。

Monitor

每个对象实例都会有一个 MonitorMonitor 可以和对象一起创建、销毁。Monitor 是由 ObjectMonitor 实现,而 ObjectMonitor 是由 C++ 的 ObjectMonitor.hpp 文件实现。

当多个线程同时访问一段同步代码时,多个线程会先被存放在 EntryList 集合中,处于 block 状态的线程,都会被加入到该列表。接下来当线程获取到对象的 Monitor 时,Monitor 是依靠底层操作系统的 Mutex Lock 来实现互斥的,线程申请 Mutex 成功,则持有该 Mutex,其它线程将无法获取到该 Mutex。

如果线程调用 wait() 方法,就会释放当前持有的 Mutex,并且该线程会进入 WaitSet 集合中,等待下一次被唤醒。如果当前线程顺利执行完方法,也将释放 Mutex。

synchronized 的优化

Java 1.6 以后,synchronized 做了大量的优化,其性能已经与 LockReadWriteLock 基本上持平

Java 对象头

在 JDK1.6 JVM 中,对象实例在堆内存中被分为了三个部分:对象头、实例数据和对齐填充。其中 Java 对象头由 Mark Word、指向类的指针以及数组长度三部分组成。

Mark Word 记录了对象和锁有关的信息。Mark Word 在 64 位 JVM 中的长度是 64bit,我们可以一起看下 64 位 JVM 的存储结构是怎么样的。如下图所示:

img

锁升级功能主要依赖于 Mark Word 中的锁标志位和是否偏向锁标志位,synchronized 同步锁就是从偏向锁开始的,随着竞争越来越激烈,偏向锁升级到轻量级锁,最终升级到重量级锁。

Java 1.6 引入了偏向锁和轻量级锁,从而让 synchronized 拥有了四个状态:

  • 无锁状态(unlocked)
  • 偏向锁状态(biasble)
  • 轻量级锁状态(lightweight locked)
  • 重量级锁状态(inflated)

当 JVM 检测到不同的竞争状况时,会自动切换到适合的锁实现。

当没有竞争出现时,默认会使用偏向锁。JVM 会利用 CAS 操作(compare and swap),在对象头上的 Mark Word 部分设置线程 ID,以表示这个对象偏向于当前线程,所以并不涉及真正的互斥锁。这样做的假设是基于在很多应用场景中,大部分对象生命周期中最多会被一个线程锁定,使用偏斜锁可以降低无竞争开销。

如果有另外的线程试图锁定某个已经被偏斜过的对象,JVM 就需要撤销(revoke)偏向锁,并切换到轻量级锁实现。轻量级锁依赖 CAS 操作 Mark Word 来试图获取锁,如果重试成功,就使用普通的轻量级锁;否则,进一步升级为重量级锁。

偏向锁

偏向锁的思想是偏向于第一个获取锁对象的线程,这个线程在之后获取该锁就不再需要进行同步操作,甚至连 CAS 操作也不再需要

img

轻量级锁

轻量级锁是相对于传统的重量级锁而言,它 使用 CAS 操作来避免重量级锁使用互斥量的开销。对于绝大部分的锁,在整个同步周期内都是不存在竞争的,因此也就不需要都使用互斥量进行同步,可以先采用 CAS 操作进行同步,如果 CAS 失败了再改用互斥量进行同步。

当尝试获取一个锁对象时,如果锁对象标记为 0|01,说明锁对象的锁未锁定(unlocked)状态。此时虚拟机在当前线程的虚拟机栈中创建 Lock Record,然后使用 CAS 操作将对象的 Mark Word 更新为 Lock Record 指针。如果 CAS 操作成功了,那么线程就获取了该对象上的锁,并且对象的 Mark Word 的锁标记变为 00,表示该对象处于轻量级锁状态。

img

锁消除 / 锁粗化

除了锁升级优化,Java 还使用了编译器对锁进行优化。

(1)锁消除

锁消除是指对于被检测出不可能存在竞争的共享数据的锁进行消除

JIT 编译器在动态编译同步块的时候,借助了一种被称为逃逸分析的技术,来判断同步块使用的锁对象是否只能够被一个线程访问,而没有被发布到其它线程。

确认是的话,那么 JIT 编译器在编译这个同步块的时候不会生成 synchronized 所表示的锁的申请与释放的机器码,即消除了锁的使用。在 Java7 之后的版本就不需要手动配置了,该操作可以自动实现。

对于一些看起来没有加锁的代码,其实隐式的加了很多锁。例如下面的字符串拼接代码就隐式加了锁:

1
2
3
public static String concatString(String s1, String s2, String s3) {
return s1 + s2 + s3;
}

String 是一个不可变的类,编译器会对 String 的拼接自动优化。在 Java 1.5 之前,会转化为 StringBuffer 对象的连续 append() 操作:

1
2
3
4
5
6
7
public static String concatString(String s1, String s2, String s3) {
StringBuffer sb = new StringBuffer();
sb.append(s1);
sb.append(s2);
sb.append(s3);
return sb.toString();
}

每个 append() 方法中都有一个同步块。虚拟机观察变量 sb,很快就会发现它的动态作用域被限制在 concatString() 方法内部。也就是说,sb 的所有引用永远不会逃逸到 concatString() 方法之外,其他线程无法访问到它,因此可以进行消除。

(2)锁粗化

锁粗化同理,就是在 JIT 编译器动态编译时,如果发现几个相邻的同步块使用的是同一个锁实例,那么 JIT 编译器将会把这几个同步块合并为一个大的同步块,从而避免一个线程“反复申请、释放同一个锁“所带来的性能开销。

如果一系列的连续操作都对同一个对象反复加锁和解锁,频繁的加锁操作就会导致性能损耗。

上一节的示例代码中连续的 append() 方法就属于这类情况。如果虚拟机探测到由这样的一串零碎的操作都对同一个对象加锁,将会把加锁的范围扩展(粗化)到整个操作序列的外部。对于上一节的示例代码就是扩展到第一个 append() 操作之前直至最后一个 append() 操作之后,这样只需要加锁一次就可以了。

自旋锁

互斥同步进入阻塞状态的开销都很大,应该尽量避免。在许多应用中,共享数据的锁定状态只会持续很短的一段时间。自旋锁的思想是让一个线程在请求一个共享数据的锁时执行忙循环(自旋)一段时间,如果在这段时间内能获得锁,就可以避免进入阻塞状态。

自旋锁虽然能避免进入阻塞状态从而减少开销,但是它需要进行忙循环操作占用 CPU 时间,它只适用于共享数据的锁定状态很短的场景。

在 Java 1.6 中引入了自适应的自旋锁。自适应意味着自旋的次数不再固定了,而是由前一次在同一个锁上的自旋次数及锁的拥有者的状态来决定。

synchronized 的误区

示例摘自:《Java 业务开发常见错误 100 例》

synchronized 使用范围不当导致的错误

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class Interesting {

volatile int a = 1;
volatile int b = 1;

public static void main(String[] args) {
Interesting interesting = new Interesting();
new Thread(() -> interesting.add()).start();
new Thread(() -> interesting.compare()).start();
}

public synchronized void add() {
log.info("add start");
for (int i = 0; i < 10000; i++) {
a++;
b++;
}
log.info("add done");
}

public void compare() {
log.info("compare start");
for (int i = 0; i < 10000; i++) {
//a始终等于b吗?
if (a < b) {
log.info("a:{},b:{},{}", a, b, a > b);
//最后的a>b应该始终是false吗?
}
}
log.info("compare done");
}

}

【输出】

1
2
3
4
16:05:25.541 [Thread-0] INFO io.github.dunwu.javacore.concurrent.sync.synchronized使用范围不当 - add start
16:05:25.544 [Thread-0] INFO io.github.dunwu.javacore.concurrent.sync.synchronized使用范围不当 - add done
16:05:25.544 [Thread-1] INFO io.github.dunwu.javacore.concurrent.sync.synchronized使用范围不当 - compare start
16:05:25.544 [Thread-1] INFO io.github.dunwu.javacore.concurrent.sync.synchronized使用范围不当 - compare done

之所以出现这种错乱,是因为两个线程是交错执行 add 和 compare 方法中的业务逻辑,而且这些业务逻辑不是原子性的:a++ 和 b++ 操作中可以穿插在 compare 方法的比较代码中;更需要注意的是,a<b 这种比较操作在字节码层面是加载 a、加载 b 和比较三步,代码虽然是一行但也不是原子性的。

所以,正确的做法应该是,为 add 和 compare 都加上方法锁,确保 add 方法执行时,compare 无法读取 a 和 b:

1
2
public synchronized void add()
public synchronized void compare()

所以,使用锁解决问题之前一定要理清楚,我们要保护的是什么逻辑,多线程执行的情况又是怎样的。

synchronized 保护对象不对导致的错误

加锁前要清楚锁和被保护的对象是不是一个层面的。

静态字段属于类,类级别的锁才能保护;而非静态字段属于类实例,实例级别的锁就可以保护。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public class synchronized错误使用示例2 {

public static void main(String[] args) {
synchronized错误使用示例2 demo = new synchronized错误使用示例2();
System.out.println(demo.wrong(1000000));
System.out.println(demo.right(1000000));
}

public int wrong(int count) {
Data.reset();
IntStream.rangeClosed(1, count).parallel().forEach(i -> new Data().wrong());
return Data.getCounter();
}

public int right(int count) {
Data.reset();
IntStream.rangeClosed(1, count).parallel().forEach(i -> new Data().right());
return Data.getCounter();
}

private static class Data {

@Getter
private static int counter = 0;
private static Object locker = new Object();

public static int reset() {
counter = 0;
return counter;
}

public synchronized void wrong() {
counter++;
}

public void right() {
synchronized (locker) {
counter++;
}
}

}

}

wrong 方法中试图对一个静态对象加对象级别的 synchronized 锁,并不能保证线程安全。

锁粒度导致的问题

要尽可能的缩小加锁的范围,这可以提高并发吞吐。

如果精细化考虑了锁应用范围后,性能还无法满足需求的话,我们就要考虑另一个维度的粒度问题了,即:区分读写场景以及资源的访问冲突,考虑使用悲观方式的锁还是乐观方式的锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public class synchronized锁粒度不当 {

public static void main(String[] args) {
Demo demo = new Demo();
demo.wrong();
demo.right();
}

private static class Demo {

private List<Integer> data = new ArrayList<>();

private void slow() {
try {
TimeUnit.MILLISECONDS.sleep(10);
} catch (InterruptedException e) {
}
}

public int wrong() {
long begin = System.currentTimeMillis();
IntStream.rangeClosed(1, 1000).parallel().forEach(i -> {
synchronized (this) {
slow();
data.add(i);
}
});
log.info("took:{}", System.currentTimeMillis() - begin);
return data.size();
}

public int right() {
long begin = System.currentTimeMillis();
IntStream.rangeClosed(1, 1000).parallel().forEach(i -> {
slow();
synchronized (data) {
data.add(i);
}
});
log.info("took:{}", System.currentTimeMillis() - begin);
return data.size();
}

}

}

volatile

volatile 的要点

volatile 是轻量级的 synchronized,它在多处理器开发中保证了共享变量的“可见性”。

volatile 修饰的变量,具备以下特性:

  • 线程可见性 - 保证了不同线程对这个变量进行操作时的可见性,即一个线程修改了某个共享变量,另外一个线程能读到这个修改的值。
  • 禁止指令重排序
  • 不保证原子性

我们知道,线程安全需要具备:可见性、原子性、顺序性。volatile 不保证原子性,所以决定了它不能彻底地保证线程安全。

volatile 的应用

如果 volatile 变量修饰符使用恰当的话,它比 synchronized 的使用和执行成本更低,因为它不会引起线程上下文的切换和调度。但是,**volatile 无法替代 synchronized ,因为 volatile 无法保证操作的原子性**。

通常来说,使用 volatile 必须具备以下 2 个条件

  • 对变量的写操作不依赖于当前值
  • 该变量没有包含在具有其他变量的表达式中

【示例】状态标记量

1
2
3
4
5
6
7
8
9
volatile boolean flag = false;

while(!flag) {
doSomething();
}

public void setFlag() {
flag = true;
}

【示例】双重锁实现线程安全的单例模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class Singleton {
private volatile static Singleton instance = null;

private Singleton() {}

public static Singleton getInstance() {
if(instance==null) {
synchronized (Singleton.class) {
if(instance==null)
instance = new Singleton();
}
}
return instance;
}
}

volatile 的原理

观察加入 volatile 关键字和没有加入 volatile 关键字时所生成的汇编代码发现,加入 volatile 关键字时,会多出一个 lock 前缀指令。**lock 前缀指令实际上相当于一个内存屏障**(也成内存栅栏),内存屏障会提供 3 个功能:

  • 它确保指令重排序时不会把其后面的指令排到内存屏障之前的位置,也不会把前面的指令排到内存屏障的后面;即在执行到内存屏障这句指令时,在它前面的操作已经全部完成;
  • 它会强制将对缓存的修改操作立即写入主存;
  • 如果是写操作,它会导致其他 CPU 中对应的缓存行无效。

volatile 的问题

volatile 的要点中,已经提到,**volatile 不保证原子性,所以 volatile 并不能保证线程安全**。

那么,如何做到线程安全呢?有两种方案:

  • volatile + synchronized - 可以参考:【示例】双重锁实现线程安全的单例模式
  • 使用原子类替代 volatile

CAS

CAS 的要点

互斥同步是最常见的并发正确性保障手段。

互斥同步最主要的问题是线程阻塞和唤醒所带来的性能问题,因此互斥同步也被称为阻塞同步。互斥同步属于一种悲观的并发策略,总是认为只要不去做正确的同步措施,那就肯定会出现问题。无论共享数据是否真的会出现竞争,它都要进行加锁(这里讨论的是概念模型,实际上虚拟机会优化掉很大一部分不必要的加锁)、用户态核心态转换、维护锁计数器和检查是否有被阻塞的线程需要唤醒等操作。

随着硬件指令集的发展,我们可以使用基于冲突检测的乐观并发策略:先进行操作,如果没有其它线程争用共享数据,那操作就成功了,否则采取补偿措施(不断地重试,直到成功为止)。这种乐观的并发策略的许多实现都不需要将线程阻塞,因此这种同步操作称为非阻塞同步。

为什么说乐观锁需要 硬件指令集的发展 才能进行?因为需要操作和冲突检测这两个步骤具备原子性。而这点是由硬件来完成,如果再使用互斥同步来保证就失去意义了。硬件支持的原子性操作最典型的是:CAS。

CAS(Compare and Swap),字面意思为比较并交换。CAS 有 3 个操作数,分别是:内存值 M,期望值 E,更新值 U。当且仅当内存值 M 和期望值 E 相等时,将内存值 M 修改为 U,否则什么都不做

CAS 的应用

CAS 只适用于线程冲突较少的情况

CAS 的典型应用场景是:

  • 原子类
  • 自旋锁

原子类

原子类是 CAS 在 Java 中最典型的应用。

我们先来看一个常见的代码片段。

1
2
3
if(a==b) {
a++;
}

如果 a++ 执行前, a 的值被修改了怎么办?还能得到预期值吗?出现该问题的原因是在并发环境下,以上代码片段不是原子操作,随时可能被其他线程所篡改。

解决这种问题的最经典方式是应用原子类的 incrementAndGet 方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class AtomicIntegerDemo {

public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(3);
final AtomicInteger count = new AtomicInteger(0);
for (int i = 0; i < 10; i++) {
executorService.execute(new Runnable() {
@Override
public void run() {
count.incrementAndGet();
}
});
}

executorService.shutdown();
executorService.awaitTermination(3, TimeUnit.SECONDS);
System.out.println("Final Count is : " + count.get());
}

}

J.U.C 包中提供了 AtomicBooleanAtomicIntegerAtomicLong 分别针对 BooleanIntegerLong 执行原子操作,操作和上面的示例大体相似,不做赘述。

自旋锁

利用原子类(本质上是 CAS),可以实现自旋锁。

所谓自旋锁,是指线程反复检查锁变量是否可用,直到成功为止。由于线程在这一过程中保持执行,因此是一种忙等待。一旦获取了自旋锁,线程会一直保持该锁,直至显式释放自旋锁。

示例:非线程安全示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class AtomicReferenceDemo {

private static int ticket = 10;

public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(3);
for (int i = 0; i < 5; i++) {
executorService.execute(new MyThread());
}
executorService.shutdown();
}

static class MyThread implements Runnable {

@Override
public void run() {
while (ticket > 0) {
System.out.println(Thread.currentThread().getName() + " 卖出了第 " + ticket + " 张票");
ticket--;
}
}

}

}

输出结果:

1
2
3
4
5
6
7
8
9
10
11
12
pool-1-thread-2 卖出了第 10 张票
pool-1-thread-1 卖出了第 10 张票
pool-1-thread-3 卖出了第 10 张票
pool-1-thread-1 卖出了第 8 张票
pool-1-thread-2 卖出了第 9 张票
pool-1-thread-1 卖出了第 6 张票
pool-1-thread-3 卖出了第 7 张票
pool-1-thread-1 卖出了第 4 张票
pool-1-thread-2 卖出了第 5 张票
pool-1-thread-1 卖出了第 2 张票
pool-1-thread-3 卖出了第 3 张票
pool-1-thread-2 卖出了第 1 张票

很明显,出现了重复售票的情况。

【示例】使用自旋锁来保证线程安全

可以通过自旋锁这种非阻塞同步来保证线程安全,下面使用 AtomicReference 来实现一个自旋锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
public class AtomicReferenceDemo2 {

private static int ticket = 10;

public static void main(String[] args) {
threadSafeDemo();
}

private static void threadSafeDemo() {
SpinLock lock = new SpinLock();
ExecutorService executorService = Executors.newFixedThreadPool(3);
for (int i = 0; i < 5; i++) {
executorService.execute(new MyThread(lock));
}
executorService.shutdown();
}

static class SpinLock {

private AtomicReference<Thread> atomicReference = new AtomicReference<>();

public void lock() {
Thread current = Thread.currentThread();
while (!atomicReference.compareAndSet(null, current)) {}
}

public void unlock() {
Thread current = Thread.currentThread();
atomicReference.compareAndSet(current, null);
}

}

static class MyThread implements Runnable {

private SpinLock lock;

public MyThread(SpinLock lock) {
this.lock = lock;
}

@Override
public void run() {
while (ticket > 0) {
lock.lock();
if (ticket > 0) {
System.out.println(Thread.currentThread().getName() + " 卖出了第 " + ticket + " 张票");
ticket--;
}
lock.unlock();
}
}

}

}

输出结果:

1
2
3
4
5
6
7
8
9
10
pool-1-thread-2 卖出了第 10 张票
pool-1-thread-1 卖出了第 9 张票
pool-1-thread-3 卖出了第 8 张票
pool-1-thread-2 卖出了第 7 张票
pool-1-thread-3 卖出了第 6 张票
pool-1-thread-1 卖出了第 5 张票
pool-1-thread-2 卖出了第 4 张票
pool-1-thread-1 卖出了第 3 张票
pool-1-thread-3 卖出了第 2 张票
pool-1-thread-1 卖出了第 1 张票

CAS 的原理

Java 主要利用 Unsafe 这个类提供的 CAS 操作。Unsafe 的 CAS 依赖的是 JVM 针对不同的操作系统实现的硬件指令 **Atomic::cmpxchg**。Atomic::cmpxchg 的实现使用了汇编的 CAS 操作,并使用 CPU 提供的 lock 信号保证其原子性。

CAS 的问题

一般情况下,CAS 比锁性能更高。因为 CAS 是一种非阻塞算法,所以其避免了线程阻塞和唤醒的等待时间。

但是,事物总会有利有弊,CAS 也存在三大问题:

  • ABA 问题
  • 循环时间长开销大
  • 只能保证一个共享变量的原子性

ABA 问题

如果一个变量初次读取的时候是 A 值,它的值被改成了 B,后来又被改回为 A,那 CAS 操作就会误认为它从来没有被改变过

J.U.C 包提供了一个带有标记的原子引用类 AtomicStampedReference 来解决这个问题,它可以通过控制变量值的版本来保证 CAS 的正确性。大部分情况下 ABA 问题不会影响程序并发的正确性,如果需要解决 ABA 问题,改用传统的互斥同步可能会比原子类更高效

循环时间长开销大

自旋 CAS (不断尝试,直到成功为止)如果长时间不成功,会给 CPU 带来非常大的执行开销

如果 JVM 能支持处理器提供的 pause 指令那么效率会有一定的提升,pause 指令有两个作用:

  • 它可以延迟流水线执行指令(de-pipeline),使 CPU 不会消耗过多的执行资源,延迟的时间取决于具体实现的版本,在一些处理器上延迟时间是零。
  • 它可以避免在退出循环的时候因内存顺序冲突(memory order violation)而引起 CPU 流水线被清空(CPU pipeline flush),从而提高 CPU 的执行效率。

比较花费 CPU 资源,即使没有任何用也会做一些无用功。

只能保证一个共享变量的原子性

当对一个共享变量执行操作时,我们可以使用循环 CAS 的方式来保证原子操作,但是对多个共享变量操作时,循环 CAS 就无法保证操作的原子性,这个时候就可以用锁。

或者有一个取巧的办法,就是把多个共享变量合并成一个共享变量来操作。比如有两个共享变量 i = 2, j = a,合并一下 ij=2a,然后用 CAS 来操作 ij。从 Java 1.5 开始 JDK 提供了 AtomicReference 类来保证引用对象之间的原子性,你可以把多个变量放在一个对象里来进行 CAS 操作。

ThreadLocal

ThreadLocal 是一个存储线程本地副本的工具类

要保证线程安全,不一定非要进行同步。同步只是保证共享数据争用时的正确性,如果一个方法本来就不涉及共享数据,那么自然无须同步。

Java 中的 无同步方案 有:

  • 可重入代码 - 也叫纯代码。如果一个方法,它的 返回结果是可以预测的,即只要输入了相同的数据,就能返回相同的结果,那它就满足可重入性,当然也是线程安全的。
  • 线程本地存储 - 使用 ThreadLocal 为共享变量在每个线程中都创建了一个本地副本,这个副本只能被当前线程访问,其他线程无法访问,那么自然是线程安全的。

ThreadLocal 的应用

ThreadLocal 的方法:

1
2
3
4
5
6
public class ThreadLocal<T> {
public T get() {}
public void set(T value) {}
public void remove() {}
public static <S> ThreadLocal<S> withInitial(Supplier<? extends S> supplier) {}
}

说明:

  • get - 用于获取 ThreadLocal 在当前线程中保存的变量副本。
  • set - 用于设置当前线程中变量的副本。
  • remove - 用于删除当前线程中变量的副本。如果此线程局部变量随后被当前线程读取,则其值将通过调用其 initialValue 方法重新初始化,除非其值由中间线程中的当前线程设置。 这可能会导致当前线程中多次调用 initialValue 方法。
  • initialValue - 为 ThreadLocal 设置默认的 get 初始值,需要重写 initialValue 方法 。

ThreadLocal 常用于防止对可变的单例(Singleton)变量或全局变量进行共享。典型应用场景有:管理数据库连接、Session。

【示例】数据库连接

1
2
3
4
5
6
7
8
9
10
private static ThreadLocal<Connection> connectionHolder = new ThreadLocal<Connection>() {
@Override
public Connection initialValue() {
return DriverManager.getConnection(DB_URL);
}
};

public static Connection getConnection() {
return connectionHolder.get();
}

【示例】Session 管理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private static final ThreadLocal<Session> sessionHolder = new ThreadLocal<>();

public static Session getSession() {
Session session = (Session) sessionHolder.get();
try {
if (session == null) {
session = createSession();
sessionHolder.set(session);
}
} catch (Exception e) {
e.printStackTrace();
}
return session;
}

【示例】完整使用 ThreadLocal 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class ThreadLocalDemo {

private static ThreadLocal<Integer> threadLocal = new ThreadLocal<Integer>() {
@Override
protected Integer initialValue() {
return 0;
}
};

public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(10);
for (int i = 0; i < 10; i++) {
executorService.execute(new MyThread());
}
executorService.shutdown();
}

static class MyThread implements Runnable {

@Override
public void run() {
int count = threadLocal.get();
for (int i = 0; i < 10; i++) {
try {
count++;
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
threadLocal.set(count);
threadLocal.remove();
System.out.println(Thread.currentThread().getName() + " : " + count);
}

}

}

全部输出 count = 10

ThreadLocal 的原理

存储结构

Thread 类中维护着一个 ThreadLocal.ThreadLocalMap 类型的成员 threadLocals。这个成员就是用来存储当前线程独占的变量副本。

ThreadLocalMapThreadLocal 的内部类,它维护着一个 Entry 数组,**Entry 继承了 WeakReference** ,所以是弱引用。 Entry 用于保存键值对,其中:

  • keyThreadLocal 对象;
  • value 是传递进来的对象(变量副本)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class Thread implements Runnable {
// ...
ThreadLocal.ThreadLocalMap threadLocals = null;
// ...
}

static class ThreadLocalMap {
// ...
static class Entry extends WeakReference<ThreadLocal<?>> {
/** The value associated with this ThreadLocal. */
Object value;

Entry(ThreadLocal<?> k, Object v) {
super(k);
value = v;
}
}
// ...
}

如何解决 Hash 冲突

ThreadLocalMap 虽然是类似 Map 结构的数据结构,但它并没有实现 Map 接口。它不支持 Map 接口中的 next 方法,这意味着 ThreadLocalMap 中解决 Hash 冲突的方式并非 拉链表 方式。

实际上,**ThreadLocalMap 采用线性探测的方式来解决 Hash 冲突**。所谓线性探测,就是根据初始 key 的 hashcode 值确定元素在 table 数组中的位置,如果发现这个位置上已经被其他的 key 值占用,则利用固定的算法寻找一定步长的下个位置,依次判断,直至找到能够存放的位置。

内存泄漏问题

ThreadLocalMapEntry 继承了 WeakReference,所以它的 key (ThreadLocal 对象)是弱引用,而 value (变量副本)是强引用

  • 如果 ThreadLocal 对象没有外部强引用来引用它,那么 ThreadLocal 对象会在下次 GC 时被回收。
  • 此时,Entry 中的 key 已经被回收,但是 value 由于是强引用不会被垃圾收集器回收。如果创建 ThreadLocal 的线程一直持续运行,那么 value 就会一直得不到回收,产生内存泄露

那么如何避免内存泄漏呢?方法就是:使用 ThreadLocalset 方法后,显示的调用 remove 方法

1
2
3
4
5
6
7
ThreadLocal<String> threadLocal = new ThreadLocal();
try {
threadLocal.set("xxx");
// ...
} finally {
threadLocal.remove();
}

ThreadLocal 的误区

示例摘自:《Java 业务开发常见错误 100 例》

ThreadLocal 适用于变量在线程间隔离,而在方法或类间共享的场景。

前文提到,ThreadLocal 是线程隔离的,那么是不是使用 ThreadLocal 就一定高枕无忧呢?

ThreadLocal 错误案例

使用 Spring Boot 创建一个 Web 应用程序,使用 ThreadLocal 存放一个 Integer 的值,来暂且代表需要在线程中保存的用户信息,这个值初始是 null。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private ThreadLocal<Integer> currentUser = ThreadLocal.withInitial(() -> null);

@GetMapping("wrong")
public Map<String, String> wrong(@RequestParam("id") Integer userId) {
//设置用户信息之前先查询一次ThreadLocal中的用户信息
String before = Thread.currentThread().getName() + ":" + currentUser.get();
//设置用户信息到ThreadLocal
currentUser.set(userId);
//设置用户信息之后再查询一次ThreadLocal中的用户信息
String after = Thread.currentThread().getName() + ":" + currentUser.get();
//汇总输出两次查询结果
Map<String, String> result = new HashMap<>();
result.put("before", before);
result.put("after", after);
return result;
}

【预期】从代码逻辑来看,我们预期第一次获取的值始终应该是 null。

【实际】

为了方便复现,将 Tomcat 工作线程设为 1:

1
server.tomcat.max-threads=1

当访问 id = 1 时,符合预期

img

当访问 id = 2 时,before 的应答不是 null,而是 1,不符合预期。

【分析】实际情况和预期存在偏差。Spring Boot 程序运行在 Tomcat 中,执行程序的线程是 Tomcat 的工作线程,而 Tomcat 的工作线程是基于线程池的。线程池会重用固定的几个线程,一旦线程重用,那么很可能首次从
ThreadLocal 获取的值是之前其他用户的请求遗留的值。这时,ThreadLocal 中的用户信息就是其他用户的信息

并不能认为没有显式开启多线程就不会有线程安全问题。使用类似 ThreadLocal 工具来存放一些数据时,需要特别注意在代码运行完后,显式地去清空设置的数据。

ThreadLocal 错误案例修正

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@GetMapping("right")
public Map<String, String> right(@RequestParam("id") Integer userId) {
String before = Thread.currentThread().getName() + ":" + currentUser.get();
currentUser.set(userId);
try {
String after = Thread.currentThread().getName() + ":" + currentUser.get();
Map<String, String> result = new HashMap<>();
result.put("before", before);
result.put("after", after);
return result;
} finally {
//在finally代码块中删除ThreadLocal中的数据,确保数据不串
currentUser.remove();
}
}

InheritableThreadLocal

InheritableThreadLocal 类是 ThreadLocal 类的子类。

ThreadLocal 中每个线程拥有它自己独占的数据。与 ThreadLocal 不同的是,InheritableThreadLocal 允许一个线程以及该线程创建的所有子线程都可以访问它保存的数据。

原理参考:Java 多线程:InheritableThreadLocal 实现原理

参考资料

Java 线程基础

关键词:ThreadRunnableCallableFuturewaitnotifynotifyAlljoinsleepyeild线程状态线程通信

线程简介

什么是进程

简言之,进程可视为一个正在运行的程序。它是系统运行程序的基本单位,因此进程是动态的。进程是具有一定独立功能的程序关于某个数据集合上的一次运行活动。进程是操作系统进行资源分配的基本单位。

什么是线程

线程是操作系统进行调度的基本单位。线程也叫轻量级进程(Light Weight Process),在一个进程里可以创建多个线程,这些线程都拥有各自的计数器、堆栈和局部变量等属性,并且能够访问共享的内存变量。

进程和线程的区别

  • 一个程序至少有一个进程,一个进程至少有一个线程。
  • 线程比进程划分更细,所以执行开销更小,并发性更高。
  • 进程是一个实体,拥有独立的资源;而同一个进程中的多个线程共享进程的资源。

创建线程

创建线程有三种方式:

  • 继承 Thread
  • 实现 Runnable 接口
  • 实现 Callable 接口

Thread

通过继承 Thread 类创建线程的步骤:

  1. 定义 Thread 类的子类,并覆写该类的 run 方法。run 方法的方法体就代表了线程要完成的任务,因此把 run 方法称为执行体。
  2. 创建 Thread 子类的实例,即创建了线程对象。
  3. 调用线程对象的 start 方法来启动该线程。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class ThreadDemo {

public static void main(String[] args) {
// 实例化对象
MyThread tA = new MyThread("Thread 线程-A");
MyThread tB = new MyThread("Thread 线程-B");
// 调用线程主体
tA.start();
tB.start();
}

static class MyThread extends Thread {

private int ticket = 5;

MyThread(String name) {
super(name);
}

@Override
public void run() {
while (ticket > 0) {
System.out.println(Thread.currentThread().getName() + " 卖出了第 " + ticket + " 张票");
ticket--;
}
}

}

}

Runnable

实现 Runnable 接口优于继承 Thread,因为:

  • Java 不支持多重继承,所有的类都只允许继承一个父类,但可以实现多个接口。如果继承了 Thread 类就无法继承其它类,这不利于扩展。
  • 类可能只要求可执行就行,继承整个 Thread 类开销过大。

通过实现 Runnable 接口创建线程的步骤:

  1. 定义 Runnable 接口的实现类,并覆写该接口的 run 方法。该 run 方法的方法体同样是该线程的线程执行体。
  2. 创建 Runnable 实现类的实例,并以此实例作为 Thread 的 target 来创建 Thread 对象,该 Thread 对象才是真正的线程对象。
  3. 调用线程对象的 start 方法来启动该线程。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class RunnableDemo {

public static void main(String[] args) {
// 实例化对象
Thread tA = new Thread(new MyThread(), "Runnable 线程-A");
Thread tB = new Thread(new MyThread(), "Runnable 线程-B");
// 调用线程主体
tA.start();
tB.start();
}

static class MyThread implements Runnable {

private int ticket = 5;

@Override
public void run() {
while (ticket > 0) {
System.out.println(Thread.currentThread().getName() + " 卖出了第 " + ticket + " 张票");
ticket--;
}
}

}

}

Callable、Future、FutureTask

继承 Thread 类和实现 Runnable 接口这两种创建线程的方式都没有返回值。所以,线程执行完后,无法得到执行结果。但如果期望得到执行结果该怎么做?

为了解决这个问题,Java 1.5 后,提供了 Callable 接口和 Future 接口,通过它们,可以在线程执行结束后,返回执行结果。

Callable

Callable 接口只声明了一个方法,这个方法叫做 call():

1
2
3
4
5
6
7
8
9
public interface Callable<V> {
/**
* Computes a result, or throws an exception if unable to do so.
*
* @return computed result
* @throws Exception if unable to compute a result
*/
V call() throws Exception;
}

那么怎么使用 Callable 呢?一般情况下是配合 ExecutorService 来使用的,在 ExecutorService 接口中声明了若干个 submit 方法的重载版本:

1
2
3
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);

第一个 submit 方法里面的参数类型就是 Callable。

Future

Future 就是对于具体的 Callable 任务的执行结果进行取消、查询是否完成、获取结果。必要时可以通过 get 方法获取执行结果,该方法会阻塞直到任务返回结果。

1
2
3
4
5
6
7
8
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}

FutureTask

FutureTask 类实现了 RunnableFuture 接口,RunnableFuture 继承了 Runnable 接口和 Future 接口。

所以,FutureTask 既可以作为 Runnable 被线程执行,又可以作为 Future 得到 Callable 的返回值。

1
2
3
4
5
6
7
8
9
public class FutureTask<V> implements RunnableFuture<V> {
// ...
public FutureTask(Callable<V> callable) {}
public FutureTask(Runnable runnable, V result) {}
}

public interface RunnableFuture<V> extends Runnable, Future<V> {
void run();
}

事实上,FutureTask 是 Future 接口的一个唯一实现类。

Callable + Future + FutureTask 示例

通过实现 Callable 接口创建线程的步骤:

  1. 创建 Callable 接口的实现类,并实现 call 方法。该 call 方法将作为线程执行体,并且有返回值。
  2. 创建 Callable 实现类的实例,使用 FutureTask 类来包装 Callable 对象,该 FutureTask 对象封装了该 Callable 对象的 call 方法的返回值。
  3. 使用 FutureTask 对象作为 Thread 对象的 target 创建并启动新线程。
  4. 调用 FutureTask 对象的 get 方法来获得线程执行结束后的返回值。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class CallableDemo {

public static void main(String[] args) {
Callable<Long> callable = new MyThread();
FutureTask<Long> future = new FutureTask<>(callable);
new Thread(future, "Callable 线程").start();
try {
System.out.println("任务耗时:" + (future.get() / 1000000) + "毫秒");
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}

static class MyThread implements Callable<Long> {

private int ticket = 10000;

@Override
public Long call() {
long begin = System.nanoTime();
while (ticket > 0) {
System.out.println(Thread.currentThread().getName() + " 卖出了第 " + ticket + " 张票");
ticket--;
}

long end = System.nanoTime();
return (end - begin);
}

}

}

线程基本用法

线程(Thread)基本方法清单:

方法 描述
run 线程的执行实体。
start 线程的启动方法。
currentThread 返回对当前正在执行的线程对象的引用。
setName 设置线程名称。
getName 获取线程名称。
setPriority 设置线程优先级。Java 中的线程优先级的范围是 [1,10],一般来说,高优先级的线程在运行时会具有优先权。可以通过 thread.setPriority(Thread.MAX_PRIORITY) 的方式设置,默认优先级为 5。
getPriority 获取线程优先级。
setDaemon 设置线程为守护线程。
isDaemon 判断线程是否为守护线程。
isAlive 判断线程是否启动。
interrupt 中断另一个线程的运行状态。
interrupted 测试当前线程是否已被中断。通过此方法可以清除线程的中断状态。换句话说,如果要连续调用此方法两次,则第二次调用将返回 false(除非当前线程在第一次调用清除其中断状态之后且在第二次调用检查其状态之前再次中断)。
join 可以使一个线程强制运行,线程强制运行期间,其他线程无法运行,必须等待此线程完成之后才可以继续执行。
Thread.sleep 静态方法。将当前正在执行的线程休眠。
Thread.yield 静态方法。将当前正在执行的线程暂停,让其他线程执行。

线程休眠

使用 Thread.sleep 方法可以使得当前正在执行的线程进入休眠状态。

使用 Thread.sleep 需要向其传入一个整数值,这个值表示线程将要休眠的毫秒数。

Thread.sleep 方法可能会抛出 InterruptedException,因为异常不能跨线程传播回 main 中,因此必须在本地进行处理。线程中抛出的其它异常也同样需要在本地进行处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class ThreadSleepDemo {

public static void main(String[] args) {
new Thread(new MyThread("线程A", 500)).start();
new Thread(new MyThread("线程B", 1000)).start();
new Thread(new MyThread("线程C", 1500)).start();
}

static class MyThread implements Runnable {

/** 线程名称 */
private String name;

/** 休眠时间 */
private int time;

private MyThread(String name, int time) {
this.name = name;
this.time = time;
}

@Override
public void run() {
try {
// 休眠指定的时间
Thread.sleep(this.time);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(this.name + "休眠" + this.time + "毫秒。");
}

}

}

线程礼让

Thread.yield 方法的调用声明了当前线程已经完成了生命周期中最重要的部分,可以切换给其它线程来执行 。

该方法只是对线程调度器的一个建议,而且也只是建议具有相同优先级的其它线程可以运行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class ThreadYieldDemo {

public static void main(String[] args) {
MyThread t = new MyThread();
new Thread(t, "线程A").start();
new Thread(t, "线程B").start();
}

static class MyThread implements Runnable {

@Override
public void run() {
for (int i = 0; i < 5; i++) {
try {
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "运行,i = " + i);
if (i == 2) {
System.out.print("线程礼让:");
Thread.yield();
}
}
}
}
}

终止线程

Thread 中的 stop 方法有缺陷,已废弃

使用 Thread.stop 停止线程会导致它解锁所有已锁定的监视器(由于未经检查的 ThreadDeath 异常会在堆栈中传播,这是自然的结果)。 如果先前由这些监视器保护的任何对象处于不一致状态,则损坏的对象将对其他线程可见,从而可能导致任意行为。

stop() 方法会真的杀死线程,不给线程喘息的机会,如果线程持有 ReentrantLock 锁,被 stop() 的线程并不会自动调用 ReentrantLock 的 unlock() 去释放锁,那其他线程就再也没机会获得 ReentrantLock 锁,这实在是太危险了。所以该方法就不建议使用了,类似的方法还有 suspend() 和 resume() 方法,这两个方法同样也都不建议使用了,所以这里也就不多介绍了。Thread.stop 的许多用法应由仅修改某些变量以指示目标线程应停止运行的代码代替。 目标线程应定期检查此变量,如果该变量指示要停止运行,则应按有序方式从其运行方法返回。如果目标线程等待很长时间(例如,在条件变量上),则应使用中断方法来中断等待。

当一个线程运行时,另一个线程可以直接通过 interrupt 方法中断其运行状态。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class ThreadInterruptDemo {

public static void main(String[] args) {
MyThread mt = new MyThread(); // 实例化Runnable子类对象
Thread t = new Thread(mt, "线程"); // 实例化Thread对象
t.start(); // 启动线程
try {
Thread.sleep(2000); // 线程休眠2秒
} catch (InterruptedException e) {
System.out.println("3、main线程休眠被终止");
}
t.interrupt(); // 中断线程执行
}

static class MyThread implements Runnable {

@Override
public void run() {
System.out.println("1、进入run()方法");
try {
Thread.sleep(10000); // 线程休眠10秒
System.out.println("2、已经完成了休眠");
} catch (InterruptedException e) {
System.out.println("3、MyThread线程休眠被终止");
return; // 返回调用处
}
System.out.println("4、run()方法正常结束");
}
}
}

如果一个线程的 run 方法执行一个无限循环,并且没有执行 sleep 等会抛出 InterruptedException 的操作,那么调用线程的 interrupt 方法就无法使线程提前结束。

但是调用 interrupt 方法会设置线程的中断标记,此时调用 interrupted 方法会返回 true。因此可以在循环体中使用 interrupted 方法来判断线程是否处于中断状态,从而提前结束线程。

安全地终止线程有两种方法:

  • 定义 volatile 标志位,在 run 方法中使用标志位控制线程终止
  • 使用 interrupt 方法和 Thread.interrupted 方法配合使用来控制线程终止

【示例】使用 volatile 标志位控制线程终止

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class ThreadStopDemo2 {

public static void main(String[] args) throws Exception {
MyTask task = new MyTask();
Thread thread = new Thread(task, "MyTask");
thread.start();
TimeUnit.MILLISECONDS.sleep(50);
task.cancel();
}

private static class MyTask implements Runnable {

private volatile boolean flag = true;

private volatile long count = 0L;

@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " 线程启动");
while (flag) {
System.out.println(count++);
}
System.out.println(Thread.currentThread().getName() + " 线程终止");
}

/**
* 通过 volatile 标志位来控制线程终止
*/
public void cancel() {
flag = false;
}

}

}

【示例】使用 interrupt 方法和 Thread.interrupted 方法配合使用来控制线程终止

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class ThreadStopDemo3 {

public static void main(String[] args) throws Exception {
MyTask task = new MyTask();
Thread thread = new Thread(task, "MyTask");
thread.start();
TimeUnit.MILLISECONDS.sleep(50);
thread.interrupt();
}

private static class MyTask implements Runnable {

private volatile long count = 0L;

@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " 线程启动");
// 通过 Thread.interrupted 和 interrupt 配合来控制线程终止
while (!Thread.interrupted()) {
System.out.println(count++);
}
System.out.println(Thread.currentThread().getName() + " 线程终止");
}
}
}

守护线程

什么是守护线程?

  • 守护线程(Daemon Thread)是在后台执行并且不会阻止 JVM 终止的线程当所有非守护线程结束时,程序也就终止,同时会杀死所有守护线程
  • 与守护线程(Daemon Thread)相反的,叫用户线程(User Thread),也就是非守护线程。

为什么需要守护线程?

  • 守护线程的优先级比较低,用于为系统中的其它对象和线程提供服务。典型的应用就是垃圾回收器。

如何使用守护线程?

  • 可以使用 isDaemon 方法判断线程是否为守护线程。
  • 可以使用 setDaemon 方法设置线程为守护线程。
    • 正在运行的用户线程无法设置为守护线程,所以 setDaemon 必须在 thread.start 方法之前设置,否则会抛出 llegalThreadStateException 异常;
    • 一个守护线程创建的子线程依然是守护线程。
    • 不要认为所有的应用都可以分配给守护线程来进行服务,比如读写操作或者计算逻辑。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class ThreadDaemonDemo {

public static void main(String[] args) {
Thread t = new Thread(new MyThread(), "线程");
t.setDaemon(true); // 此线程在后台运行
System.out.println("线程 t 是否是守护进程:" + t.isDaemon());
t.start(); // 启动线程
}

static class MyThread implements Runnable {

@Override
public void run() {
while (true) {
System.out.println(Thread.currentThread().getName() + "在运行。");
}
}
}
}

参考阅读:Java 中守护线程的总结

线程通信

当多个线程可以一起工作去解决某个问题时,如果某些部分必须在其它部分之前完成,那么就需要对线程进行协调。

wait/notify/notifyAll

  • wait - wait 会自动释放当前线程占有的对象锁,并请求操作系统挂起当前线程,让线程从 Running 状态转入 Waiting 状态,等待 notify / notifyAll 来唤醒。如果没有释放锁,那么其它线程就无法进入对象的同步方法或者同步控制块中,那么就无法执行 notify 或者 notifyAll 来唤醒挂起的线程,造成死锁。
  • notify - 唤醒一个正在 Waiting 状态的线程,并让它拿到对象锁,具体唤醒哪一个线程由 JVM 控制 。
  • notifyAll - 唤醒所有正在 Waiting 状态的线程,接下来它们需要竞争对象锁。

注意:

  • waitnotifynotifyAll 都是 Object 类中的方法,而非 Thread
  • **waitnotifynotifyAll 只能用在 synchronized 方法或者 synchronized 代码块中使用,否则会在运行时抛出 IllegalMonitorStateException**。

为什么 waitnotifynotifyAll 不定义在 Thread 中?为什么 waitnotifynotifyAll 要配合 synchronized 使用?

首先,需要了解几个基本知识点:

  • 每一个 Java 对象都有一个与之对应的 监视器(monitor)
  • 每一个监视器里面都有一个 对象锁 、一个 等待队列、一个 同步队列

了解了以上概念,我们回过头来理解前面两个问题。

为什么这几个方法不定义在 Thread 中?

由于每个对象都拥有对象锁,让当前线程等待某个对象锁,自然应该基于这个对象(Object)来操作,而非使用当前线程(Thread)来操作。因为当前线程可能会等待多个线程的锁,如果基于线程(Thread)来操作,就非常复杂了。

为什么 waitnotifynotifyAll 要配合 synchronized 使用?

如果调用某个对象的 wait 方法,当前线程必须拥有这个对象的对象锁,因此调用 wait 方法必须在 synchronized 方法和 synchronized 代码块中。

生产者、消费者模式是 waitnotifynotifyAll 的一个经典使用案例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
public class ThreadWaitNotifyDemo02 {

private static final int QUEUE_SIZE = 10;
private static final PriorityQueue<Integer> queue = new PriorityQueue<>(QUEUE_SIZE);

public static void main(String[] args) {
new Producer("生产者A").start();
new Producer("生产者B").start();
new Consumer("消费者A").start();
new Consumer("消费者B").start();
}

static class Consumer extends Thread {

Consumer(String name) {
super(name);
}

@Override
public void run() {
while (true) {
synchronized (queue) {
while (queue.size() == 0) {
try {
System.out.println("队列空,等待数据");
queue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
queue.notifyAll();
}
}
queue.poll(); // 每次移走队首元素
queue.notifyAll();
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " 从队列取走一个元素,队列当前有:" + queue.size() + "个元素");
}
}
}
}

static class Producer extends Thread {

Producer(String name) {
super(name);
}

@Override
public void run() {
while (true) {
synchronized (queue) {
while (queue.size() == QUEUE_SIZE) {
try {
System.out.println("队列满,等待有空余空间");
queue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
queue.notifyAll();
}
}
queue.offer(1); // 每次插入一个元素
queue.notifyAll();
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " 向队列取中插入一个元素,队列当前有:" + queue.size() + "个元素");
}
}
}
}
}

join

在线程操作中,可以使用 join 方法让一个线程强制运行,线程强制运行期间,其他线程无法运行,必须等待此线程完成之后才可以继续执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class ThreadJoinDemo {

public static void main(String[] args) {
MyThread mt = new MyThread(); // 实例化Runnable子类对象
Thread t = new Thread(mt, "mythread"); // 实例化Thread对象
t.start(); // 启动线程
for (int i = 0; i < 50; i++) {
if (i > 10) {
try {
t.join(); // 线程强制运行
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("Main 线程运行 --> " + i);
}
}

static class MyThread implements Runnable {

@Override
public void run() {
for (int i = 0; i < 50; i++) {
System.out.println(Thread.currentThread().getName() + " 运行,i = " + i); // 取得当前线程的名字
}
}
}
}

管道

管道输入/输出流和普通的文件输入/输出流或者网络输入/输出流不同之处在于,它主要用于线程之间的数据传输,而传输的媒介为内存。
管道输入/输出流主要包括了如下 4 种具体实现:PipedOutputStreamPipedInputStreamPipedReaderPipedWriter,前两种面向字节,而后两种面向字符。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class Piped {

public static void main(String[] args) throws Exception {
PipedWriter out = new PipedWriter();
PipedReader in = new PipedReader();
// 将输出流和输入流进行连接,否则在使用时会抛出IOException
out.connect(in);
Thread printThread = new Thread(new Print(in), "PrintThread");
printThread.start();
int receive = 0;
try {
while ((receive = System.in.read()) != -1) {
out.write(receive);
}
} finally {
out.close();
}
}

static class Print implements Runnable {

private PipedReader in;

Print(PipedReader in) {
this.in = in;
}

public void run() {
int receive = 0;
try {
while ((receive = in.read()) != -1) {
System.out.print((char) receive);
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}

线程生命周期

img

java.lang.Thread.State 中定义了 6 种不同的线程状态,在给定的一个时刻,线程只能处于其中的一个状态。

以下是各状态的说明,以及状态间的联系:

  • 新建(New) - 尚未调用 start 方法的线程处于此状态。此状态意味着:创建的线程尚未启动

  • 就绪(Runnable) - 已经调用了 start 方法的线程处于此状态。此状态意味着:线程已经在 JVM 中运行。但是在操作系统层面,它可能处于运行状态,也可能等待资源调度(例如处理器资源),资源调度完成就进入运行状态。所以该状态的可运行是指可以被运行,具体有没有运行要看底层操作系统的资源调度。

  • 阻塞(Blocked) - 此状态意味着:线程处于被阻塞状态。表示线程在等待 synchronized 的隐式锁(Monitor lock)。synchronized 修饰的方法、代码块同一时刻只允许一个线程执行,其他线程只能等待,即处于阻塞状态。当占用 synchronized 隐式锁的线程释放锁,并且等待的线程获得 synchronized 隐式锁时,就又会从 BLOCKED 转换到 RUNNABLE 状态。

  • 等待(Waiting) - 此状态意味着:线程无限期等待,直到被其他线程显式地唤醒。 阻塞和等待的区别在于,阻塞是被动的,它是在等待获取 synchronized 的隐式锁。而等待是主动的,通过调用 Object.wait 等方法进入。

    进入方法 退出方法
    没有设置 Timeout 参数的 Object.wait 方法 Object.notify / Object.notifyAll
    没有设置 Timeout 参数的 Thread.join 方法 被调用的线程执行完毕
    LockSupport.park 方法(Java 并发包中的锁,都是基于它实现的) LockSupport.unpark
  • 定时等待(Timed waiting) - 此状态意味着:无需等待其它线程显式地唤醒,在一定时间之后会被系统自动唤醒

    进入方法 退出方法
    Thread.sleep 方法 时间结束
    获得 synchronized 隐式锁的线程,调用设置了 Timeout 参数的 Object.wait 方法 时间结束 / Object.notify / Object.notifyAll
    设置了 Timeout 参数的 Thread.join 方法 时间结束 / 被调用的线程执行完毕
    LockSupport.parkNanos 方法 LockSupport.unpark
    LockSupport.parkUntil 方法 LockSupport.unpark
  • 终止(Terminated) - 线程执行完 run 方法,或者因异常退出了 run 方法。此状态意味着:线程结束了生命周期。

线程常见问题

sleep、yield、join 方法有什么区别

  • yield 方法
    • yield 方法会 让线程从 Running 状态转入 Runnable 状态
    • 当调用了 yield 方法后,只有与当前线程相同或更高优先级的Runnable 状态线程才会获得执行的机会
  • sleep 方法
    • sleep 方法会 让线程从 Running 状态转入 Waiting 状态
    • sleep 方法需要指定等待的时间,超过等待时间后,JVM 会将线程从 Waiting 状态转入 Runnable 状态
    • 当调用了 sleep 方法后,无论什么优先级的线程都可以得到执行机会
    • sleep 方法不会释放“锁标志”,也就是说如果有 synchronized 同步块,其他线程仍然不能访问共享数据。
  • join
    • join 方法会 让线程从 Running 状态转入 Waiting 状态
    • 当调用了 join 方法后,当前线程必须等待调用 join 方法的线程结束后才能继续执行

为什么 sleep 和 yield 方法是静态的

Thread 类的 sleepyield 方法将处理 Running 状态的线程。

所以在其他处于非 Running 状态的线程上执行这两个方法是没有意义的。这就是为什么这些方法是静态的。它们可以在当前正在执行的线程中工作,并避免程序员错误的认为可以在其他非运行线程调用这些方法。

Java 线程是否按照线程优先级严格执行

即使设置了线程的优先级,也无法保证高优先级的线程一定先执行

原因在于线程优先级依赖于操作系统的支持,然而,不同的操作系统支持的线程优先级并不相同,不能很好的和 Java 中线程优先级一一对应。

一个线程两次调用 start()方法会怎样

Java 的线程是不允许启动两次的,第二次调用必然会抛出 IllegalThreadStateException,这是一种运行时异常,多次调用 start 被认为是编程错误。

startrun 方法有什么区别

  • run 方法是线程的执行体。
  • start 方法会启动线程,然后 JVM 会让这个线程去执行 run 方法。

可以直接调用 Thread 类的 run 方法么

  • 可以。但是如果直接调用 Threadrun 方法,它的行为就会和普通的方法一样。
  • 为了在新的线程中执行我们的代码,必须使用 Threadstart 方法。

参考资料

Java 线程池

简介

什么是线程池

线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。

为什么要用线程池

如果并发请求数量很多,但每个线程执行的时间很短,就会出现频繁的创建和销毁线程。如此一来,会大大降低系统的效率,可能频繁创建和销毁线程的时间、资源开销要大于实际工作的所需。

正是由于这个问题,所以有必要引入线程池。使用 线程池的好处 有以下几点:

  • 降低资源消耗 - 通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
  • 提高响应速度 - 当任务到达时,任务可以不需要等到线程创建就能立即执行。
  • 提高线程的可管理性 - 线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。但是要做到合理的利用线程池,必须对其原理了如指掌。

Executor 框架

Executor 框架是一个根据一组执行策略调用,调度,执行和控制的异步任务的框架,目的是提供一种将”任务提交”与”任务如何运行”分离开来的机制。

核心 API 概述

Executor 框架核心 API 如下:

  • Executor - 运行任务的简单接口。
  • ExecutorService - 扩展了 Executor 接口。扩展能力:
    • 支持有返回值的线程;
    • 支持管理线程的生命周期。
  • ScheduledExecutorService - 扩展了 ExecutorService 接口。扩展能力:支持定期执行任务。
  • AbstractExecutorService - ExecutorService 接口的默认实现。
  • ThreadPoolExecutor - Executor 框架最核心的类,它继承了 AbstractExecutorService 类。
  • ScheduledThreadPoolExecutor - ScheduledExecutorService 接口的实现,一个可定时调度任务的线程池。
  • Executors - 可以通过调用 Executors 的静态工厂方法来创建线程池并返回一个 ExecutorService 对象。

img

Executor

Executor 接口中只定义了一个 execute 方法,用于接收一个 Runnable 对象。

1
2
3
public interface Executor {
void execute(Runnable command);
}

ExecutorService

ExecutorService 接口继承了 Executor 接口,它还提供了 invokeAllinvokeAnyshutdownsubmit 等方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public interface ExecutorService extends Executor {

void shutdown();

List<Runnable> shutdownNow();

boolean isShutdown();

boolean isTerminated();

boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;

<T> Future<T> submit(Callable<T> task);

<T> Future<T> submit(Runnable task, T result);

Future<?> submit(Runnable task);

<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;

<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;

<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;

<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}

从其支持的方法定义,不难看出:相比于 Executor 接口,ExecutorService 接口主要的扩展是:

  • 支持有返回值的线程 - sumbitinvokeAllinvokeAny 方法中都支持传入Callable 对象。
  • 支持管理线程生命周期 - shutdownshutdownNowisShutdown 等方法。

ScheduledExecutorService

ScheduledExecutorService 接口扩展了 ExecutorService 接口。

它除了支持前面两个接口的所有能力以外,还支持定时调度线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public interface ScheduledExecutorService extends ExecutorService {

public ScheduledFuture<?> schedule(Runnable command,
long delay, TimeUnit unit);

public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay, TimeUnit unit);

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit);

public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit);

}

其扩展的接口提供以下能力:

  • schedule 方法可以在指定的延时后执行一个 Runnable 或者 Callable 任务。
  • scheduleAtFixedRate 方法和 scheduleWithFixedDelay 方法可以按照指定时间间隔,定期执行任务。

ThreadPoolExecutor

java.uitl.concurrent.ThreadPoolExecutor 类是 Executor 框架中最核心的类。所以,本文将着重讲述一下这个类。

重要字段

ThreadPoolExecutor 有以下重要字段:

1
2
3
4
5
6
7
8
9
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;

参数说明:

  • ctl - 用于控制线程池的运行状态和线程池中的有效线程数量。它包含两部分的信息:
    • 线程池的运行状态 (runState)
    • 线程池内有效线程的数量 (workerCount)
    • 可以看到,ctl 使用了 Integer 类型来保存,高 3 位保存 runState,低 29 位保存 workerCountCOUNT_BITS 就是 29,CAPACITY 就是 1 左移 29 位减 1(29 个 1),这个常量表示 workerCount 的上限值,大约是 5 亿。
  • 运行状态 - 线程池一共有五种运行状态:
    • RUNNING - 运行状态。接受新任务,并且也能处理阻塞队列中的任务。
    • SHUTDOWN - 关闭状态。不接受新任务,但可以处理阻塞队列中的任务。
      • 在线程池处于 RUNNING 状态时,调用 shutdown 方法会使线程池进入到该状态。
      • finalize 方法在执行过程中也会调用 shutdown 方法进入该状态。
    • STOP - 停止状态。不接受新任务,也不处理队列中的任务。会中断正在处理任务的线程。在线程池处于 RUNNINGSHUTDOWN 状态时,调用 shutdownNow 方法会使线程池进入到该状态。
    • TIDYING - 整理状态。如果所有的任务都已终止了,workerCount (有效线程数) 为 0,线程池进入该状态后会调用 terminated 方法进入 TERMINATED 状态。
    • TERMINATED - 已终止状态。在 terminated 方法执行完后进入该状态。默认 terminated 方法中什么也没有做。进入 TERMINATED 的条件如下:
      • 线程池不是 RUNNING 状态;
      • 线程池状态不是 TIDYING 状态或 TERMINATED 状态;
      • 如果线程池状态是 SHUTDOWN 并且 workerQueue 为空;
      • workerCount 为 0;
      • 设置 TIDYING 状态成功。

img

构造方法

ThreadPoolExecutor 有四个构造方法,前三个都是基于第四个实现。第四个构造方法定义如下:

1
2
3
4
5
6
7
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {

参数说明:

  • corePoolSize - 核心线程数量。当有新任务通过 execute 方法提交时 ,线程池会执行以下判断:
    • 如果运行的线程数少于 corePoolSize,则创建新线程来处理任务,即使线程池中的其他线程是空闲的。
    • 如果线程池中的线程数量大于等于 corePoolSize 且小于 maximumPoolSize,则只有当 workQueue 满时才创建新的线程去处理任务;
    • 如果设置的 corePoolSizemaximumPoolSize 相同,则创建的线程池的大小是固定的。这时如果有新任务提交,若 workQueue 未满,则将请求放入 workQueue 中,等待有空闲的线程去从 workQueue 中取任务并处理;
    • 如果运行的线程数量大于等于 maximumPoolSize,这时如果 workQueue 已经满了,则使用 handler 所指定的策略来处理任务;
    • 所以,任务提交时,判断的顺序为 corePoolSize => workQueue => maximumPoolSize
  • maximumPoolSize - 最大线程数量
    • 如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。
    • 值得注意的是:如果使用了无界的任务队列这个参数就没什么效果。
  • keepAliveTime线程保持活动的时间
    • 当线程池中的线程数量大于 corePoolSize 的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待的时间超过了 keepAliveTime
    • 所以,如果任务很多,并且每个任务执行的时间比较短,可以调大这个时间,提高线程的利用率。
  • unit - keepAliveTime 的时间单位。有 7 种取值。可选的单位有天(DAYS),小时(HOURS),分钟(MINUTES),毫秒(MILLISECONDS),微秒(MICROSECONDS, 千分之一毫秒)和毫微秒(NANOSECONDS, 千分之一微秒)。
  • workQueue - 等待执行的任务队列。用于保存等待执行的任务的阻塞队列。 可以选择以下几个阻塞队列。
    • ArrayBlockingQueue - 有界阻塞队列
      • 此队列是基于数组的先进先出队列(FIFO)
      • 此队列创建时必须指定大小。
    • LinkedBlockingQueue - 无界阻塞队列
      • 此队列是基于链表的先进先出队列(FIFO)
      • 如果创建时没有指定此队列大小,则默认为 Integer.MAX_VALUE
      • 吞吐量通常要高于 ArrayBlockingQueue
      • 使用 LinkedBlockingQueue 意味着: maximumPoolSize 将不起作用,线程池能创建的最大线程数为 corePoolSize,因为任务等待队列是无界队列。
      • Executors.newFixedThreadPool 使用了这个队列。
    • SynchronousQueue - 不会保存提交的任务,而是将直接新建一个线程来执行新来的任务
      • 每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态。
      • 吞吐量通常要高于 LinkedBlockingQueue
      • Executors.newCachedThreadPool 使用了这个队列。
    • PriorityBlockingQueue - 具有优先级的无界阻塞队列
  • threadFactory - 线程工厂。可以通过线程工厂给每个创建出来的线程设置更有意义的名字。
  • handler - 饱和策略。它是 RejectedExecutionHandler 类型的变量。当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。线程池支持以下策略:
    • AbortPolicy - 丢弃任务并抛出异常。这也是默认策略。
    • DiscardPolicy - 丢弃任务,但不抛出异常。
    • DiscardOldestPolicy - 丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)。
    • CallerRunsPolicy - 直接调用 run 方法并且阻塞执行。
    • 如果以上策略都不能满足需要,也可以通过实现 RejectedExecutionHandler 接口来定制处理策略。如记录日志或持久化不能处理的任务。

execute 方法

默认情况下,创建线程池之后,线程池中是没有线程的,需要提交任务之后才会创建线程。

提交任务可以使用 execute 方法,它是 ThreadPoolExecutor 的核心方法,通过这个方法可以向线程池提交一个任务,交由线程池去执行

execute 方法工作流程如下:

  1. 如果 workerCount < corePoolSize,则创建并启动一个线程来执行新提交的任务;
  2. 如果 workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中;
  3. 如果 workerCount >= corePoolSize && workerCount < maximumPoolSize,且线程池内的阻塞队列已满,则创建并启动一个线程来执行新提交的任务;
  4. 如果workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满,则根据拒绝策略来处理该任务, 默认的处理方式是直接抛异常。

img

其他重要方法

ThreadPoolExecutor 类中还有一些重要的方法:

  • submit - 类似于 execute,但是针对的是有返回值的线程。submit 方法是在 ExecutorService 中声明的方法,在 AbstractExecutorService 就已经有了具体的实现。ThreadPoolExecutor 直接复用 AbstractExecutorServicesubmit 方法。
  • shutdown - 不会立即终止线程池,而是要等所有任务缓存队列中的任务都执行完后才终止,但再也不会接受新的任务。
    • 将线程池切换到 SHUTDOWN 状态;
    • 并调用 interruptIdleWorkers 方法请求中断所有空闲的 worker;
    • 最后调用 tryTerminate 尝试结束线程池。
  • shutdownNow - 立即终止线程池,并尝试打断正在执行的任务,并且清空任务缓存队列,返回尚未执行的任务。与 shutdown 方法类似,不同的地方在于:
    • 设置状态为 STOP
    • 中断所有工作线程,无论是否是空闲的;
    • 取出阻塞队列中没有被执行的任务并返回。
  • isShutdown - 调用了 shutdownshutdownNow 方法后,isShutdown 方法就会返回 true。
  • isTerminaed - 当所有的任务都已关闭后,才表示线程池关闭成功,这时调用 isTerminaed 方法会返回 true。
  • setCorePoolSize - 设置核心线程数大小。
  • setMaximumPoolSize - 设置最大线程数大小。
  • getTaskCount - 线程池已经执行的和未执行的任务总数;
  • getCompletedTaskCount - 线程池已完成的任务数量,该值小于等于 taskCount
  • getLargestPoolSize - 线程池曾经创建过的最大线程数量。通过这个数据可以知道线程池是否满过,也就是达到了 maximumPoolSize
  • getPoolSize - 线程池当前的线程数量;
  • getActiveCount - 当前线程池中正在执行任务的线程数量。

使用示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class ThreadPoolExecutorDemo {

public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10, 500, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());

for (int i = 0; i < 100; i++) {
threadPoolExecutor.execute(new MyThread());
String info = String.format("线程池中线程数目:%s,队列中等待执行的任务数目:%s,已执行玩别的任务数目:%s",
threadPoolExecutor.getPoolSize(),
threadPoolExecutor.getQueue().size(),
threadPoolExecutor.getCompletedTaskCount());
System.out.println(info);
}
threadPoolExecutor.shutdown();
}

static class MyThread implements Runnable {

@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " 执行");
}

}

}

Executors

JDK 的 Executors 类中提供了几种具有代表性的线程池,这些线程池 都是基于 ThreadPoolExecutor 的定制化实现

在实际使用线程池的场景中,我们往往不是直接使用 ThreadPoolExecutor ,而是使用 JDK 中提供的具有代表性的线程池实例。

newSingleThreadExecutor

创建一个单线程的线程池

只会创建唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。 如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它

单工作线程最大的特点是:可保证顺序地执行各个任务

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class SingleThreadExecutorDemo {

public static void main(String[] args) {
ExecutorService executorService = Executors.newSingleThreadExecutor();
for (int i = 0; i < 100; i++) {
executorService.execute(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " 执行");
}
});
}
executorService.shutdown();
}

}

newFixedThreadPool

创建一个固定大小的线程池

每次提交一个任务就会新创建一个工作线程,如果工作线程数量达到线程池最大线程数,则将提交的任务存入到阻塞队列中

FixedThreadPool 是一个典型且优秀的线程池,它具有线程池提高程序效率和节省创建线程时所耗的开销的优点。但是,在线程池空闲时,即线程池中没有可运行任务时,它不会释放工作线程,还会占用一定的系统资源。

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class FixedThreadPoolDemo {

public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(3);
for (int i = 0; i < 100; i++) {
executorService.execute(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " 执行");
}
});
}
executorService.shutdown();
}

}

newCachedThreadPool

创建一个可缓存的线程池

  • 如果线程池大小超过处理任务所需要的线程数,就会回收部分空闲的线程;
  • 如果长时间没有往线程池中提交任务,即如果工作线程空闲了指定的时间(默认为 1 分钟),则该工作线程将自动终止。终止后,如果你又提交了新的任务,则线程池重新创建一个工作线程。
  • 此线程池不会对线程池大小做限制,线程池大小完全依赖于操作系统(或者说 JVM)能够创建的最大线程大小。 因此,使用 CachedThreadPool 时,一定要注意控制任务的数量,否则,由于大量线程同时运行,很有会造成系统瘫痪。

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class CachedThreadPoolDemo {

public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < 100; i++) {
executorService.execute(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " 执行");
}
});
}
executorService.shutdown();
}

}

newScheduleThreadPool

创建一个大小无限的线程池。此线程池支持定时以及周期性执行任务的需求。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class ScheduledThreadPoolDemo {

public static void main(String[] args) {
schedule();
scheduleAtFixedRate();
}

private static void schedule() {
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5);
for (int i = 0; i < 100; i++) {
executorService.schedule(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " 执行");
}
}, 1, TimeUnit.SECONDS);
}
executorService.shutdown();
}

private static void scheduleAtFixedRate() {
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5);
for (int i = 0; i < 100; i++) {
executorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " 执行");
}
}, 1, 1, TimeUnit.SECONDS);
}
executorService.shutdown();
}

}

newWorkStealingPool

Java 8 才引入。

其内部会构建 ForkJoinPool,利用 Work-Stealing 算法,并行地处理任务,不保证处理顺序。

线程池最佳实践

计算线程数量

一般多线程执行的任务类型可以分为 CPU 密集型和 I/O 密集型,根据不同的任务类型,我们计算线程数的方法也不一样。

CPU 密集型任务:这种任务消耗的主要是 CPU 资源,可以将线程数设置为 N(CPU 核心数)+1,比 CPU 核心数多出来的一个线程是为了防止线程偶发的缺页中断,或者其它原因导致的任务暂停而带来的影响。一旦任务暂停,CPU 就会处于空闲状态,而在这种情况下多出来的一个线程就可以充分利用 CPU 的空闲时间。

I/O 密集型任务:这种任务应用起来,系统会用大部分的时间来处理 I/O 交互,而线程在处理 I/O 的时间段内不会占用 CPU 来处理,这时就可以将 CPU 交出给其它线程使用。因此在 I/O 密集型任务的应用中,我们可以多配置一些线程,具体的计算方法是 2N。

建议使用有界阻塞队列

不建议使用 Executors 的最重要的原因是:Executors 提供的很多方法默认使用的都是无界的 LinkedBlockingQueue,高负载情境下,无界队列很容易导致 OOM,而 OOM 会导致所有请求都无法处理,这是致命问题。所以强烈建议使用有界队列

《阿里巴巴 Java 开发手册》中提到,禁止使用这些方法来创建线程池,而应该手动 new ThreadPoolExecutor 来创建线程池。制订这条规则是因为容易导致生产事故,最典型的就是 newFixedThreadPoolnewCachedThreadPool,可能因为资源耗尽导致 OOM 问题。

【示例】newFixedThreadPool OOM

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
ThreadPoolExecutor threadPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
printStats(threadPool);
for (int i = 0; i < 100000000; i++) {
threadPool.execute(() -> {
String payload = IntStream.rangeClosed(1, 1000000)
.mapToObj(__ -> "a")
.collect(Collectors.joining("")) + UUID.randomUUID().toString();
try {
TimeUnit.HOURS.sleep(1);
} catch (InterruptedException e) {
}
log.info(payload);
});
}

threadPool.shutdown();
threadPool.awaitTermination(1, TimeUnit.HOURS);

newFixedThreadPool 使用的工作队列是 LinkedBlockingQueue ,而默认构造方法的 LinkedBlockingQueue 是一个 Integer.MAX_VALUE 长度的队列,可以认为是无界的。如果任务较多并且执行较慢的话,队列可能会快速积压,撑爆内存导致 OOM。

【示例】newCachedThreadPool OOM

1
2
3
4
5
6
7
8
9
10
11
12
13
14
ThreadPoolExecutor threadPool = (ThreadPoolExecutor) Executors.newCachedThreadPool();
printStats(threadPool);
for (int i = 0; i < 100000000; i++) {
threadPool.execute(() -> {
String payload = UUID.randomUUID().toString();
try {
TimeUnit.HOURS.sleep(1);
} catch (InterruptedException e) {
}
log.info(payload);
});
}
threadPool.shutdown();
threadPool.awaitTermination(1, TimeUnit.HOURS);

newCachedThreadPool 的最大线程数是 Integer.MAX_VALUE,可以认为是没有上限的,而其工作队列 SynchronousQueue 是一个没有存储空间的阻塞队列。这意味着,只要有请求到来,就必须找到一条工作线程来处理,如果当前没有空闲的线程就再创建一条新的。

如果大量的任务进来后会创建大量的线程。我们知道线程是需要分配一定的内存空间作为线程栈的,比如 1MB,因此无限制创建线程必然会导致 OOM。

重要任务应该自定义拒绝策略

使用有界队列,当任务过多时,线程池会触发执行拒绝策略,线程池默认的拒绝策略会 throw RejectedExecutionException 这是个运行时异常,对于运行时异常编译器并不强制 catch 它,所以开发人员很容易忽略。因此默认拒绝策略要慎重使用。如果线程池处理的任务非常重要,建议自定义自己的拒绝策略;并且在实际工作中,自定义的拒绝策略往往和降级策略配合使用。

参考资料

Java 并发工具类

JDK 的 java.util.concurrent 包(即 J.U.C)中提供了几个非常有用的并发工具类。

CountDownLatch

字面意思为 递减计数锁。用于控制一个线程等待多个线程

CountDownLatch 维护一个计数器 count,表示需要等待的事件数量。countDown 方法递减计数器,表示有一个事件已经发生。调用 await 方法的线程会一直阻塞直到计数器为零,或者等待中的线程中断,或者等待超时。

img

CountDownLatch 是基于 AQS(AbstractQueuedSynchronizer) 实现的。

CountDownLatch 唯一的构造方法:

1
2
// 初始化计数器
public CountDownLatch(int count) {};

说明:

  • count 为统计值。

CountDownLatch 的重要方法:

1
2
3
public void await() throws InterruptedException { };
public boolean await(long timeout, TimeUnit unit) throws InterruptedException { };
public void countDown() { };

说明:

  • await() - 调用 await() 方法的线程会被挂起,它会等待直到 count 值为 0 才继续执行。
  • await(long timeout, TimeUnit unit) - 和 await() 类似,只不过等待一定的时间后 count 值还没变为 0 的话就会继续执行
  • countDown() - 将统计值 count 减 1

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public class CountDownLatchDemo {

public static void main(String[] args) {
final CountDownLatch latch = new CountDownLatch(2);

new Thread(new MyThread(latch)).start();
new Thread(new MyThread(latch)).start();

try {
System.out.println("等待2个子线程执行完毕...");
latch.await();
System.out.println("2个子线程已经执行完毕");
System.out.println("继续执行主线程");
} catch (InterruptedException e) {
e.printStackTrace();
}
}

static class MyThread implements Runnable {

private CountDownLatch latch;

public MyThread(CountDownLatch latch) {
this.latch = latch;
}

@Override
public void run() {
System.out.println("子线程" + Thread.currentThread().getName() + "正在执行");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("子线程" + Thread.currentThread().getName() + "执行完毕");
latch.countDown();
}

}

}

CyclicBarrier

字面意思是 循环栅栏。**CyclicBarrier 可以让一组线程等待至某个状态(遵循字面意思,不妨称这个状态为栅栏)之后再全部同时执行。之所以叫循环栅栏是因为:当所有等待线程都被释放以后,CyclicBarrier 可以被重用**。

CyclicBarrier 维护一个计数器 count。每次执行 await 方法之后,count 加 1,直到计数器的值和设置的值相等,等待的所有线程才会继续执行。

CyclicBarrier 是基于 ReentrantLockCondition 实现的。

CyclicBarrier 应用场景:CyclicBarrier 在并行迭代算法中非常有用。

img

CyclicBarrier 提供了 2 个构造方法

1
2
public CyclicBarrier(int parties) {}
public CyclicBarrier(int parties, Runnable barrierAction) {}

说明:

  • parties - parties 数相当于一个阈值,当有 parties 数量的线程在等待时, CyclicBarrier 处于栅栏状态。
  • barrierAction - 当 CyclicBarrier 处于栅栏状态时执行的动作。

CyclicBarrier 的重要方法:

1
2
3
4
5
6
7
public int await() throws InterruptedException, BrokenBarrierException {}
public int await(long timeout, TimeUnit unit)
throws InterruptedException,
BrokenBarrierException,
TimeoutException {}
// 将屏障重置为初始状态
public void reset() {}

说明:

  • await() - 等待调用 await() 的线程数达到屏障数。如果当前线程是最后一个到达的线程,并且在构造函数中提供了非空屏障操作,则当前线程在允许其他线程继续之前运行该操作。如果在屏障动作期间发生异常,那么该异常将在当前线程中传播并且屏障被置于断开状态。
  • await(long timeout, TimeUnit unit) - 相比于 await() 方法,这个方法让这些线程等待至一定的时间,如果还有线程没有到达栅栏状态就直接让到达栅栏状态的线程执行后续任务。
  • reset() - 将屏障重置为初始状态。

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public class CyclicBarrierDemo {

final static int N = 4;

public static void main(String[] args) {
CyclicBarrier barrier = new CyclicBarrier(N,
new Runnable() {
@Override
public void run() {
System.out.println("当前线程" + Thread.currentThread().getName());
}
});

for (int i = 0; i < N; i++) {
MyThread myThread = new MyThread(barrier);
new Thread(myThread).start();
}
}

static class MyThread implements Runnable {

private CyclicBarrier cyclicBarrier;

MyThread(CyclicBarrier cyclicBarrier) {
this.cyclicBarrier = cyclicBarrier;
}

@Override
public void run() {
System.out.println("线程" + Thread.currentThread().getName() + "正在写入数据...");
try {
Thread.sleep(3000); // 以睡眠来模拟写入数据操作
System.out.println("线程" + Thread.currentThread().getName() + "写入数据完毕,等待其他线程写入完毕");
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}

}

}

Semaphore

字面意思为 信号量Semaphore 用来控制某段代码块的并发数。

Semaphore 管理着一组虚拟的许可(permit),permit 的初始数量可通过构造方法来指定。每次执行 acquire 方法可以获取一个 permit,如果没有就等待;而 release 方法可以释放一个 permit。

Semaphore 应用场景:

  • Semaphore 可以用于实现资源池,如数据库连接池。
  • Semaphore 可以用于将任何一种容器变成有界阻塞容器。

img

Semaphore 提供了 2 个构造方法:

1
2
3
4
// 参数 permits 表示许可数目,即同时可以允许多少线程进行访问
public Semaphore(int permits) {}
// 参数 fair 表示是否是公平的,即等待时间越久的越先获取许可
public Semaphore(int permits, boolean fair) {}

说明:

  • permits - 初始化固定数量的 permit,并且默认为非公平模式。
  • fair - 设置是否为公平模式。所谓公平,是指等待久的优先获取 permit。

Semaphore的重要方法:

1
2
3
4
5
6
7
8
// 获取 1 个许可
public void acquire() throws InterruptedException {}
//获取 permits 个许可
public void acquire(int permits) throws InterruptedException {}
// 释放 1 个许可
public void release() {}
//释放 permits 个许可
public void release(int permits) {}

说明:

  • acquire() - 获取 1 个 permit。
  • acquire(int permits) - 获取 permits 数量的 permit。
  • release() - 释放 1 个 permit。
  • release(int permits) - 释放 permits 数量的 permit。

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class SemaphoreDemo {

private static final int THREAD_COUNT = 30;

private static ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT);

private static Semaphore semaphore = new Semaphore(10);

public static void main(String[] args) {
for (int i = 0; i < THREAD_COUNT; i++) {
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
semaphore.acquire();
System.out.println("save data");
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}

threadPool.shutdown();
}

}

总结

  • CountDownLatchCyclicBarrier 都能够实现线程之间的等待,只不过它们侧重点不同:
    • CountDownLatch 一般用于某个线程 A 等待若干个其他线程执行完任务之后,它才执行;
    • CyclicBarrier 一般用于一组线程互相等待至某个状态,然后这一组线程再同时执行;
    • 另外,CountDownLatch 是不可以重用的,而 CyclicBarrier 是可以重用的。
  • Semaphore 其实和锁有点类似,它一般用于控制对某组资源的访问权限。

参考资料

Maven 插件之代码检查

maven-checkstyle-plugin

maven-checkstyle-plugin,用于检测代码中不符合规范的地方。

定义 checkstyle.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
<!DOCTYPE module PUBLIC
"-//Checkstyle//DTD Checkstyle Configuration 1.3//EN"
"https://checkstyle.org/dtds/configuration_1_3.dtd">

<!-- Generated by RHY @will_awoke -->

<module name="Checker">

<property name="charset" value="UTF-8"/>
<property name="severity" value="warning"/>

<!-- Checks for Size Violations. -->
<!-- 检查文件的长度(行) default max=2000 -->
<module name="FileLength">
<property name="max" value="2500"/>
</module>

<!-- Checks that property files contain the same keys. -->
<!-- 检查**.properties配置文件 是否有相同的key
<module name="Translation">
</module>
-->

<module name="TreeWalker">

<!-- Checks for imports -->
<!-- 必须导入类的完整路径,即不能使用*导入所需的类 -->
<module name="AvoidStarImport"/>

<!-- 检查是否从非法的包中导入了类 illegalPkgs: 定义非法的包名称-->
<module name="IllegalImport"/> <!-- defaults to sun.* packages -->

<!-- 检查是否导入了不必显示导入的类-->
<module name="RedundantImport"/>

<!-- 检查是否导入的包没有使用-->
<module name="UnusedImports"/>

<!-- Checks for whitespace
<module name="EmptyForIteratorPad"/>
<module name="MethodParamPad"/>
<module name="NoWhitespaceAfter"/>
<module name="NoWhitespaceBefore"/>
<module name="OperatorWrap"/>
<module name="ParenPad"/>
<module name="TypecastParenPad"/>
<module name="WhitespaceAfter"/>
<module name="WhitespaceAround"/>
-->

<!-- 检查类和接口的javadoc 默认不检查author 和version tags
authorFormat: 检查author标签的格式
versionFormat: 检查version标签的格式
scope: 可以检查的类的范围,例如:public只能检查public修饰的类,private可以检查所有的类
excludeScope: 不能检查的类的范围,例如:public,public的类将不被检查,但访问权限小于public的类仍然会检查,其他的权限以此类推
tokens: 该属性适用的类型,例如:CLASS_DEF,INTERFACE_DEF -->
<module name="JavadocType">
<property name="authorFormat" value="\S"/>
<property name="scope" value="protected"/>
<property name="tokens" value="CLASS_DEF,INTERFACE_DEF"/>
</module>

<!-- 检查方法的javadoc的注释
scope: 可以检查的方法的范围,例如:public只能检查public修饰的方法,private可以检查所有的方法
allowMissingParamTags: 是否忽略对参数注释的检查
allowMissingThrowsTags: 是否忽略对throws注释的检查
allowMissingReturntags: 是否忽略对return注释的检查 -->
<module name="JavadocMethod">
<property name="scope" value="private"/>
<property name="allowMissingParamTags" value="false"/>
<property name="allowMissingThrowsTags" value="false"/>
<property name="allowMissingReturnTag" value="false"/>
<property name="tokens" value="METHOD_DEF"/>
<property name="allowUndeclaredRTE" value="true"/>
<property name="allowThrowsTagsForSubclasses" value="true"/>
<!--允许get set 方法没有注释-->
<property name="allowMissingPropertyJavadoc" value="true"/>
</module>

<!-- 检查类变量的注释
scope: 检查变量的范围,例如:public只能检查public修饰的变量,private可以检查所有的变量 -->
<module name="JavadocVariable">
<property name="scope" value="private"/>
</module>

<!--option: 定义左大括号'{'显示位置,eol在同一行显示,nl在下一行显示
maxLineLength: 大括号'{'所在行行最多容纳的字符数
tokens: 该属性适用的类型,例:CLASS_DEF,INTERFACE_DEF,METHOD_DEF,CTOR_DEF -->
<module name="LeftCurly">
<property name="option" value="nl"/>
</module>

<!-- NeedBraces 检查是否应该使用括号的地方没有加括号
tokens: 定义检查的类型 -->
<module name="NeedBraces"/>

<!-- Checks the placement of right curly braces ('}') for else, try, and catch tokens. The policy to verify is specified using property option.
option: 右大括号是否单独一行显示
tokens: 定义检查的类型 -->
<module name="RightCurly">
<property name="option" value="alone"/>
</module>

<!-- 检查在重写了equals方法后是否重写了hashCode方法 -->
<module name="EqualsHashCode"/>

<!-- Checks for illegal instantiations where a factory method is preferred.
Rationale: Depending on the project, for some classes it might be preferable to create instances through factory methods rather than calling the constructor.
A simple example is the java.lang.Boolean class. In order to save memory and CPU cycles, it is preferable to use the predefined constants TRUE and FALSE. Constructor invocations should be replaced by calls to Boolean.valueOf().
Some extremely performance sensitive projects may require the use of factory methods for other classes as well, to enforce the usage of number caches or object pools. -->
<module name="IllegalInstantiation">
<property name="classes" value="java.lang.Boolean"/>
</module>

<!-- Checks for Naming Conventions. 命名规范 -->
<!-- local, final variables, including catch parameters -->
<module name="LocalFinalVariableName"/>

<!-- local, non-final variables, including catch parameters-->
<module name="LocalVariableName"/>

<!-- static, non-final fields -->
<module name="StaticVariableName">
<property name="format" value="(^[A-Z0-9_]{0,19}$)"/>
</module>

<!-- packages -->
<module name="PackageName">
<property name="format" value="^[a-z]+(\.[a-z][a-z0-9]*)*$"/>
</module>

<!-- classes and interfaces -->
<module name="TypeName">
<property name="format" value="(^[A-Z][a-zA-Z0-9]{0,19}$)"/>
</module>

<!-- methods -->
<module name="MethodName">
<property name="format" value="(^[a-z][a-zA-Z0-9]{0,19}$)"/>
</module>

<!-- non-static fields -->
<module name="MemberName">
<property name="format" value="(^[a-z][a-z0-9][a-zA-Z0-9]{0,19}$)"/>
</module>

<!-- parameters -->
<module name="ParameterName">
<property name="format" value="(^[a-z][a-zA-Z0-9_]{0,19}$)"/>
</module>

<!-- constants (static, final fields) -->
<module name="ConstantName">
<property name="format" value="(^[A-Z0-9_]{0,19}$)"/>
</module>

<!-- 代码缩进 -->
<module name="Indentation">
</module>

<!-- Checks for redundant exceptions declared in throws clause such as duplicates, unchecked exceptions or subclasses of another declared exception.
检查是否抛出了多余的异常
<module name="RedundantThrows">
<property name="logLoadErrors" value="true"/>
<property name="suppressLoadErrors" value="true"/>
</module>
-->

<!-- Checks for overly complicated boolean expressions. Currently finds code like if (b == true), b || true, !false, etc.
检查boolean值是否冗余的地方
Rationale: Complex boolean logic makes code hard to understand and maintain. -->
<module name="SimplifyBooleanExpression"/>

<!-- Checks for overly complicated boolean return statements. For example the following code
检查是否存在过度复杂的boolean返回值
if (valid())
return false;
else
return true;
could be written as
return !valid();
The Idea for this Check has been shamelessly stolen from the equivalent PMD rule. -->
<module name="SimplifyBooleanReturn"/>

<!-- Checks that a class which has only private constructors is declared as final.只有私有构造器的类必须声明为final-->
<module name="FinalClass"/>

<!-- Make sure that utility classes (classes that contain only static methods or fields in their API) do not have a public constructor.
确保Utils类(只提供static方法和属性的类)没有public构造器。
Rationale: Instantiating utility classes does not make sense. Hence the constructors should either be private or (if you want to allow subclassing) protected. A common mistake is forgetting to hide the default constructor.
If you make the constructor protected you may want to consider the following constructor implementation technique to disallow instantiating subclasses:
public class StringUtils // not final to allow subclassing
{
protected StringUtils() {
throw new UnsupportedOperationException(); // prevents calls from subclass
}
public static int count(char c, String s) {
// ...
}
}
<module name="HideUtilityClassConstructor"/>
-->

<!-- Checks visibility of class members. Only static final members may be public; other class members must be private unless property protectedAllowed or packageAllowed is set.
检查class成员属性可见性。只有static final 修饰的成员是可以public的。其他的成员属性必需是private的,除非属性protectedAllowed或者packageAllowed设置了true.
Public members are not flagged if the name matches the public member regular expression (contains "^serialVersionUID$" by default). Note: Checkstyle 2 used to include "^f[A-Z][a-zA-Z0-9]*$" in the default pattern to allow CMP for EJB 1.1 with the default settings. With EJB 2.0 it is not longer necessary to have public access for persistent fields, hence the default has been changed.
Rationale: Enforce encapsulation. 强制封装 -->
<module name="VisibilityModifier"/>

<!-- 每一行只能定义一个变量 -->
<module name="MultipleVariableDeclarations">
</module>

<!-- Checks the style of array type definitions. Some like Java-style: public static void main(String[] args) and some like C-style: public static void main(String args[])
检查再定义数组时,采用java风格还是c风格,例如:int[] num是java风格,int num[]是c风格。默认是java风格-->
<module name="ArrayTypeStyle">
</module>

<!-- Checks that there are no "magic numbers", where a magic number is a numeric literal that is not defined as a constant. By default, -1, 0, 1, and 2 are not considered to be magic numbers.
<module name="MagicNumber">
</module>
-->

<!-- A check for TODO: comments. Actually it is a generic regular expression matcher on Java comments. To check for other patterns in Java comments, set property format.
检查是否存在TODO(待处理) TODO是javaIDE自动生成的。一般代码写完后要去掉。
-->
<module name="TodoComment"/>

<!-- Checks that long constants are defined with an upper ell. That is ' L' and not 'l'. This is in accordance to the Java Language Specification, Section 3.10.1.
检查是否在long类型是否定义了大写的L.字母小写l和数字1(一)很相似。
looks a lot like 1. -->
<module name="UpperEll"/>

<!-- Checks that switch statement has "default" clause. 检查switch语句是否有‘default’从句
Rationale: It's usually a good idea to introduce a default case in every switch statement.
Even if the developer is sure that all currently possible cases are covered, this should be expressed in the default branch,
e.g. by using an assertion. This way the code is protected aginst later changes, e.g. introduction of new types in an enumeration type. -->
<module name="MissingSwitchDefault"/>

<!--检查switch中case后是否加入了跳出语句,例如:return、break、throw、continue -->
<module name="FallThrough"/>

<!-- Checks the number of parameters of a method or constructor. max default 7个. -->
<module name="ParameterNumber">
<property name="max" value="5"/>
</module>

<!-- 每行字符数 -->
<module name="LineLength">
<property name="max" value="200"/>
</module>

<!-- Checks for long methods and constructors. max default 150行. max=300 设置长度300 -->
<module name="MethodLength">
<property name="max" value="300"/>
</module>

<!-- ModifierOrder 检查修饰符的顺序,默认是 public,protected,private,abstract,static,final,transient,volatile,synchronized,native -->
<module name="ModifierOrder">
</module>

<!-- 检查是否有多余的修饰符,例如:接口中的方法不必使用public、abstract修饰 -->
<module name="RedundantModifier">
</module>

<!--- 字符串比较必须使用 equals() -->
<module name="StringLiteralEquality">
</module>

<!-- if-else嵌套语句个数 最多4层 -->
<module name="NestedIfDepth">
<property name="max" value="3"/>
</module>

<!-- try-catch 嵌套语句个数 最多2层 -->
<module name="NestedTryDepth">
<property name="max" value="2"/>
</module>

<!-- 返回个数 -->
<module name="ReturnCount">
<property name="max" value="5"/>
<property name="format" value="^$"/>
</module>

</module>
</module>

配置 pom.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42

<project>
...
<properties>
<checkstyle.config.location>config/maven_checks.xml</checkstyle.config.location>
</properties>
...
<reporting>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
<version>3.0</version>
<executions>
<execution>
<!-- 绑定pmd:pmd到validate生命周期,在validate时会自动进行代码规范检查 -->
<id>validate</id>
<phase>validate</phase>
<configuration>
<!-- 配置文件的路径,在style文件夹下 -->
<configLocation>style/checkstyle.xml</configLocation>
<encoding>UTF-8</encoding>
<consoleOutput>true</consoleOutput>
<failsOnError>true</failsOnError>
<includeTestSourceDirectory>false</includeTestSourceDirectory>
</configuration>
<goals>
<goal>check</goal>
</goals>
</execution>
</executions>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jxr-plugin</artifactId>
<version>2.3</version>
</plugin>
</plugins>
</reporting>
...
</project>

其中可以修改使用的检查规则文件路径,插件默认提供了四个规则文件可以直接使用,无需手动下载:

  • config/sun_checks.xml - Sun Microsystems Definition (default).
  • config/maven_checks.xml - Maven Development Definitions.
  • config/turbine_checks.xml - Turbine Development Definitions.
  • config/avalon_checks.xml - Avalon Development Definitions.

配置好后,可以执行 mvn clean checkstyle:check 检查代码。

maven-pmd-plugin

maven-pmd-plugin 是阿里编程规范检查插件。

配置 pom.xml

参考 https://github.com/alibaba/p3c/blob/master/p3c-pmd/pom.xml 配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
  <plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-pmd-plugin</artifactId>
<version>3.11.0</version>
<configuration>
<sourceEncoding>${project.build.sourceEncoding}</sourceEncoding>
<targetJdk>${maven.compiler.target}</targetJdk>
<printFailingErrors>true</printFailingErrors>
<rulesets>
<ruleset>rulesets/java/ali-comment.xml</ruleset>
<ruleset>rulesets/java/ali-concurrent.xml</ruleset>
<ruleset>rulesets/java/ali-constant.xml</ruleset>
<ruleset>rulesets/java/ali-exception.xml</ruleset>
<ruleset>rulesets/java/ali-flowcontrol.xml</ruleset>
<ruleset>rulesets/java/ali-naming.xml</ruleset>
<ruleset>rulesets/java/ali-oop.xml</ruleset>
<ruleset>rulesets/java/ali-orm.xml</ruleset>
<ruleset>rulesets/java/ali-other.xml</ruleset>
<ruleset>rulesets/java/ali-set.xml</ruleset>
</rulesets>
<printFailingErrors>true</printFailingErrors>
</configuration>
<executions>
<execution>
<phase>verify</phase>
<goals>
<goal>check</goal>
</goals>
</execution>
</executions>
<dependencies>
<dependency>
<groupId>com.alibaba.p3c</groupId>
<artifactId>p3c-pmd</artifactId>
<version>2.0.0</version>
</dependency>
</dependencies>
</plugin>
</plugins>

配置好后,可以执行 mvn clean pmd:check 检查代码。

参考资料