Dunwu Blog

大道至简,知易行难

Java 容器简介

容器简介

数组与容器

Java 中常用的存储容器就是数组和容器,二者有以下区别:

  • 存储大小是否固定
    • 数组的长度固定
    • 容器的长度可变
  • 数据类型
    • 数组可以存储基本数据类型,也可以存储引用数据类型
    • 容器只能存储引用数据类型,基本数据类型的变量要转换成对应的包装类才能放入容器类中。

:bulb: 不了解什么是基本数据类型、引用数据类型、包装类这些概念,可以参考:Java 基本数据类型

容器框架

img

Java 容器框架主要分为 CollectionMap 两种。其中,Collection 又分为 ListSet 以及 Queue

  • Collection - 一个独立元素的序列,这些元素都服从一条或者多条规则。
    • List - 必须按照插入的顺序保存元素。常见 List 容器有:
      • ArrayList - Object[] 数组。
      • LinkedList - 双链表 (JDK1.6 之前为循环链表,JDK1.7 取消了循环)。
      • Vector - 通过 synchronized 修饰读写方法来保证并发安全。
      • Vector - Object[] 数组,通过 synchronized 修饰读写方法来保证并发安全。
    • Set - 不能有重复的元素。常见 Set 容器有:
      • HashSet - 无序,内部基于 HashMap 来实现的。
      • LinkedHashSet - 保证插入顺序,内部基于 LinkedHashMap 来实现的。
      • TreeSet - 保证自然序或用户指定的比较器顺序,内部基于红黑树实现。
    • Queue - 按照排队规则来确定对象产生的顺序。
      • PriorityQueue - 基于 Object[] 数组来实现小顶堆
      • DelayQueue - 延迟队列。
      • ArrayQueue - ArrayDequeDeque 的顺序表实现。基于动态数组实现了栈和队列所需的所有操作。
      • LinkedList - LinkedListDeque 的链表实现。
  • Map - 一组成对的“键值对”对象,允许你使用键来查找值。常见的 Map 容器有:
    • HashMap:JDK1.8 之前 HashMap 由数组+链表组成的,数组是 HashMap 的主体,链表则是主要为了解决哈希冲突而存在的(“拉链法”解决冲突)。JDK1.8 以后在解决哈希冲突时有了较大的变化,当链表长度大于阈值(默认为 8)(将链表转换成红黑树前会判断,如果当前数组的长度小于 64,那么会选择先进行数组扩容,而不是转换为红黑树)时,将链表转化为红黑树,以减少搜索时间。
    • LinkedHashMapLinkedHashMap 继承自 HashMap,所以它的底层仍然是基于拉链式散列结构即由数组和链表或红黑树组成。另外,LinkedHashMap 在上面结构的基础上,增加了一条双向链表,使得上面的结构可以保持键值对的插入顺序。同时通过对链表进行相应的操作,实现了访问顺序相关逻辑。
    • Hashtable:数组+链表组成的,数组是 Hashtable 的主体,链表则是主要为了解决哈希冲突而存在的。
    • TreeMap:红黑树(自平衡的排序二叉树)。

容器的基本机制

Java 的容器具有一定的共性,它们或全部或部分依赖以下技术。所以,学习以下技术点,对于理解 Java 容器的特性和原理有很大的帮助。

泛型

Java 1.5 引入了泛型技术。

Java 容器通过泛型技术来保证其数据的类型安全。什么是类型安全呢?

举例来说:如果有一个 List<Object> 容器,Java 编译器在编译时不会对原始类型进行类型安全检查,却会对带参数的类型进行检查,通过使用 Object 作为类型,可以告知编译器该方法可以接受任何类型的对象,比如 String 或 Integer。

1
2
3
List<Object> list = new ArrayList<Object>();
list.add("123");
list.add(123);

如果没有泛型技术,如示例中的代码那样,容器中就可能存储任意数据类型,这是很危险的行为。

1
2
3
List<String> list = new ArrayList<String>();
list.add("123");
list.add(123);

:bulb: 想深入了解 Java 泛型技术的用法和原理可以参考:深入理解 Java 泛型

Iterable 和 Iterator

Iterable 和 Iterator 目的在于遍历访问容器中的元素。

Iterator 接口定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public interface Iterator<E> {

boolean hasNext();

E next();

default void remove() {
throw new UnsupportedOperationException("remove");
}

default void forEachRemaining(Consumer<? super E> action) {
Objects.requireNonNull(action);
while (hasNext())
action.accept(next());
}
}

Iterable 接口定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public interface Iterable<T> {

Iterator<T> iterator();

default void forEach(Consumer<? super T> action) {
Objects.requireNonNull(action);
for (T t : this) {
action.accept(t);
}
}

default Spliterator<T> spliterator() {
return Spliterators.spliteratorUnknownSize(iterator(), 0);
}
}

Collection 接口扩展了 Iterable 接口。

迭代其实我们可以简单地理解为遍历,是一个标准化遍历各类容器里面的所有对象的接口。它是一个经典的设计模式——迭代器模式(Iterator)。

迭代器模式 - 提供一种方法顺序访问一个聚合对象中各个元素,而又无须暴露该对象的内部表示

示例:迭代器遍历

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class IteratorDemo {

public static void main(String[] args) {
List<Integer> list = new ArrayList<>();
list.add(1);
list.add(2);
list.add(3);
Iterator it = list.iterator();
while (it.hasNext()) {
System.out.println(it.next());
}
}

}

《阿里巴巴 Java 开发手册》的描述如下:

不要在 foreach 循环里进行元素的 remove/add 操作。remove 元素请使用 Iterator 方式,如果并发操作,需要对 Iterator 对象加锁。

通过反编译你会发现 foreach 语法底层其实还是依赖 Iterator 。不过, remove/add 操作直接调用的是集合自己的方法,而不是 Iteratorremove/add方法

这就导致 Iterator 莫名其妙地发现自己有元素被 remove/add ,然后,它就会抛出一个 ConcurrentModificationException 来提示用户发生了并发修改异常。这就是单线程状态下产生的 fail-fast 机制

fail-fast 机制:多个线程对 fail-fast 集合进行修改的时候,可能会抛出ConcurrentModificationException。 即使是单线程下也有可能会出现这种情况,上面已经提到过。

相关阅读:什么是 fail-fastopen in new window

Java8 开始,可以使用 Collection#removeIf()方法删除满足特定条件的元素,如:

1
2
3
4
5
6
List<Integer> list = new ArrayList<>();
for (int i = 1; i <= 10; ++i) {
list.add(i);
}
list.removeIf(filter -> filter % 2 == 0); /* 删除list中的所有偶数 */
System.out.println(list); /* [1, 3, 5, 7, 9] */

除了上面介绍的直接使用 Iterator 进行遍历操作之外,你还可以:

  • 使用普通的 for 循环
  • 使用 fail-safe 的集合类。java.util包下面的所有的集合类都是 fail-fast 的,而java.util.concurrent包下面的所有的类都是 fail-safe 的。
  • … …

Comparable 和 Comparator

Comparable 接口和 Comparator 接口一般用于实现容器中元素的比较及排序:

  • Comparable 接口实际上是出自 java.lang 包 它有一个 compareTo(Object obj) 方法用来排序
  • Comparator 接口实际上是出自 java.util 包它有一个 compare(Object obj1, Object obj2) 方法用来排序

::: tabs#Comparable和Comparator接口定义

@tab Comparable 接口定义

Comparable 接口定义

1
2
3
public interface Comparable<T> {
public int compareTo(T o);
}

@tab Comparator 接口定义

Comparator 接口定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@FunctionalInterface
public interface Comparator<T> {

int compare(T o1, T o2);

boolean equals(Object obj);

// 反转
default Comparator<T> reversed() {
return Collections.reverseOrder(this);
}

default Comparator<T> thenComparing(Comparator<? super T> other) {
Objects.requireNonNull(other);
return (Comparator<T> & Serializable) (c1, c2) -> {
int res = compare(c1, c2);
return (res != 0) ? res : other.compare(c1, c2);
};
}

// thenComparingXXX 方法略

// 静态方法略
}

:::

假设,有一个 List 容器,存储的是 User 类型对象。现在要根据 User 中的 age 属性进行排序。

User 定义如下:

1
2
3
4
5
6
7
8
9
10
11
public class User {

private String name;
private int age;

public User(String name, int age) {
this.age = age;
this.name = name;
}
// getter、setter 略
}

我们分别通过 ComparableComparator 来实现比较、排序,体会一下有何差异。

::: tabs#Comparable和Comparator使用示例

@tab Comparable 接口使用示例

Comparable 接口使用示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class ComparableDemo {

public static void main(String[] args) {
User a = new User("A", 18);
User b = new User("B", 17);
User c = new User("C", 20);
List<User> list = new ArrayList<>(Arrays.asList(a, b, c));
Collections.sort(list);
list.forEach(System.out::println);
}
// 输出:
// User{age=17, name='B'}
// User{age=18, name='A'}
// User{age=20, name='C'}

// 需要对被比较、排序的类进行改造,实现 Comparable 接口
static class User implements Comparable<User> {

private String name;
private int age;

public User(String name, int age) {
this.age = age;
this.name = name;
}

// getter、setter 略

@Override
public int compareTo(User o) {
return this.age - o.age;
}

@Override
public String toString() {
return "User{" + "age=" + age + ", name='" + name + '\'' + '}';
}
}
}

从上例可以看出,使用 Comparable 接口,被排序对象类必须实现 Comparable 接口;并在类中定义 compareTo 方法的实现,即排序逻辑必须置于被排序对象类中。

@tab Comparator 接口使用示例

Comparator 接口使用示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class ComparatorDemo {

public static void main(String[] args) {
User a = new User("A", 18);
User b = new User("B", 17);
User c = new User("C", 20);
List<User> list = new ArrayList<>(Arrays.asList(a, b, c));
Collections.sort(list, new Comparator<User>() {
@Override
public int compare(User o1, User o2) {
return o1.age - o2.age;
}
});
list.forEach(System.out::println);
}
// 输出:
// User{age=17, name='B'}
// User{age=18, name='A'}
// User{age=20, name='C'}

static class User {

private String name;
private int age;

public User(String name, int age) {
this.age = age;
this.name = name;
}

// getter、setter 略

@Override
public String toString() {
return "User{" + "age=" + age + ", name='" + name + '\'' + '}';
}
}
}

从上例可以看出,使用 Comparator 接口和 Comparable 接口的不同点在于:被排序的对象类无需实现 Comparator 接口,排序逻辑置于被排序对象类的外部。

:::

Cloneable

Java 中 一个类要实现 clone 功能 必须实现 Cloneable 接口,否则在调用 clone() 时会报 CloneNotSupportedException 异常。

Java 中所有类都默认继承 java.lang.Object 类,在 java.lang.Object 类中有一个方法 clone(),这个方法将返回 Object 对象的一个拷贝。Object 类里的 clone() 方法仅仅用于浅拷贝(拷贝基本成员属性,对于引用类型仅返回指向改地址的引用)。

如果 Java 类需要深拷贝,需要覆写 clone() 方法。

fail-fast

fail-fast 的要点

Java 容器(如:ArrayList、HashMap、TreeSet 等待)的 javadoc 中常常提到类似的描述:

注意,迭代器的快速失败行为无法得到保证,因为一般来说,不可能对是否出现不同步并发修改做出任何硬性保证。快速失败(fail-fast)迭代器会尽最大努力抛出 ConcurrentModificationException。因此,为提高这类迭代器的正确性而编写一个依赖于此异常的程序是错误的做法:迭代器的快速失败行为应该仅用于检测 bug。

那么,我们不禁要问,什么是 fail-fast,为什么要有 fail-fast 机制?

fail-fast 是 Java 容器的一种错误检测机制。当多个线程对容器进行结构上的改变的操作时,就可能触发 fail-fast 机制。记住是有可能,而不是一定。

例如:假设存在两个线程(线程 1、线程 2),线程 1 通过 Iterator 在遍历容器 A 中的元素,在某个时候线程 2 修改了容器 A 的结构(是结构上面的修改,而不是简单的修改容器元素的内容),那么这个时候程序就会抛出 ConcurrentModificationException 异常,从而产生 fail-fast 机制。

容器在迭代操作中改变元素个数(添加、删除元素)都可能会导致 fail-fast

示例:fail-fast 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
public class FailFastDemo {

private static int MAX = 100;

private static List<Integer> list = new ArrayList<>();

public static void main(String[] args) {
for (int i = 0; i < MAX; i++) {
list.add(i);
}
new Thread(new MyThreadA()).start();
new Thread(new MyThreadB()).start();
}

/** 迭代遍历容器所有元素 */
static class MyThreadA implements Runnable {

@Override
public void run() {
Iterator<Integer> iterator = list.iterator();
while (iterator.hasNext()) {
int i = iterator.next();
System.out.println("MyThreadA 访问元素:" + i);
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

}

/** 遍历删除指定范围内的所有偶数 */
static class MyThreadB implements Runnable {

@Override
public void run() {
int i = 0;
while (i < MAX) {
if (i % 2 == 0) {
System.out.println("MyThreadB 删除元素" + i);
list.remove(i);
}
i++;
}
}

}

}

执行后,会抛出 java.util.ConcurrentModificationException 异常。

解决 fail-fast

fail-fast 有两种解决方案:

  • 在遍历过程中所有涉及到改变容器个数的地方全部加上 synchronized 或者直接使用 Collections.synchronizedXXX 容器,这样就可以解决。但是不推荐,因为增删造成的同步锁可能会阻塞遍历操作,影响吞吐。
  • 使用并发容器,如:CopyOnWriterArrayList

容器和线程安全

为了在并发环境下安全地使用容器,Java 提供了同步容器和并发容器。

同步容器和并发容器详情请参考:Java 并发之容器

参考资料

Java 并发之 AQS

AQS 简介

AQSAbstractQueuedSynchronizer 的缩写,即 队列同步器,顾名思义,其主要作用是处理同步。它是并发锁和很多同步工具类的实现基石(如 ReentrantLockReentrantReadWriteLockCountDownLatchSemaphoreFutureTask 等)。

**AQS 提供了对锁和同步器的通用能力支持 **。在 java.util.concurrent.locks 包中的相关锁(常用的有 ReentrantLockThreadPoolExecutor)都是基于 AQS 来实现。这些锁都没有直接继承 AQS,而是定义了一个 Sync 类去继承 AQS。为什么要这样呢?因为锁面向的是使用用户,而同步器面向的则是线程控制,那么在锁的实现中聚合同步器而不是直接继承 AQS 就可以很好的隔离二者所关注的事情。

AQS 的应用

AQS 定义两种资源共享方式:Exclusive(独占,只有一个线程能执行,如 ReentrantLock)和 Share(共享,多个线程可同时执行,如 Semaphore / CountDownLatch)。

独占锁 API

获取、释放独占锁的主要 API 如下:

1
2
3
4
public final void acquire(int arg)
public final void acquireInterruptibly(int arg)
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
public final boolean release(int arg)
  • acquire - 获取独占锁。
  • acquireInterruptibly - 获取可中断的独占锁。
  • tryAcquireNanos - 尝试在指定时间内获取可中断的独占锁。在以下三种情况下回返回:
    • 在超时时间内,当前线程成功获取了锁;
    • 当前线程在超时时间内被中断;
    • 超时时间结束,仍未获得锁返回 false。
  • release - 释放独占锁。

共享锁 API

获取、释放共享锁的主要 API 如下:

1
2
3
4
public final void acquireShared(int arg)
public final void acquireSharedInterruptibly(int arg)
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
public final boolean releaseShared(int arg)
  • acquireShared - 获取共享锁。
  • acquireSharedInterruptibly - 获取可中断的共享锁。
  • tryAcquireSharedNanos - 尝试在指定时间内获取可中断的共享锁。
  • release - 释放共享锁。

AQS 的原理

AQS 核心思想是,如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态;如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制。这个机制是基于 CLH 锁 (Craig, Landin, and Hagersten locks) 的变体实现的,将暂时获取不到锁的线程加入到队列中。

CLH 本是一个单向队列,AQS 中的队列采用了 CLH 的变体,是一个虚拟的 FIFO 双向队列(虚拟的双向队列,是指不存在结点实例,仅存在结点之间的关联关系),暂时获取不到锁的线程将被加入到该队列中。AQS 将每条请求共享资源的线程封装成一个 CLH 队列锁的一个结点(Node)来实现锁的分配。在 CLH 队列锁中,一个节点表示一个线程,它保存着线程的引用(thread)、 当前节点在队列中的状态(waitStatus)、前驱节点(prev)、后继节点(next)。

AQS 的核心原理图:

AQS 的数据结构

先看一下 AbstractQueuedSynchronizer 的定义:

1
2
3
4
5
6
7
8
9
10
11
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {

/** 等待队列的队头,懒加载。只能通过 setHead 方法修改。 */
private transient volatile Node head;
/** 等待队列的队尾,懒加载。只能通过 enq 方法添加新的等待节点。*/
private transient volatile Node tail;
/** 同步状态 */
private volatile int state;
}

阅读 AQS 的源码,可以发现:AQS 继承自 AbstractOwnableSynchronize,它有以下核心属性:

  • state - AQS 使用一个整型的 volatile 变量来 维护同步状态。这个整数状态的意义由子类来赋予,如 ReentrantLock 中该状态值表示所有者线程已经重复获取该锁的次数;Semaphore 中该状态值表示剩余的许可数量。
  • headtail - AQS **维护了一个 Node 类型(AQS 的内部类)的双向队列来完成同步状态的管理 **。这个双向队列是一个双向的 FIFO 队列,通过 headtail 指针进行访问。当 **有线程获取锁失败后,就被添加到队列末尾 **。

再来看一下 Node 的源码,很显然,Node 是一个双向队列结构:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
static final class Node {
/** 该等待同步的节点处于共享模式 */
static final Node SHARED = new Node();
/** 该等待同步的节点处于独占模式 */
static final Node EXCLUSIVE = null;

/** 线程等待状态,状态值有:0、1、-1、-2、-3 */
volatile int waitStatus;
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;

/** 前驱节点 */
volatile Node prev;
/** 后继节点 */
volatile Node next;
/** 等待锁的线程 */
volatile Thread thread;

/** 和节点是否共享有关 */
Node nextWaiter;
}

属性说明:

方法和属性值 含义
waitStatus 当前节点在队列中的状态
thread 表示处于该节点的线程
prev 前驱指针
next 后继指针

waitStatus 是一个整型的 volatile 变量,用来维护 AQS 同步队列中线程节点的状态。waitStatus 有五个状态值:

  • 0 - 一个 Node 被初始化的时候的默认值
  • CANCELLED(1) - 表示线程获取锁的请求已经取消了
  • SIGNAL(-1) - 表示线程已经准备好了,就等资源释放了
  • CONDITION(-2) - 表示节点在等待队列中,节点线程等待唤醒
  • PROPAGATE(-3) - 当前线程处在 SHARED 情况下,该字段才会使用

独占锁的获取和释放

获取独占锁

AQS 中使用 acquire(int arg) 方法获取独占锁的相关源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

// 利用 CAS 操作将当前线程加入等待队列队尾
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}

// 自旋尝试为等待队列中的线程节点获取独占锁
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
// 获取锁成功,退出
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 线程中断
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

其大致流程如下:

  1. 先通过 tryAcquire 尝试获取同步状态,如果获取同步状态成功,则结束方法,直接返回。
  2. 若不成功,调用 addWaiter 方法,利用 CAS 操作将当前线程加入等待队列队尾。
  3. 接着,自旋尝试为等待队列中的线程节点获取独占锁,直到获取成功或线程中断。

释放独占锁

AQS 中使用 acquire(int arg) 方法获取独占锁的相关源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public final boolean release(int arg) {
// 尝试释放锁
if (tryRelease(arg)) {
Node h = head;
// 如果队列不为空,唤醒下一个节点中的线程
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

private void unparkSuccessor(Node node) {

int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);

Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
  1. 先尝试获取解锁线程的同步状态,如果获取同步状态不成功,则结束方法,直接返回。
  2. 如果获取同步状态成功且队列不为空,AQS 会尝试唤醒下一个节点中的线程。

获取可中断的独占锁

AQS 中使用 acquireInterruptibly(int arg) 方法获取可中断的独占锁。

acquireInterruptibly(int arg) 实现方式 相较于获取独占锁方法( acquire)非常相似,区别仅在于它会 通过 Thread.interrupted 检测当前线程是否被中断,如果是,则立即抛出中断异常(InterruptedException)。

限时获取独占锁

AQS 中使用 tryAcquireNanos(int arg) 方法获取超时等待的独占锁。

doAcquireNanos 的实现方式 相较于获取独占锁方法( acquire)非常相似,区别在于它会根据超时时间和当前时间计算出截止时间。在获取锁的流程中,会不断判断是否超时,如果超时,直接返回 false;如果没超时,则用 LockSupport.parkNanos 来阻塞当前线程。

共享锁的获取和释放

获取共享锁

AQS 中使用 acquireShared(int arg) 方法获取共享锁。

acquireShared 方法和 acquire 方法的逻辑很相似,区别仅在于自旋的条件以及节点出队的操作有所不同。

成功获得共享锁的条件如下:

  • tryAcquireShared(arg) 返回值大于等于 0 (这意味着共享锁的 permit 还没有用完)。
  • 当前节点的前驱节点是头结点。

释放共享锁

AQS 中使用 releaseShared(int arg) 方法释放共享锁。

releaseShared 首先会尝试释放同步状态,如果成功,则解锁一个或多个后继线程节点。释放共享锁和释放独占锁流程大体相似,区别在于:

对于独占模式,如果需要 SIGNAL,释放仅相当于调用头节点的 unparkSuccessor

获取可中断的共享锁

AQS 中使用 acquireSharedInterruptibly(int arg) 方法获取可中断的共享锁。

acquireSharedInterruptibly 方法与 acquireInterruptibly 几乎一致,不再赘述。

限时获取共享锁

AQS 中使用 tryAcquireSharedNanos(int arg) 方法获取超时等待式的共享锁。

tryAcquireSharedNanos 方法与 tryAcquireNanos 几乎一致,不再赘述。

自定义同步器

同步器的设计是基于模板方法模式的,如果需要自定义同步器一般的方式是这样(模板方法模式很经典的一个应用):

  1. 使用者继承 AbstractQueuedSynchronizer 并重写指定的方法。
  2. 将 AQS 组合在自定义同步组件的实现中,并调用其模板方法,而这些模板方法会调用使用者重写的方法。

这和我们以往通过实现接口的方式有很大区别,这是模板方法模式很经典的一个运用。

AQS 使用了模板方法模式,自定义同步器时需要重写下面几个 AQS 提供的钩子方法:

1
2
3
4
5
6
7
8
9
10
// 独占方式。尝试获取资源,成功则返回 true,失败则返回 false。
protected boolean tryAcquire(int)
// 独占方式。尝试释放资源,成功则返回 true,失败则返回 false。
protected boolean tryRelease(int)
// 共享方式。尝试获取资源。负数表示失败;0 表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
protected int tryAcquireShared(int)
// 共享方式。尝试释放资源,成功则返回 true,失败则返回 false。
protected boolean tryReleaseShared(int)
// 该线程是否正在独占资源。只有用到 condition 才需要去实现它。
protected boolean isHeldExclusively()

什么是钩子方法呢? 钩子方法是一种被声明在抽象类中的方法,一般使用 protected 关键字修饰,它可以是空方法(由子类实现),也可以是默认实现的方法。模板设计模式通过钩子方法控制固定步骤的实现。

参考资料

Java 并发之锁

本文先阐述 Java 中各种锁的概念。

然后,重点介绍 Lock 和 Condition 两个接口及其实现。并发编程有两个核心问题:同步和互斥。

互斥,即同一时刻只允许一个线程访问共享资源;

同步,即线程之间如何通信、协作。

这两大问题,管程(sychronized)都是能够解决的。J.U.C 包还提供了 Lock 和 Condition 两个接口来实现管程,其中 Lock 用于解决互斥问题,Condition 用于解决同步问题

并发锁简介

确保线程安全最常见的做法是利用锁机制(Locksychronized)来对共享数据做互斥同步,这样在同一个时刻,只有一个线程可以执行某个方法或者某个代码块,那么操作必然是原子性的,线程安全的。

在工作、面试中,经常会听到各种五花八门的锁,听的人云里雾里。锁的概念术语很多,它们是针对不同的问题所提出的,通过简单的梳理,也不难理解。

可重入锁

可重入锁,顾名思义,指的是线程可以重复获取同一把锁。即同一个线程在外层方法获取了锁,在进入内层方法会自动获取锁。

可重入锁可以在一定程度上避免死锁

  • ReentrantLockReentrantReadWriteLock 是可重入锁。这点,从其命名也不难看出。
  • synchronized 也是一个可重入锁

【示例】synchronized 的可重入示例

1
2
3
4
5
6
7
8
synchronized void setA() throws Exception{
Thread.sleep(1000);
setB();
}

synchronized void setB() throws Exception{
Thread.sleep(1000);
}

上面的代码就是一个典型场景:如果使用的锁不是可重入锁的话,setB 可能不会被当前线程执行,从而造成死锁。

【示例】ReentrantLock 的可重入示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
class Task {

private int value;
private final Lock lock = new ReentrantLock();

public Task() {
this.value = 0;
}

public int get() {
// 获取锁
lock.lock();
try {
return value;
} finally {
// 保证锁能释放
lock.unlock();
}
}

public void addOne() {
// 获取锁
lock.lock();
try {
// 注意:此处已经成功获取锁,进入 get 方法后,又尝试获取锁,
// 如果锁不是可重入的,会导致死锁
value = 1 + get();
} finally {
// 保证锁能释放
lock.unlock();
}
}

}

公平锁与非公平锁

  • 公平锁 - 公平锁是指 多线程按照申请锁的顺序来获取锁
  • 非公平锁 - 非公平锁是指 多线程不按照申请锁的顺序来获取锁 。这就可能会出现优先级反转(后来者居上)或者饥饿现象(某线程总是抢不过别的线程,导致始终无法执行)。

公平锁为了保证线程申请顺序,势必要付出一定的性能代价,因此其吞吐量一般低于非公平锁。

公平锁与非公平锁 在 Java 中的典型实现:

  • synchronized 只支持非公平锁
  • ReentrantLockReentrantReadWriteLock,默认是非公平锁,但支持公平锁

独占锁与共享锁

独占锁与共享锁是一种广义上的说法,从实际用途上来看,也常被称为互斥锁与读写锁。

  • 独占锁 - 独占锁是指 锁一次只能被一个线程所持有
  • 共享锁 - 共享锁是指 锁可被多个线程所持有

独占锁与共享锁在 Java 中的典型实现:

  • synchronizedReentrantLock 只支持独占锁
  • ReentrantReadWriteLock 其写锁是独占锁,其读锁是共享锁。读锁是共享锁使得并发读是非常高效的,读写,写读 ,写写的过程是互斥的。

悲观锁与乐观锁

乐观锁与悲观锁不是指具体的什么类型的锁,而是处理并发同步的策略

悲观锁(Pessimistic Lock)

  • 总是假设最坏的情况,认为:不加锁的并发操作一定会出问题
  • 悲观锁在 Java 中的应用就是通过使用 synchronizedLock 显示加锁来进行互斥同步,这是一种阻塞同步。
  • 悲观锁适合写操作频繁的场景。高并发的场景下,激烈的锁竞争会造成线程阻塞,大量阻塞线程会导致系统的上下文切换,增加系统的性能开销。并且,悲观锁还可能会存在死锁问题,影响代码的正常运行。

【示例】悲观锁示例

1
2
3
4
5
6
7
8
9
10
11
12
13
public void syncTask() {
synchronized (this) {
// 需要同步的操作
}
}

private Lock lock = new ReentrantLock();
lock.lock();
try {
// 需要同步的操作
} finally {
lock.unlock();
}

乐观锁(OptimisticLock)

  • 乐观锁总是假设最好的情况,认为:不加锁的并发操作也没什么问题。每次访问数据时,都假设数据不会被其他线程修改,不必加锁。虽然不加锁,但不意味着什么都不做,而是在更新的时候,判断一下在此期间是否有其他线程更新该数据。
  • 乐观锁最常见的实现方式,是使用版本号机制或 CAS 算法(Compare And Swap)去实现。Java 中的原子类就是基于 CAS 实现的。
  • 乐观锁的优点是:减少锁竞争,提高并发度。
  • 乐观锁的缺点是:
    • 存在 ABA 问题。所谓的 ABA 问题是指在并发编程中,如果一个变量初次读取的时候是 A 值,它的值被改成了 B,然后又其他线程把 B 值改成了 A,而另一个早期线程在对比值时会误以为此值没有发生改变,但其实已经发生变化了
    • 如果乐观锁所检查的数据存在大量锁竞争,会由于不断循环重试,产生大量的 CPU 开销
  • 乐观锁适合读多写少的场景。高并发的场景下,乐观锁相比悲观锁来说,不存在锁竞争造成线程阻塞,也不会有死锁的问题,在性能上往往会更胜一筹。但是,如果冲突频繁发生(写占比非常多的情况),会频繁失败和重试,这样同样会非常影响性能,导致 CPU 飙升。

【示例】乐观锁示例

1
2
3
4
5
6
7
8
9
10
// AtomicInteger 的 getAndAccumulate 方法采用了自旋 + CAS 的乐观锁模式
public final int getAndAccumulate(int x,
IntBinaryOperator accumulatorFunction) {
int prev, next;
do {
prev = get();
next = accumulatorFunction.applyAsInt(prev, x);
} while (!compareAndSet(prev, next));
return prev;
}

乐观锁也是一种通用的锁机制,不仅在 Java 中,在其他很多软件领域,也存在乐观锁机制。比如下面的示例是 MySQL 中的乐观锁示例。

假设,order 表中有一个字段 status,表示订单状态:status 为 1 代表订单未支付;status 为 2 代表订单已支付。现在,要将 id 为 1 的订单状态置为已支付,则操作如下:

1
2
3
4
5
select status, version from order where id=#{id}

update order
set status=2, version=version+1
where id=#{id} and version=#{version};

偏向锁、轻量级锁、重量级锁

所谓轻量级锁与重量级锁,指的是锁控制粒度的粗细。显然,控制粒度越细,阻塞开销越小,并发性也就越高。

Java 1.6 以前,重量级锁一般指的是 synchronized ,而轻量级锁指的是 volatile

Java 1.6 以后,针对 synchronized 做了大量优化,引入 4 种锁状态: 无锁状态、偏向锁、轻量级锁和重量级锁。锁可以单向的从偏向锁升级到轻量级锁,再从轻量级锁升级到重量级锁 。

  • 偏向锁 - 偏向锁是指一段同步代码一直被一个线程所访问,那么该线程会自动获取锁。降低获取锁的代价。
  • 轻量级锁 - 是指当锁是偏向锁的时候,被另一个线程所访问,偏向锁就会升级为轻量级锁,其他线程会通过自旋的形式尝试获取锁,不会阻塞,提高性能。
  • 重量级锁 - 是指当锁为轻量级锁的时候,另一个线程虽然是自旋,但自旋不会一直持续下去,当自旋一定次数的时候,还没有获取到锁,就会进入阻塞,该锁膨胀为重量级锁。重量级锁会让其他申请的线程进入阻塞,性能降低。

分段锁

分段锁其实是一种锁的设计,并不是具体的一种锁。所谓分段锁,就是把锁的对象分成多段,每段独立控制,使得锁粒度更细,减少阻塞开销,从而提高并发性。这其实很好理解,就像高速公路上的收费站,如果只有一个收费口,那所有的车只能排成一条队缴费;如果有多个收费口,就可以分流了。

Hashtable 使用 synchronized 修饰方法来保证线程安全性,那么面对线程的访问,Hashtable 就会锁住整个对象,所有的其它线程只能等待,这种阻塞方式的吞吐量显然很低。

Java 1.7 以前的 ConcurrentHashMap 就是分段锁的典型案例。ConcurrentHashMap 维护了一个 Segment 数组,一般称为分段桶。

1
final Segment<K,V>[] segments;

当有线程访问 ConcurrentHashMap 的数据时,ConcurrentHashMap 会先根据 hashCode 计算出数据在哪个桶(即哪个 Segment),然后锁住这个 Segment

内置锁和显示锁

Java 1.5 之前,协调对共享对象的访问时可以使用的机制只有 synchronizedvolatile。这两个都属于内置锁,即锁的申请和释放都是由 JVM 所控制。

Java 1.5 之后,增加了新的机制:ReentrantLockReentrantReadWriteLock ,这类锁的申请和释放都可以由程序所控制,所以常被称为显示锁。

💡 synchronized 的用法和原理可以参考:Java 并发基础机制 - synchronized

:bell: 注意:如果不需要 ReentrantLockReentrantReadWriteLock 所提供的高级同步特性,**应该优先考虑使用 synchronized**。理由如下:

  • Java 1.6 以后,synchronized 做了大量的优化,其性能已经与 ReentrantLockReentrantReadWriteLock 基本上持平。
  • 从趋势来看,Java 未来更可能会优化 synchronized ,而不是 ReentrantLockReentrantReadWriteLock ,因为 synchronized 是 JVM 内置属性,它能执行一些优化。
  • ReentrantLockReentrantReadWriteLock 申请和释放锁都是由程序控制,如果使用不当,可能造成死锁,这是很危险的。

以下对比一下显示锁和内置锁的差异:

  • 主动获取锁和释放锁
    • synchronized 不能主动获取锁和释放锁。获取锁和释放锁都是 JVM 控制的。
    • ReentrantLock 可以主动获取锁和释放锁。(如果忘记释放锁,就可能产生死锁)。
  • 响应中断
    • synchronized 不能响应中断。
    • ReentrantLock 可以响应中断。
  • 超时机制
    • synchronized 没有超时机制。
    • ReentrantLock 有超时机制。ReentrantLock 可以设置超时时间,超时后自动释放锁,避免一直等待。
  • 支持公平锁
    • synchronized 只支持非公平锁。
    • ReentrantLock 支持非公平锁和公平锁。
  • 是否支持共享
    • synchronized 修饰的方法或代码块,只能被一个线程访问(独享)。如果这个线程被阻塞,其他线程也只能等待
    • ReentrantLock 可以基于 Condition 灵活的控制同步条件。
  • 是否支持读写分离
    • synchronized 不支持读写锁分离;
    • ReentrantReadWriteLock 支持读写锁,从而使阻塞读写的操作分开,有效提高并发性。

Lock 和 Condition

为何引入 Lock 和 Condition

并发编程领域,有两大核心问题:一个是互斥,即同一时刻只允许一个线程访问共享资源;另一个是同步,即线程之间如何通信、协作。这两大问题,管程都是能够解决的。Java SDK 并发包通过 Lock 和 Condition 两个接口来实现管程,其中 Lock 用于解决互斥问题,Condition 用于解决同步问题

synchronized 是管程的一种实现,既然如此,何必再提供 Lock 和 Condition。

JDK 1.6 以前,synchronized 还没有做优化,性能远低于 Lock。但是,性能不是引入 Lock 的最重要因素。真正关键在于:synchronized 使用不当,可能会出现死锁。synchronized 无法通过破坏不可抢占条件来避免死锁。原因是 synchronized 申请资源的时候,如果申请不到,线程直接进入阻塞状态了,而线程进入阻塞状态,啥都干不了,也释放不了线程已经占有的资源。

与内置锁 synchronized 不同的是,**Lock 提供了一组无条件的、可轮询的、定时的以及可中断的锁操作**,所有获取锁、释放锁的操作都是显式的操作。

  • 能够响应中断。synchronized 的问题是,持有锁 A 后,如果尝试获取锁 B 失败,那么线程就进入阻塞状态,一旦发生死锁,就没有任何机会来唤醒阻塞的线程。但如果阻塞状态的线程能够响应中断信号,也就是说当我们给阻塞的线程发送中断信号的时候,能够唤醒它,那它就有机会释放曾经持有的锁 A。这样就破坏了不可抢占条件了。
  • 支持超时。如果线程在一段时间之内没有获取到锁,不是进入阻塞状态,而是返回一个错误,那这个线程也有机会释放曾经持有的锁。这样也能破坏不可抢占条件。
  • 非阻塞地获取锁。如果尝试获取锁失败,并不进入阻塞状态,而是直接返回,那这个线程也有机会释放曾经持有的锁。这样也能破坏不可抢占条件。

Lock 接口

Lock 的接口定义如下:

1
2
3
4
5
6
7
8
public interface Lock {
void lock();
void lockInterruptibly() throws InterruptedException;
boolean tryLock();
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
void unlock();
Condition newCondition();
}
  • lock() - 获取锁。
  • unlock() - 释放锁。
  • tryLock() - 尝试获取锁,仅在调用时锁未被另一个线程持有的情况下,才获取该锁。
  • tryLock(long time, TimeUnit unit) - 和 tryLock() 类似,区别仅在于限定时间,如果限定时间内未获取到锁,视为失败。
  • lockInterruptibly() - 锁未被另一个线程持有,且线程没有被中断的情况下,才能获取锁。
  • newCondition() - 返回一个绑定到 Lock 对象上的 Condition 实例。

Condition

Condition 实现了管程模型里面的条件变量

前文中提过 Lock 接口中 有一个 newCondition() 方法用于返回一个绑定到 Lock 对象上的 Condition 实例。Condition 是什么?有什么作用?本节将一一讲解。

在单线程中,一段代码的执行可能依赖于某个状态,如果不满足状态条件,代码就不会被执行(典型的场景,如:if ... else ...)。在并发环境中,当一个线程判断某个状态条件时,其状态可能是由于其他线程的操作而改变,这时就需要有一定的协调机制来确保在同一时刻,数据只能被一个线程锁修改,且修改的数据状态被所有线程所感知。

Java 1.5 之前,主要是利用 Object 类中的 waitnotifynotifyAll 配合 synchronized 来进行线程间通信。waitnotifynotifyAll 需要配合 synchronized 使用,不适用于 Lock。而使用 Lock 的线程,彼此间通信应该使用 Condition 。这可以理解为,什么样的锁配什么样的钥匙。内置锁(synchronized)配合内置条件队列(waitnotifynotifyAll ),显式锁(Lock)配合显式条件队列(Condition

Condition 的特性

Condition 接口定义如下:

1
2
3
4
5
6
7
8
9
public interface Condition {
void await() throws InterruptedException;
void awaitUninterruptibly();
long awaitNanos(long nanosTimeout) throws InterruptedException;
boolean await(long time, TimeUnit unit) throws InterruptedException;
boolean awaitUntil(Date deadline) throws InterruptedException;
void signal();
void signalAll();
}

其中,awaitsignalsignalAllwaitnotifynotifyAll 相对应,功能也相似。除此以外,Condition 相比内置条件队列( waitnotifynotifyAll ),提供了更为丰富的功能:

  • 每个锁(Lock)上可以存在多个 Condition,这意味着锁的状态条件可以有多个。
  • 支持公平的或非公平的队列操作。
  • 支持可中断的条件等待,相关方法:awaitUninterruptibly()
  • 支持可定时的等待,相关方法:awaitNanos(long)await(long, TimeUnit)awaitUntil(Date)

Condition 的用法

这里以 Condition 来实现一个消费者、生产者模式。

:bell: 注意:事实上,解决此类问题使用 CountDownLatchSemaphore 等工具更为便捷、安全。想了解详情,可以参考 Java 并发工具类

产品类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
class Message {

private final Lock lock = new ReentrantLock();

private final Condition producedMsg = lock.newCondition();

private final Condition consumedMsg = lock.newCondition();

private String message;

private boolean state;

private boolean end;

public void consume() {
//lock
lock.lock();
try {
// no new message wait for new message
while (!state) { producedMsg.await(); }

System.out.println("consume message : " + message);
state = false;
// message consumed, notify waiting thread
consumedMsg.signal();
} catch (InterruptedException ie) {
System.out.println("Thread interrupted - viewMessage");
} finally {
lock.unlock();
}
}

public void produce(String message) {
lock.lock();
try {
// last message not consumed, wait for it be consumed
while (state) { consumedMsg.await(); }

System.out.println("produce msg: " + message);
this.message = message;
state = true;
// new message added, notify waiting thread
producedMsg.signal();
} catch (InterruptedException ie) {
System.out.println("Thread interrupted - publishMessage");
} finally {
lock.unlock();
}
}

public boolean isEnd() {
return end;
}

public void setEnd(boolean end) {
this.end = end;
}

}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
class MessageConsumer implements Runnable {

private Message message;

public MessageConsumer(Message msg) {
message = msg;
}

@Override
public void run() {
while (!message.isEnd()) { message.consume(); }
}

}

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
class MessageProducer implements Runnable {

private Message message;

public MessageProducer(Message msg) {
message = msg;
}

@Override
public void run() {
produce();
}

public void produce() {
List<String> msgs = new ArrayList<>();
msgs.add("Begin");
msgs.add("Msg1");
msgs.add("Msg2");

for (String msg : msgs) {
message.produce(msg);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

message.produce("End");
message.setEnd(true);
}

}

测试

1
2
3
4
5
6
7
8
9
10
public class LockConditionDemo {

public static void main(String[] args) {
Message msg = new Message();
Thread producer = new Thread(new MessageProducer(msg));
Thread consumer = new Thread(new MessageConsumer(msg));
producer.start();
consumer.start();
}
}

ReentrantLock

ReentrantLock 类是 Lock 接口的具体实现,与内置锁 synchronized 相同的是,它是一个可重入锁

ReentrantLock 的特性如下:

  • ReentrantLock 提供了与 synchronized 相同的互斥性、内存可见性和可重入性
  • ReentrantLock 支持公平锁和非公平锁(默认)两种模式。
  • ReentrantLock 实现了 Lock 接口,支持了 synchronized 所不具备的灵活性,增加了轮询、超时、中断等功能。
    • synchronized 无法中断一个正在等待获取锁的线程
    • synchronized 无法在请求获取一个锁时无休止地等待

ReentrantLock 的用法

前文了解了 ReentrantLock 的特性,接下来,我们要讲述其具体用法。

ReentrantLock 的构造方法

ReentrantLock 有两个构造方法:

1
2
public ReentrantLock() {}
public ReentrantLock(boolean fair) {}
  • ReentrantLock() - 默认构造方法会初始化一个非公平锁(NonfairSync)
  • ReentrantLock(boolean) - new ReentrantLock(true) 会初始化一个公平锁(FairSync)

lock 和 unlock 方法

  • lock() - 无条件获取锁。如果当前线程无法获取锁,则当前线程进入休眠状态不可用,直至当前线程获取到锁。如果该锁没有被另一个线程持有,则获取该锁并立即返回,将锁的持有计数设置为 1。
  • unlock() - 用于释放锁

:bell: 注意:请务必牢记,获取锁操作 lock() 必须在 try catch 块中进行,并且将释放锁操作 unlock() 放在 finally 块中进行,以保证锁一定被被释放,防止死锁的发生

示例:ReentrantLock 的基本操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
public class ReentrantLockDemo {

public static void main(String[] args) {
Task task = new Task();
MyThread tA = new MyThread("Thread-A", task);
MyThread tB = new MyThread("Thread-B", task);
MyThread tC = new MyThread("Thread-C", task);
tA.start();
tB.start();
tC.start();
}

static class MyThread extends Thread {

private Task task;

public MyThread(String name, Task task) {
super(name);
this.task = task;
}

@Override
public void run() {
task.execute();
}

}

static class Task {

private ReentrantLock lock = new ReentrantLock();

public void execute() {
lock.lock();
try {
for (int i = 0; i < 3; i++) {
System.out.println(lock.toString());

// 查询当前线程 hold 住此锁的次数
System.out.println("\t holdCount: " + lock.getHoldCount());

// 查询正等待获取此锁的线程数
System.out.println("\t queuedLength: " + lock.getQueueLength());

// 是否为公平锁
System.out.println("\t isFair: " + lock.isFair());

// 是否被锁住
System.out.println("\t isLocked: " + lock.isLocked());

// 是否被当前线程持有锁
System.out.println("\t isHeldByCurrentThread: " + lock.isHeldByCurrentThread());

try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} finally {
lock.unlock();
}
}

}

}

输出结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
java.util.concurrent.locks.ReentrantLock@64fcd88a[Locked by thread Thread-A]
holdCount: 1
queuedLength: 2
isFair: false
isLocked: true
isHeldByCurrentThread: true
java.util.concurrent.locks.ReentrantLock@64fcd88a[Locked by thread Thread-C]
holdCount: 1
queuedLength: 1
isFair: false
isLocked: true
isHeldByCurrentThread: true
// ...

tryLock 方法

与无条件获取锁相比,tryLock 有更完善的容错机制。

  • tryLock() - 可轮询获取锁。如果成功,则返回 true;如果失败,则返回 false。也就是说,这个方法无论成败都会立即返回,获取不到锁(锁已被其他线程获取)时不会一直等待。
  • tryLock(long, TimeUnit) - 可定时获取锁。和 tryLock() 类似,区别仅在于这个方法在获取不到锁时会等待一定的时间,在时间期限之内如果还获取不到锁,就返回 false。如果如果一开始拿到锁或者在等待期间内拿到了锁,则返回 true。

示例:ReentrantLocktryLock() 操作

修改上个示例中的 execute() 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
public void execute() {
if (lock.tryLock()) {
try {
for (int i = 0; i < 3; i++) {
// 略。..
}
} finally {
lock.unlock();
}
} else {
System.out.println(Thread.currentThread().getName() + " 获取锁失败");
}
}

示例:ReentrantLocktryLock(long, TimeUnit) 操作

修改上个示例中的 execute() 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public void execute() {
try {
if (lock.tryLock(2, TimeUnit.SECONDS)) {
try {
for (int i = 0; i < 3; i++) {
// 略。..
}
} finally {
lock.unlock();
}
} else {
System.out.println(Thread.currentThread().getName() + " 获取锁失败");
}
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + " 获取锁超时");
e.printStackTrace();
}
}

lockInterruptibly 方法

  • lockInterruptibly() - 可中断获取锁。可中断获取锁可以在获得锁的同时保持对中断的响应。可中断获取锁比其它获取锁的方式稍微复杂一些,需要两个 try-catch 块(如果在获取锁的操作中抛出了 InterruptedException ,那么可以使用标准的 try-finally 加锁模式)。
    • 举例来说:假设有两个线程同时通过 lock.lockInterruptibly() 获取某个锁时,若线程 A 获取到了锁,则线程 B 只能等待。若此时对线程 B 调用 threadB.interrupt() 方法能够中断线程 B 的等待过程。由于 lockInterruptibly() 的声明中抛出了异常,所以 lock.lockInterruptibly() 必须放在 try 块中或者在调用 lockInterruptibly() 的方法外声明抛出 InterruptedException

:bell: 注意:当一个线程获取了锁之后,是不会被 interrupt() 方法中断的。单独调用 interrupt() 方法不能中断正在运行状态中的线程,只能中断阻塞状态中的线程。因此当通过 lockInterruptibly() 方法获取某个锁时,如果未获取到锁,只有在等待的状态下,才可以响应中断。

示例:ReentrantLocklockInterruptibly() 操作

修改上个示例中的 execute() 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public void execute() {
try {
lock.lockInterruptibly();

for (int i = 0; i < 3; i++) {
// 略。..
}
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + "被中断");
e.printStackTrace();
} finally {
lock.unlock();
}
}

newCondition 方法

newCondition() - 返回一个绑定到 Lock 对象上的 Condition 实例。Condition 的特性和具体方法请阅读下文 [Condition](#五 condition)。

ReentrantLock 的原理

ReentrantLock 的可见性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class X {
private final Lock rtl =
new ReentrantLock();
int value;
public void addOne() {
// 获取锁
rtl.lock();
try {
value+=1;
} finally {
// 保证锁能释放
rtl.unlock();
}
}
}

ReentrantLock,内部持有一个 volatile 的成员变量 state,获取锁的时候,会读写 state 的值;解锁的时候,也会读写 state 的值(简化后的代码如下面所示)。也就是说,在执行 value+=1 之前,程序先读写了一次 volatile 变量 state,在执行 value+=1 之后,又读写了一次 volatile 变量 state。根据相关的 Happens-Before 规则:

  1. 顺序性规则:对于线程 T1,value+=1 Happens-Before 释放锁的操作 unlock();
  2. volatile 变量规则:由于 state = 1 会先读取 state,所以线程 T1 的 unlock() 操作 Happens-Before 线程 T2 的 lock() 操作;
  3. 传递性规则:线程 T1 的 value+=1 Happens-Before 线程 T2 的 lock() 操作。

ReentrantLock 的数据结构

阅读 ReentrantLock 的源码,可以发现它有一个核心字段:

1
private final Sync sync;
  • sync - 内部抽象类 ReentrantLock.Sync 对象,Sync 继承自 AQS。它有两个子类:
  • ReentrantLock.FairSync - 公平锁。
  • ReentrantLock.NonfairSync - 非公平锁。

查看源码可以发现,ReentrantLock 实现 Lock 接口其实是调用 ReentrantLock.FairSyncReentrantLock.NonfairSync 中各自的实现,这里不一一列举。

ReentrantLock 的获取锁和释放锁

ReentrantLock 获取锁和释放锁的接口,从表象看,是调用 ReentrantLock.FairSyncReentrantLock.NonfairSync 中各自的实现;从本质上看,是基于 AQS 的实现。

仔细阅读源码很容易发现:

  • void lock() 调用 Sync 的 lock() 方法。

  • void lockInterruptibly() 直接调用 AQS 的 [获取可中断的独占锁](#获取可中断的独占锁) 方法 lockInterruptibly()

  • boolean tryLock() 调用 Sync 的 nonfairTryAcquire()

  • boolean tryLock(long time, TimeUnit unit) 直接调用 AQS 的 [获取超时等待式的独占锁](#获取超时等待式的独占锁) 方法 tryAcquireNanos(int arg, long nanosTimeout)

  • void unlock() 直接调用 AQS 的 [释放独占锁](#释放独占锁) 方法 release(int arg)

直接调用 AQS 接口的方法就不再赘述了,其原理在 [AQS 的原理](#AQS 的原理) 中已经用很大篇幅进行过讲解。

nonfairTryAcquire 方法源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 公平锁和非公平锁都会用这个方法区尝试获取锁
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
// 如果同步状态为 0,将其设为 acquires,并设置当前线程为排它线程
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

处理流程很简单:

  • 如果同步状态为 0,设置同步状态设为 acquires,并设置当前线程为排它线程,然后返回 true,获取锁成功。
  • 如果同步状态不为 0 且当前线程为排它线程,设置同步状态为当前状态值+acquires 值,然后返回 true,获取锁成功。
  • 否则,返回 false,获取锁失败。

公平锁和非公平锁

ReentrantLock 这个类有两个构造函数,一个是无参构造函数,一个是传入 fair 参数的构造函数。fair 参数代表的是锁的公平策略,如果传入 true 就表示需要构造一个公平锁,反之则表示要构造一个非公平锁。

锁都对应着一个等待队列,如果一个线程没有获得锁,就会进入等待队列,当有线程释放锁的时候,就需要从等待队列中唤醒一个等待的线程。如果是公平锁,唤醒的策略就是谁等待的时间长,就唤醒谁,很公平;如果是非公平锁,则不提供这个公平保证,有可能等待时间短的线程反而先被唤醒。

lock 方法在公平锁和非公平锁中的实现:

二者的区别仅在于申请非公平锁时,如果同步状态为 0,尝试将其设为 1,如果成功,直接将当前线程置为排它线程;否则和公平锁一样,调用 AQS 获取独占锁方法 acquire

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 非公平锁实现
final void lock() {
if (compareAndSetState(0, 1))
// 如果同步状态为 0,将其设为 1,并设置当前线程为排它线程
setExclusiveOwnerThread(Thread.currentThread());
else
// 调用 AQS 获取独占锁方法 acquire
acquire(1);
}

// 公平锁实现
final void lock() {
// 调用 AQS 获取独占锁方法 acquire
acquire(1);
}

ReentrantReadWriteLock

ReadWriteLock 适用于读多写少的场景

ReentrantReadWriteLock 类是 ReadWriteLock 接口的具体实现,它是一个可重入的读写锁。ReentrantReadWriteLock 维护了一对读写锁,将读写锁分开,有利于提高并发效率。

读写锁,并不是 Java 语言特有的,而是一个广为使用的通用技术,所有的读写锁都遵守以下三条基本原则:

  • 允许多个线程同时读共享变量;
  • 只允许一个线程写共享变量;
  • 如果一个写线程正在执行写操作,此时禁止读线程读共享变量。

读写锁与互斥锁的一个重要区别就是读写锁允许多个线程同时读共享变量,而互斥锁是不允许的,这是读写锁在读多写少场景下性能优于互斥锁的关键。但读写锁的写操作是互斥的,当一个线程在写共享变量的时候,是不允许其他线程执行写操作和读操作。

ReentrantReadWriteLock 的特性

ReentrantReadWriteLock 的特性如下:

  • ReentrantReadWriteLock 适用于读多写少的场景。如果是写多读少的场景,由于 ReentrantReadWriteLock 其内部实现比 ReentrantLock 复杂,性能可能反而要差一些。如果存在这样的问题,需要具体问题具体分析。由于 ReentrantReadWriteLock 的读写锁(ReadLockWriteLock)都实现了 Lock 接口,所以要替换为 ReentrantLock 也较为容易。
  • ReentrantReadWriteLock 实现了 ReadWriteLock 接口,支持了 ReentrantLock 所不具备的读写锁分离。ReentrantReadWriteLock 维护了一对读写锁(ReadLockWriteLock)。将读写锁分开,有利于提高并发效率。ReentrantReadWriteLock 的加锁策略是:允许多个读操作并发执行,但每次只允许一个写操作
  • ReentrantReadWriteLock 为读写锁都提供了可重入的加锁语义。
  • ReentrantReadWriteLock 支持公平锁和非公平锁(默认)两种模式。

ReadWriteLock 接口定义如下:

1
2
3
4
public interface ReadWriteLock {
Lock readLock();
Lock writeLock();
}
  • readLock - 返回用于读操作的锁(ReadLock)。
  • writeLock - 返回用于写操作的锁(WriteLock)。

在读写锁和写入锁之间的交互可以采用多种实现方式,ReadWriteLock 的一些可选实现包括:

  • 释放优先 - 当一个写入操作释放写锁,并且队列中同时存在读线程和写线程,那么应该优先选择读线程、写线程,还是最先发出请求的线程?
  • 读线程插队 - 如果锁是由读线程持有,但有写线程正在等待,那么新到达的读线程能否立即获得访问权,还是应该在写线程后面等待?如果允许读线程插队到写线程之前,那么将提高并发性,但可能造成线程饥饿问题。
  • 重入性 - 读锁和写锁是否是可重入的?
  • 降级 - 如果一个线程持有写入锁,那么它能否在不释放该锁的情况下获得读锁?这可能会使得写锁被降级为读锁,同时不允许其他写线程修改被保护的资源。
  • 升级 - 读锁能否优先于其他正在等待的读线程和写线程而升级为一个写锁?在大多数的读写锁实现中并不支持升级,因为如果没有显式的升级操作,那么很容易造成死锁。

ReentrantReadWriteLock 的用法

前文了解了 ReentrantReadWriteLock 的特性,接下来,我们要讲述其具体用法。

ReentrantReadWriteLock 的构造方法

ReentrantReadWriteLockReentrantLock 一样,也有两个构造方法,且用法相似。

1
2
public ReentrantReadWriteLock() {}
public ReentrantReadWriteLock(boolean fair) {}
  • ReentrantReadWriteLock() - 默认构造方法会初始化一个非公平锁(NonfairSync)。在非公平的锁中,线程获得锁的顺序是不确定的。写线程降级为读线程是可以的,但读线程升级为写线程是不可以的(这样会导致死锁)。
  • ReentrantReadWriteLock(boolean) - new ReentrantLock(true) 会初始化一个公平锁(FairSync)。对于公平锁,等待时间最长的线程将优先获得锁。如果这个锁是读线程持有,则另一个线程请求写锁,那么其他读线程都不能获得读锁,直到写线程释放写锁。

ReentrantReadWriteLock 的使用实例

在 [ReentrantReadWriteLock 的特性](#reentrantreadwritelock-的特性) 中已经介绍过,ReentrantReadWriteLock 的读写锁(ReadLockWriteLock) 都实现了 Lock 接口,所以其各自独立的使用方式与 ReentrantLock 一样,这里不再赘述。

ReentrantReadWriteLockReentrantLock 用法上的差异,主要在于读写锁的配合使用。本文以一个典型使用场景来进行讲解。

【示例】基于 ReadWriteLock 实现一个简单的泛型无界缓存

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
/**
* 简单的无界缓存实现
* <p>
* 使用 WeakHashMap 存储键值对。WeakHashMap 中存储的对象是弱引用,JVM GC 时会自动清除没有被引用的弱引用对象。
*/
static class UnboundedCache<K, V> {

private final Map<K, V> cacheMap = new WeakHashMap<>();

private final ReadWriteLock cacheLock = new ReentrantReadWriteLock();

public V get(K key) {
cacheLock.readLock().lock();
V value;
try {
value = cacheMap.get(key);
String log = String.format("%s 读数据 %s:%s", Thread.currentThread().getName(), key, value);
System.out.println(log);
} finally {
cacheLock.readLock().unlock();
}
return value;
}

public V put(K key, V value) {
cacheLock.writeLock().lock();
try {
cacheMap.put(key, value);
String log = String.format("%s 写入数据 %s:%s", Thread.currentThread().getName(), key, value);
System.out.println(log);
} finally {
cacheLock.writeLock().unlock();
}
return value;
}

public V remove(K key) {
cacheLock.writeLock().lock();
try {
return cacheMap.remove(key);
} finally {
cacheLock.writeLock().unlock();
}
}

public void clear() {
cacheLock.writeLock().lock();
try {
this.cacheMap.clear();
} finally {
cacheLock.writeLock().unlock();
}
}

}

说明:

  • 使用 WeakHashMap 而不是 HashMap 来存储键值对。WeakHashMap 中存储的对象是弱引用,JVM GC 时会自动清除没有被引用的弱引用对象。
  • Map 写数据前加写锁,写完后,释放写锁。
  • Map 读数据前加读锁,读完后,释放读锁。

测试其线程安全性:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
* @author <a href="mailto:forbreak@163.com">Zhang Peng</a>
* @since 2020-01-01
*/
public class ReentrantReadWriteLockDemo {

static UnboundedCache<Integer, Integer> cache = new UnboundedCache<>();

public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < 20; i++) {
executorService.execute(new MyThread());
cache.get(0);
}
executorService.shutdown();
}

/** 线程任务每次向缓存中写入 3 个随机值,key 固定 */
static class MyThread implements Runnable {

@Override
public void run() {
Random random = new Random();
for (int i = 0; i < 3; i++) {
cache.put(i, random.nextInt(100));
}
}

}

}

说明:示例中,通过线程池启动 20 个并发任务。任务每次向缓存中写入 3 个随机值,key 固定;然后主线程每次固定读取缓存中第一个 key 的值。

输出结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
main 读数据 0:null
pool-1-thread-1 写入数据 0:16
pool-1-thread-1 写入数据 1:58
pool-1-thread-1 写入数据 2:50
main 读数据 0:16
pool-1-thread-1 写入数据 0:85
pool-1-thread-1 写入数据 1:76
pool-1-thread-1 写入数据 2:46
pool-1-thread-2 写入数据 0:21
pool-1-thread-2 写入数据 1:41
pool-1-thread-2 写入数据 2:63
main 读数据 0:21
// ...

ReentrantReadWriteLock 的原理

前面了解了 ReentrantLock 的原理,理解 ReentrantReadWriteLock 就容易多了。

ReentrantReadWriteLock 的数据结构

阅读 ReentrantReadWriteLock 的源码,可以发现它有三个核心字段:

1
2
3
4
5
6
7
8
9
/** Inner class providing readlock */
private final ReentrantReadWriteLock.ReadLock readerLock;
/** Inner class providing writelock */
private final ReentrantReadWriteLock.WriteLock writerLock;
/** Performs all synchronization mechanics */
final Sync sync;

public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; }
  • sync - 内部类 ReentrantReadWriteLock.Sync 对象。与 ReentrantLock 类似,它有两个子类:ReentrantReadWriteLock.FairSyncReentrantReadWriteLock.NonfairSync ,分别表示公平锁和非公平锁的实现。
  • readerLock - 内部类 ReentrantReadWriteLock.ReadLock 对象,这是一把读锁。
  • writerLock - 内部类 ReentrantReadWriteLock.WriteLock 对象,这是一把写锁。

ReentrantReadWriteLock 的获取锁和释放锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public static class ReadLock implements Lock, java.io.Serializable {

// 调用 AQS 获取共享锁方法
public void lock() {
sync.acquireShared(1);
}

// 调用 AQS 释放共享锁方法
public void unlock() {
sync.releaseShared(1);
}
}

public static class WriteLock implements Lock, java.io.Serializable {

// 调用 AQS 获取独占锁方法
public void lock() {
sync.acquire(1);
}

// 调用 AQS 释放独占锁方法
public void unlock() {
sync.release(1);
}
}

StampedLock

ReadWriteLock 支持两种模式:一种是读锁,一种是写锁。而 StampedLock 支持三种模式,分别是:写锁悲观读锁乐观读。其中,写锁、悲观读锁的语义和 ReadWriteLock 的写锁、读锁的语义非常类似,允许多个线程同时获取悲观读锁,但是只允许一个线程获取写锁,写锁和悲观读锁是互斥的。不同的是:StampedLock 里的写锁和悲观读锁加锁成功之后,都会返回一个 stamp;然后解锁的时候,需要传入这个 stamp。

注意这里,用的是“乐观读”这个词,而不是“乐观读锁”,是要提醒你,乐观读这个操作是无锁的,所以相比较 ReadWriteLock 的读锁,乐观读的性能更好一些。

StampedLock 的性能之所以比 ReadWriteLock 还要好,其关键是 StampedLock 支持乐观读的方式。

  • ReadWriteLock 支持多个线程同时读,但是当多个线程同时读的时候,所有的写操作会被阻塞;
  • 而 StampedLock 提供的乐观读,是允许一个线程获取写锁的,也就是说不是所有的写操作都被阻塞。

对于读多写少的场景 StampedLock 性能很好,简单的应用场景基本上可以替代 ReadWriteLock,但是,StampedLock 的功能仅仅是 ReadWriteLock 的子集,在使用的时候,还是有几个地方需要注意一下。

  • StampedLock 不支持重入
  • StampedLock 的悲观读锁、写锁都不支持条件变量。
  • 如果线程阻塞在 StampedLock 的 readLock() 或者 writeLock() 上时,此时调用该阻塞线程的 interrupt() 方法,会导致 CPU 飙升。**使用 StampedLock 一定不要调用中断操作,如果需要支持中断功能,一定使用可中断的悲观读锁 readLockInterruptibly() 和写锁 writeLockInterruptibly()**。

【示例】StampedLock 阻塞时,调用 interrupt() 导致 CPU 飙升

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
final StampedLock lock = new StampedLock();
Thread T1 = new Thread(() -> {
// 获取写锁
lock.writeLock();
// 永远阻塞在此处,不释放写锁
LockSupport.park();
});
T1.start();
// 保证 T1 获取写锁
Thread.sleep(100);
Thread T2 = new Thread(() ->
// 阻塞在悲观读锁
lock.readLock()
);
T2.start();
// 保证 T2 阻塞在读锁
Thread.sleep(100);
// 中断线程 T2
// 会导致线程 T2 所在 CPU 飙升
T2.interrupt();
T2.join();

【示例】StampedLock 读模板:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
final StampedLock sl = new StampedLock();

// 乐观读
long stamp = sl.tryOptimisticRead();
// 读入方法局部变量
// ......
// 校验 stamp
if (!sl.validate(stamp)) {
// 升级为悲观读锁
stamp = sl.readLock();
try {
// 读入方法局部变量
// .....
} finally {
// 释放悲观读锁
sl.unlockRead(stamp);
}
}
// 使用方法局部变量执行业务操作
// ......

【示例】StampedLock 写模板:

1
2
3
4
5
6
7
long stamp = sl.writeLock();
try {
// 写共享变量
......
} finally {
sl.unlockWrite(stamp);
}

参考资料

Java 并发之无锁

并发安全需要保证几个基本特性:

  • 可见性 - 是一个线程修改了某个共享变量,其状态能够立即被其他线程知晓,通常被解释为将线程本地状态反映到主内存上,volatile 就是负责保证可见性的。
  • 有序性 - 是保证线程内串行语义,避免指令重排等。
  • 原子性 - 简单说就是相关操作不会中途被其他线程干扰,一般通过互斥机制(加锁:sychronizedLock)实现。

互斥同步是最常见的原子性保障手段。互斥同步最主要的问题是线程阻塞和唤醒所带来的性能问题。因此,互斥同步也被称为阻塞同步。互斥同步属于一种悲观的并发策略,总是认为只要不去做正确的同步措施,那就肯定会出现问题。无论共享数据是否真的会出现竞争,它都要进行加锁(这里讨论的是概念模型,实际上虚拟机会优化掉很大一部分不必要的加锁)、用户态核心态转换、维护锁计数器和检查是否有被阻塞的线程需要唤醒等操作。

解决并发安全问题,还可以采用无锁方案。无锁方案相对互斥锁方案,最大的好处就是性能。互斥锁方案为了保证互斥性,需要执行加锁、解锁操作,而加锁、解锁操作本身就消耗性能;同时拿不到锁的线程还会进入阻塞状态,进而触发线程切换,线程切换对性能的消耗也很大。 相比之下,无锁方案则完全没有加锁、解锁的性能消耗,同时还能保证互斥性。

Java 中的无锁技术有:

  • CAS
  • 原子类
  • ThreadLocal
  • Copy-on-Write
  • 不变模式

CAS

CAS 的要点

CAS(Compare and Swap),字面意思为比较并交换。

CAS 涉及三个操作数:

  • V:需要读写的内存位置
  • A:进行比较的值
  • B:拟写入的新值

当且仅当 V 的值等于 A 时,才会通过原子方式用新值 B 来更新 A 的值,否则什么都不做

CAS 实际是乐观锁的一种实现方式,因此,CAS 只适用于线程冲突较少的情况

CAS 的应用

CAS 的典型应用场景是:

  • 原子类
  • 自旋锁

原子类

原子类是 CAS 在 Java 中最典型的应用。

我们先来看一个常见的代码片段。

1
2
3
if(a==b) {
a++;
}

如果 a++ 执行前, a 的值被修改了怎么办?还能得到预期值吗?出现该问题的原因是在并发环境下,以上代码片段不是原子操作,随时可能被其他线程所篡改。

解决这种问题的最经典方式是应用原子类的 incrementAndGet 方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class AtomicIntegerDemo {

public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(3);
final AtomicInteger count = new AtomicInteger(0);
for (int i = 0; i < 10; i++) {
executorService.execute(new Runnable() {
@Override
public void run() {
count.incrementAndGet();
}
});
}

executorService.shutdown();
executorService.awaitTermination(3, TimeUnit.SECONDS);
System.out.println("Final Count is : " + count.get());
}

}

J.U.C 包中提供了 AtomicBooleanAtomicIntegerAtomicLong 分别针对 BooleanIntegerLong 执行原子操作,操作和上面的示例大体相似,不做赘述。

自旋锁

利用原子类(本质上是 CAS),可以实现自旋锁。

所谓自旋锁,是指线程反复检查锁变量是否可用,直到成功为止。由于线程在这一过程中保持执行,因此是一种忙等待。一旦获取了自旋锁,线程会一直保持该锁,直至显式释放自旋锁。

示例:非线程安全示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class AtomicReferenceDemo {

private static int ticket = 10;

public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(3);
for (int i = 0; i < 5; i++) {
executorService.execute(new MyThread());
}
executorService.shutdown();
}

static class MyThread implements Runnable {

@Override
public void run() {
while (ticket > 0) {
System.out.println(Thread.currentThread().getName() + " 卖出了第 " + ticket + " 张票");
ticket--;
}
}

}

}

输出结果:

1
2
3
4
5
6
7
8
9
10
11
12
pool-1-thread-2 卖出了第 10 张票
pool-1-thread-1 卖出了第 10 张票
pool-1-thread-3 卖出了第 10 张票
pool-1-thread-1 卖出了第 8 张票
pool-1-thread-2 卖出了第 9 张票
pool-1-thread-1 卖出了第 6 张票
pool-1-thread-3 卖出了第 7 张票
pool-1-thread-1 卖出了第 4 张票
pool-1-thread-2 卖出了第 5 张票
pool-1-thread-1 卖出了第 2 张票
pool-1-thread-3 卖出了第 3 张票
pool-1-thread-2 卖出了第 1 张票

很明显,出现了重复售票的情况。

【示例】使用自旋锁来保证线程安全

可以通过自旋锁这种非阻塞同步来保证线程安全,下面使用 AtomicReference 来实现一个自旋锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
public class AtomicReferenceDemo2 {

private static int ticket = 10;

public static void main(String[] args) {
threadSafeDemo();
}

private static void threadSafeDemo() {
SpinLock lock = new SpinLock();
ExecutorService executorService = Executors.newFixedThreadPool(3);
for (int i = 0; i < 5; i++) {
executorService.execute(new MyThread(lock));
}
executorService.shutdown();
}

static class SpinLock {

private AtomicReference<Thread> atomicReference = new AtomicReference<>();

public void lock() {
Thread current = Thread.currentThread();
while (!atomicReference.compareAndSet(null, current)) {}
}

public void unlock() {
Thread current = Thread.currentThread();
atomicReference.compareAndSet(current, null);
}

}

static class MyThread implements Runnable {

private SpinLock lock;

public MyThread(SpinLock lock) {
this.lock = lock;
}

@Override
public void run() {
while (ticket > 0) {
lock.lock();
if (ticket > 0) {
System.out.println(Thread.currentThread().getName() + " 卖出了第 " + ticket + " 张票");
ticket--;
}
lock.unlock();
}
}

}

}

输出结果:

1
2
3
4
5
6
7
8
9
10
pool-1-thread-2 卖出了第 10 张票
pool-1-thread-1 卖出了第 9 张票
pool-1-thread-3 卖出了第 8 张票
pool-1-thread-2 卖出了第 7 张票
pool-1-thread-3 卖出了第 6 张票
pool-1-thread-1 卖出了第 5 张票
pool-1-thread-2 卖出了第 4 张票
pool-1-thread-1 卖出了第 3 张票
pool-1-thread-3 卖出了第 2 张票
pool-1-thread-1 卖出了第 1 张票

CAS 的原理

在 Java 中,主要利用 Unsafe 这个类实现 CAS

Unsafe 类位于 sun.misc 包下,是一个提供低级别、不安全操作的类。由于其强大的功能和潜在的危险性,它通常用于 JVM 内部或一些需要极高性能和底层访问的库中,而不推荐普通开发者在应用程序中使用。

Unsafe 类提供了 compareAndSwapObjectcompareAndSwapIntcompareAndSwapLong方法来实现的对 Objectintlong 类型的 CAS 操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* 以原子方式更新对象字段的值。
*/
boolean compareAndSwapObject(Object o, long offset, Object expected, Object x);

/**
* 以原子方式更新 int 类型的对象字段的值。
*/
boolean compareAndSwapInt(Object o, long offset, int expected, int x);

/**
* 以原子方式更新 long 类型的对象字段的值。
*/
boolean compareAndSwapLong(Object o, long offset, long expected, long x);

Unsafe 类中的 CAS 方法是 native 方法。native 关键字表明这些方法是用本地代码(通常是 C 或 C++)实现的,而不是用 Java 实现的。这些方法直接调用底层的、具有原子性的 CPU 指令来实现。

由于 CAS 操作可能会因为并发冲突而失败,因此通常会伴随着自旋,而所谓自旋,其实就是循环尝试。

Unsafe#getAndAddInt 源码:

1
2
3
4
5
6
7
8
9
10
// 原子地获取并增加整数值
public final int getAndAddInt(Object o, long offset, int delta) {
int v;
do {
// 以 volatile 方式获取对象 o 在内存偏移量 offset 处的整数值
v = getIntVolatile(o, offset);
} while (!compareAndSwapInt(o, offset, v, v + delta));
// 返回旧值
return v;
}

CAS 的问题

一般情况下,CAS 比锁性能更高。因为 CAS 是一种非阻塞算法,所以其避免了线程阻塞和唤醒的等待时间。但是,事物总会有利有弊,CAS 也存在三大问题:

  • ABA 问题
  • 循环时间长开销大
  • 只能保证一个共享变量的原子性

ABA 问题

如果一个变量初次读取的时候是 A 值,它的值被改成了 B,后来又被改回为 A,那 CAS 操作就会误认为它从来没有被改变过,这就是 ABA 问题

ABA 问题的解决思路是在变量前面追加上版本号或者时间戳。J.U.C 包提供了一个带有标记的原子引用类 AtomicStampedReference 来解决这个问题,它可以通过控制变量值的版本来保证 CAS 的正确性。大部分情况下 ABA 问题不会影响程序并发的正确性,如果需要解决 ABA 问题,改用传统的互斥同步可能会比原子类更高效

循环时间长开销大

自旋 CAS (不断尝试,直到成功为止)如果长时间不成功,会给 CPU 带来非常大的执行开销

如果 JVM 能支持处理器提供的 pause 指令那么效率会有一定的提升,pause 指令有两个作用:

  • 它可以延迟流水线执行指令(de-pipeline), 使 CPU 不会消耗过多的执行资源,延迟的时间取决于具体实现的版本,在一些处理器上延迟时间是零。
  • 它可以避免在退出循环的时候因内存顺序冲突(memory order violation)而引起 CPU 流水线被清空(CPU pipeline flush),从而提高 CPU 的执行效率。

比较花费 CPU 资源,即使没有任何用也会做一些无用功。

只能保证一个共享变量的原子性

当对一个共享变量执行操作时,我们可以使用循环 CAS 的方式来保证原子操作,但是对多个共享变量操作时,循环 CAS 就无法保证操作的原子性,这个时候就可以用锁。

或者有一个取巧的办法,就是把多个共享变量合并成一个共享变量来操作。比如有两个共享变量 i = 2, j = a,合并一下 ij=2a,然后用 CAS 来操作 ij。从 Java 1.5 开始 JDK 提供了 AtomicReference 类来保证引用对象之间的原子性,你可以把多个变量放在一个对象里来进行 CAS 操作。

原子类

原子类简介

原子性是确保并发安全三大特性之一。为了兼顾原子性以及锁带来的性能问题,Java 引入了 CAS (主要体现在 Unsafe 类)来实现非阻塞同步(也叫乐观锁),CAS 底层基于 CPU 指令(硬件支持)支持,具有原子性。并基于 CAS ,提供了一套原子工具类。

原子类比锁的粒度更细,更轻量级,并且对于在多处理器系统上实现高性能的并发代码来说是非常关键的。原子变量将发生竞争的范围缩小到单个变量上。

原子类相当于一种泛化的 volatile 变量,能够支持原子的、有条件的读/改/写操作。

原子类可以分为 5 个类别,这 5 个类别提供的方法基本上是相似的:

  • 基本数据类型
    • AtomicBoolean - 布尔类型原子类
    • AtomicInteger - 整型原子类
    • AtomicLong - 长整型原子类
  • 引用数据类型
    • AtomicReference - 引用类型原子类
    • AtomicMarkableReference - 带有标记位的引用类型原子类
    • AtomicStampedReference - 带有版本号的引用类型原子类
  • 数组数据类型
    • AtomicIntegerArray - 整形数组原子类
    • AtomicLongArray - 长整型数组原子类
    • AtomicReferenceArray - 引用类型数组原子类
  • 属性更新器类型
    • AtomicIntegerFieldUpdater - 整型字段的原子更新器
    • AtomicLongFieldUpdater - 长整型字段的原子更新器
    • AtomicReferenceFieldUpdater - 原子更新引用类型里的字段
  • 累加器
    • DoubleAdder - 浮点型原子累加器
    • LongAdder - 长整型原子累加器
    • DoubleAccumulator - 更复杂的浮点型原子累加器
    • LongAccumulator - 更复杂的长整型原子累加器

原子类之基本数据类型

基本数据类型原子类针对 Java 基本类型提供原子操作

  • AtomicBoolean - 布尔类型原子类
  • AtomicInteger - 整型原子类
  • AtomicLong - 长整型原子类

以上类都支持 CAS(compare-and-swap)技术,此外,AtomicIntegerAtomicLong 还支持算术运算。

:bulb: 提示:

虽然 Java 只提供了 AtomicBooleanAtomicIntegerAtomicLong,但是可以模拟其他基本类型的原子变量。要想模拟其他基本类型的原子变量,可以将 shortbyte 等类型与 int 类型进行转换,以及使用 Float.floatToIntBitsDouble.doubleToLongBits 来转换浮点数。

由于 AtomicBooleanAtomicIntegerAtomicLong 实现方式、使用方式都相近,所以本文仅针对 AtomicInteger 进行介绍。

AtomicInteger 用法

1
2
3
4
5
6
7
public final int get() // 获取当前值
public final int getAndSet(int newValue) // 获取当前值,并设置新值
public final int getAndIncrement()// 获取当前值,并自增
public final int getAndDecrement() // 获取当前值,并自减
public final int getAndAdd(int delta) // 获取当前值,并加上预期值
boolean compareAndSet(int expect, int update) // 如果输入值(update)等于预期值,将该值设置为输入值
public final void lazySet(int newValue) // 最终设置为 newValue,使用 lazySet 设置之后可能导致其他线程在之后的一小段时间内还是可以读到旧的值。

AtomicInteger 使用示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class AtomicIntegerDemo {

public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(5);
AtomicInteger count = new AtomicInteger(0);
for (int i = 0; i < 1000; i++) {
executorService.submit((Runnable) () -> {
System.out.println(Thread.currentThread().getName() + " count=" + count.get());
count.incrementAndGet();
});
}

executorService.shutdown();
executorService.awaitTermination(30, TimeUnit.SECONDS);
System.out.println("Final Count is : " + count.get());
}
}

AtomicInteger 实现

阅读 AtomicInteger 源码,可以看到如下定义:

1
2
3
4
5
6
7
8
9
10
11
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;

static {
try {
valueOffset = unsafe.objectFieldOffset
(AtomicInteger.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}

private volatile int value;

说明:

  • value - value 属性使用 volatile 修饰,使得对 value 的修改在并发环境下对所有线程可见。
  • valueOffset - value 属性的偏移量,通过这个偏移量可以快速定位到 value 字段,这个是实现 AtomicInteger 的关键。
  • unsafe - Unsafe 类型的属性,它为 AtomicInteger 提供了 CAS 操作。

原子类之引用数据类型

Java 数据类型分为 基本数据类型引用数据类型 两大类(不了解 Java 数据类型划分可以参考: Java 基本数据类型 )。

上一节中提到了针对基本数据类型的原子类,那么如果想针对引用类型做原子操作怎么办?Java 也提供了相关的原子类:

  • AtomicReference - 引用类型原子类
  • AtomicMarkableReference - 带有标记位的引用类型原子类
  • AtomicStampedReference - 带有版本号的引用类型原子类

AtomicStampedReference 类在引用类型原子类中,彻底地解决了 ABA 问题,其它的 CAS 能力与另外两个类相近,所以最具代表性。因此,本节只针对 AtomicStampedReference 进行说明。

::: tabs#原子类之引用类型示例

@tab AtomicReference 使用示例

【示例】基于 AtomicReference 实现一个简单的自旋锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
public class AtomicReferenceDemo2 {

private static int ticket = 10;

public static void main(String[] args) {
threadSafeDemo();
}

private static void threadSafeDemo() {
SpinLock lock = new SpinLock();
ExecutorService executorService = Executors.newFixedThreadPool(3);
for (int i = 0; i < 5; i++) {
executorService.execute(new MyThread(lock));
}
executorService.shutdown();
}

/**
* 基于 {@link AtomicReference} 实现的简单自旋锁
*/
static class SpinLock {

private AtomicReference<Thread> atomicReference = new AtomicReference<>();

public void lock() {
Thread current = Thread.currentThread();
while (!atomicReference.compareAndSet(null, current)) {}
}

public void unlock() {
Thread current = Thread.currentThread();
atomicReference.compareAndSet(current, null);
}

}

/**
* 利用自旋锁 {@link SpinLock} 并发处理数据
*/
static class MyThread implements Runnable {

private SpinLock lock;

public MyThread(SpinLock lock) {
this.lock = lock;
}

@Override
public void run() {
while (ticket > 0) {
lock.lock();
if (ticket > 0) {
System.out.println(Thread.currentThread().getName() + " 卖出了第 " + ticket + " 张票");
ticket--;
}
lock.unlock();
}
}

}

}

@tab AtomicMarkableReference 使用示例

【示例】AtomicMarkableReference 使用示例(解决 ABA 问题)

原子类的实现基于 CAS 机制,而 CAS 存在 ABA 问题(不了解 ABA 问题,可以参考:Java 并发基础机制 - CAS 的问题)。正是为了解决 ABA 问题,才有了 AtomicMarkableReferenceAtomicStampedReference

AtomicMarkableReference 使用一个布尔值作为标记,修改时在 true / false 之间切换。这种策略不能根本上解决 ABA 问题,但是可以降低 ABA 发生的几率。常用于缓存或者状态描述这样的场景。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class AtomicMarkableReferenceDemo {

private final static String INIT_TEXT = "abc";

public static void main(String[] args) throws InterruptedException {

final AtomicMarkableReference<String> amr = new AtomicMarkableReference<>(INIT_TEXT, false);

ExecutorService executorService = Executors.newFixedThreadPool(3);
for (int i = 0; i < 10; i++) {
executorService.submit(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(Math.abs((int) (Math.random() * 100)));
} catch (InterruptedException e) {
e.printStackTrace();
}

String name = Thread.currentThread().getName();
if (amr.compareAndSet(INIT_TEXT, name, amr.isMarked(), !amr.isMarked())) {
System.out.println(Thread.currentThread().getName() + " 修改了对象!");
System.out.println("新的对象为:" + amr.getReference());
}
}
});
}

executorService.shutdown();
executorService.awaitTermination(3, TimeUnit.SECONDS);
}

}

@tab AtomicStampedReference 使用示例

【示例】AtomicStampedReference 使用示例

AtomicStampedReference 使用一个整型值做为版本号,每次更新前先比较版本号,如果一致,才进行修改。通过这种策略,可以根本上解决 ABA 问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class AtomicStampedReferenceDemo {

private final static String INIT_REF = "pool-1-thread-3";

private final static AtomicStampedReference<String> asr = new AtomicStampedReference<>(INIT_REF, 0);

public static void main(String[] args) throws InterruptedException {

System.out.println("初始对象为:" + asr.getReference());

ExecutorService executorService = Executors.newFixedThreadPool(3);
for (int i = 0; i < 3; i++) {
executorService.execute(new MyThread());
}

executorService.shutdown();
executorService.awaitTermination(3, TimeUnit.SECONDS);
}

static class MyThread implements Runnable {

@Override
public void run() {
try {
Thread.sleep(Math.abs((int) (Math.random() * 100)));
} catch (InterruptedException e) {
e.printStackTrace();
}

final int stamp = asr.getStamp();
if (asr.compareAndSet(INIT_REF, Thread.currentThread().getName(), stamp, stamp + 1)) {
System.out.println(Thread.currentThread().getName() + " 修改了对象!");
System.out.println("新的对象为:" + asr.getReference());
}
}

}

}

:::

原子类之数组数据类型

Java 提供了以下针对数组的原子类:

  • AtomicIntegerArray - 整形数组原子类
  • AtomicLongArray - 长整型数组原子类
  • AtomicReferenceArray - 引用类型数组原子类

已经有了针对基本类型和引用类型的原子类,为什么还要提供针对数组的原子类呢?

数组类型的原子类为数组元素提供了 volatile 类型的访问语义,这是普通数组所不具备的特性——**volatile 类型的数组仅在数组引用上具有 volatile 语义**。

【示例】AtomicIntegerArray 使用示例(AtomicLongArrayAtomicReferenceArray 使用方式也类似)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
public class AtomicIntegerArrayDemo {

private static AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(10);

public static void main(final String[] arguments) throws InterruptedException {

System.out.println("Init Values: ");
for (int i = 0; i < atomicIntegerArray.length(); i++) {
atomicIntegerArray.set(i, i);
System.out.print(atomicIntegerArray.get(i) + " ");
}
System.out.println();

Thread t1 = new Thread(new Increment());
Thread t2 = new Thread(new Compare());
t1.start();
t2.start();

t1.join();
t2.join();

System.out.println("Final Values: ");
for (int i = 0; i < atomicIntegerArray.length(); i++) {
System.out.print(atomicIntegerArray.get(i) + " ");
}
System.out.println();
}

static class Increment implements Runnable {

@Override
public void run() {

for (int i = 0; i < atomicIntegerArray.length(); i++) {
int value = atomicIntegerArray.incrementAndGet(i);
System.out.println(Thread.currentThread().getName() + ", index = " + i + ", value = " + value);
}
}

}

static class Compare implements Runnable {

@Override
public void run() {
for (int i = 0; i < atomicIntegerArray.length(); i++) {
boolean swapped = atomicIntegerArray.compareAndSet(i, 2, 3);
if (swapped) {
System.out.println(Thread.currentThread().getName() + " swapped, index = " + i + ", value = 3");
}
}
}

}

}

原子类之属性更新器

属性更新器支持基于反射机制的更新字段值的原子操作

  • AtomicIntegerFieldUpdater - 整型字段的原子更新器。
  • AtomicLongFieldUpdater - 长整型字段的原子更新器。
  • AtomicReferenceFieldUpdater - 原子更新引用类型里的字段。

这些类的使用有一定限制:

  • 因为对象的属性修改类型原子类都是抽象类,所以每次使用都必须使用静态方法 newUpdater() 创建一个更新器,并且需要设置想要更新的类和属性。
  • 字段必须是 volatile 类型的;
  • 不能作用于静态变量(static);
  • 不能作用于常量(final);

【示例】AtomicReferenceFieldUpdater 使用示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
public class AtomicReferenceFieldUpdaterDemo {

static User user = new User("begin");

static AtomicReferenceFieldUpdater<User, String> updater =
AtomicReferenceFieldUpdater.newUpdater(User.class, String.class, "name");

public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(3);
for (int i = 0; i < 5; i++) {
executorService.execute(new MyThread());
}
executorService.shutdown();
}

static class MyThread implements Runnable {

@Override
public void run() {
if (updater.compareAndSet(user, "begin", "end")) {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " 已修改 name = " + user.getName());
} else {
System.out.println(Thread.currentThread().getName() + " 已被其他线程修改");
}
}

}

static class User {

volatile String name;

public User(String name) {
this.name = name;
}

public String getName() {
return name;
}

public User setName(String name) {
this.name = name;
return this;
}

}

}

原子类之累加器

DoubleAccumulatorDoubleAdderLongAccumulatorLongAdder,这四个类仅仅用来执行累加操作,相比原子化的基本数据类型,速度更快,但是不支持 compareAndSet() 方法。如果你仅仅需要累加操作,使用原子化的累加器性能会更好,代价就是会消耗更多的内存空间。

LongAdder 内部由一个 base 变量和一个 cell[] 数组组成。

  • 当只有一个写线程,没有竞争的情况下,LongAdder 会直接使用 base 变量作为原子操作变量,通过 CAS 操作修改变量;
  • 当有多个写线程竞争的情况下,除了占用 base 变量的一个写线程之外,其它各个线程会将修改的变量写入到自己的槽 cell[] 数组中。

我们可以发现,LongAdder 在操作后的返回值只是一个近似准确的数值,但是 LongAdder 最终返回的是一个准确的数值, 所以在一些对实时性要求比较高的场景下,LongAdder 并不能取代 AtomicIntegerAtomicLong

ThreadLocal

在多线程环境下,共享变量存在并发安全问题。换个思路,如果变量非共享,而是各个线程独享,就不会有并发安全问题。这种思想有个术语叫线程封闭,其本质上就是避免共享。没有共享,自然也就没有并发安全问题。在 Java 中,ThreadLocal 正是根据这个思路而设计的。

ThreadLocal 为每个线程都创建了一个本地副本,这个副本只能被当前线程访问,其他线程无法访问,那么自然是线程安全的。

ThreadLocal 的应用

ThreadLocal 的方法:

1
2
3
4
5
6
public class ThreadLocal<T> {
public T get() {}
public void set(T value) {}
public void remove() {}
public static <S> ThreadLocal<S> withInitial(Supplier<? extends S> supplier) {}
}

说明:

  • get - 用于获取 ThreadLocal 在当前线程中保存的变量副本。
  • set - 用于设置当前线程中变量的副本。
  • remove - 用于删除当前线程中变量的副本。如果此线程局部变量随后被当前线程读取,则其值将通过调用其 initialValue 方法重新初始化,除非其值由中间线程中的当前线程设置。 这可能会导致当前线程中多次调用 initialValue 方法。
  • initialValue - 为 ThreadLocal 设置默认的 get 初始值,需要重写 initialValue 方法 。

ThreadLocal 常用于防止对可变的单例(Singleton)变量或全局变量进行共享。典型应用场景有:管理数据库连接、Session 管理等。

::: tabs#ThreadLocal 应用示例

@tab 数据库连接

【示例】数据库连接

1
2
3
4
5
6
7
8
9
10
private static ThreadLocal<Connection> connectionHolder = new ThreadLocal<Connection>() {
@Override
public Connection initialValue() {
return DriverManager.getConnection(DB_URL);
}
};

public static Connection getConnection() {
return connectionHolder.get();
}

@tab Session 管理

【示例】Session 管理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private static final ThreadLocal<Session> sessionHolder = new ThreadLocal<>();

public static Session getSession() {
Session session = (Session) sessionHolder.get();
try {
if (session == null) {
session = createSession();
sessionHolder.set(session);
}
} catch (Exception e) {
e.printStackTrace();
}
return session;
}

@tab 线程安全的 SimpleDateFormat

【示例】线程安全的 SimpleDateFormat

SimpleDateFormat 不是线程安全的,如果要保证并发安全,可以使用 ThreadLocal 来解决。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class SafeDateFormat {

//定义 ThreadLocal 变量
static final ThreadLocal<DateFormat>
tl = ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));

static DateFormat get() {
return tl.get();
}

public static void main(String[] args) {
//不同线程执行下面代码
//返回的 df 是不同的
DateFormat df = SafeDateFormat.get();
}

}

@tab 完整使用 ThreadLocal 示例

【示例】完整使用 ThreadLocal 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class ThreadLocalDemo {

private static ThreadLocal<Integer> threadLocal = new ThreadLocal<Integer>() {
@Override
protected Integer initialValue() {
return 0;
}
};

public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(10);
for (int i = 0; i < 10; i++) {
executorService.execute(new MyThread());
}
executorService.shutdown();
}

static class MyThread implements Runnable {

@Override
public void run() {
int count = threadLocal.get();
for (int i = 0; i < 10; i++) {
try {
count++;
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
threadLocal.set(count);
threadLocal.remove();
System.out.println(Thread.currentThread().getName() + " : " + count);
}

}

}

:::

ThreadLocal 的原理

存储结构

Thread 类中维护着 2 个 ThreadLocal.ThreadLocalMap 类型的成员 threadLocals 和 inheritableThreadLocals 。这 2 个成员就是用来存储当前线程独占的变量副本。

ThreadLocalMapThreadLocal 的内部类,它维护着一个 Entry 数组,**Entry 继承了 WeakReference** ,所以是弱引用。 Entry 用于保存键值对,其中:

  • keyThreadLocal 对象
  • value 是传递进来的对象(变量副本)

ThreadLocal 关键源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class Thread implements Runnable {
// ...
ThreadLocal.ThreadLocalMap threadLocals = null;
// ...
ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;
}

static class ThreadLocalMap {
// ...
static class Entry extends WeakReference<ThreadLocal<?>> {
/** The value associated with this ThreadLocal. */
Object value;

Entry(ThreadLocal<?> k, Object v) {
super(k);
value = v;
}
}
// ...
}

如何解决 Hash 冲突

ThreadLocalMap 虽然是类似 Map 结构的数据结构,但它并没有实现 Map 接口。它不支持 Map 接口中的 next 方法,这意味着 ThreadLocalMap 中解决 Hash 冲突的方式并非 拉链表 方式。

实际上,**ThreadLocalMap 采用线性探测的方式来解决 Hash 冲突**。所谓线性探测,就是根据初始 key 的 hashcode 值确定元素在 table 数组中的位置,如果发现这个位置上已经被其他的 key 值占用,则利用固定的算法寻找一定步长的下个位置,依次判断,直至找到能够存放的位置。

内存泄漏问题

ThreadLocal 仅仅是一个代理工具类,内部并不持有任何与线程相关的数据,所有和线程相关的数据都存储在 Thread 里面,这样的设计容易理解。

当然还有一个更加深层次的原因,那就是不容易产生内存泄露。如果 ThreadLocal 和实际实现反其道而行之:将 Thread 的引用维护在一个 Map 中,就会出现这种情况——只要 ThreadLocal 对象存在,那么 Map 中的 Thread 对象就永远不会被回收。而 ThreadLocal 的生命周期往往都比线程要长,所以这种设计方案很容易导致内存泄露。而 Java 的实现中 Thread 持有 ThreadLocalMap,而且 ThreadLocalMap 里对 ThreadLocal 的引用还是弱引用(WeakReference),所以只要 Thread 对象可以被回收,那么 ThreadLocalMap 就能被回收。Java 的这种实现方案虽然看上去复杂一些,但是更加安全。

ThreadLocalMapEntry 继承了 WeakReference,所以它的 key (ThreadLocal 对象)是弱引用,而 value (变量副本)是强引用。如果 ThreadLocal 对象没有外部强引用来引用它,那么 ThreadLocal 对象会在下次 GC 时被回收。此时,Entry 中的 key 已经被回收,但是 value 由于是强引用不会被垃圾收集器回收。如果创建 ThreadLocal 的线程一直持续运行,那么 value 就会一直得不到回收,从而导致内存泄露

那么如何避免内存泄漏呢?方法就是:使用 ThreadLocalset 方法后,在 try {} finally {} 中显示的调用 remove 方法

1
2
3
4
5
6
7
8
9
10
11
12
ExecutorService es;
ThreadLocal tl;
es.execute(() -> {
//ThreadLocal 增加变量
tl.set(obj);
try {
// 省略业务逻辑代码
} finally {
//手动清理 ThreadLocal
tl.remove();
}
});

ThreadLocal 的误区

示例摘自:极客时间教程 - Java 业务开发常见错误 100 例

ThreadLocal 适用于变量在线程间隔离,而在方法或类间共享的场景。

前文提到,ThreadLocal 是线程隔离的,那么是不是使用 ThreadLocal 就一定高枕无忧呢?

ThreadLocal 错误案例

使用 Spring Boot 创建一个 Web 应用程序,使用 ThreadLocal 存放一个 Integer 的值,来暂且代表需要在线程中保存的用户信息,这个值初始是 null。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private ThreadLocal<Integer> currentUser = ThreadLocal.withInitial(() -> null);

@GetMapping("wrong")
public Map<String, String> wrong(@RequestParam("id") Integer userId) {
//设置用户信息之前先查询一次 ThreadLocal 中的用户信息
String before = Thread.currentThread().getName() + ":" + currentUser.get();
//设置用户信息到 ThreadLocal
currentUser.set(userId);
//设置用户信息之后再查询一次 ThreadLocal 中的用户信息
String after = Thread.currentThread().getName() + ":" + currentUser.get();
//汇总输出两次查询结果
Map<String, String> result = new HashMap<>();
result.put("before", before);
result.put("after", after);
return result;
}

【预期】从代码逻辑来看,我们预期第一次获取的值始终应该是 null。

【实际】

为了方便复现,将 Tomcat 工作线程设为 1:

1
server.tomcat.max-threads=1

当访问 id = 1 时,符合预期

img

当访问 id = 2 时,before 的应答不是 null,而是 1,不符合预期。

【分析】实际情况和预期存在偏差。Spring Boot 程序运行在 Tomcat 中,执行程序的线程是 Tomcat 的工作线程,而 Tomcat 的工作线程是基于线程池的。线程池会重用固定的几个线程,一旦线程重用,那么很可能首次从
ThreadLocal 获取的值是之前其他用户的请求遗留的值。这时,ThreadLocal 中的用户信息就是其他用户的信息

并不能认为没有显式开启多线程就不会有线程安全问题。使用类似 ThreadLocal 工具来存放一些数据时,需要特别注意在代码运行完后,显式地去清空设置的数据。

ThreadLocal 错误案例修正

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@GetMapping("right")
public Map<String, String> right(@RequestParam("id") Integer userId) {
String before = Thread.currentThread().getName() + ":" + currentUser.get();
currentUser.set(userId);
try {
String after = Thread.currentThread().getName() + ":" + currentUser.get();
Map<String, String> result = new HashMap<>();
result.put("before", before);
result.put("after", after);
return result;
} finally {
//在 finally 代码块中删除 ThreadLocal 中的数据,确保数据不串
currentUser.remove();
}
}

InheritableThreadLocal

通过 ThreadLocal 创建的线程变量,其子线程是无法继承的。也就是说你在线程中通过 ThreadLocal 创建了线程变量 V,而后该线程创建了子线程,你在子线程中是无法通过 ThreadLocal 来访问父线程的线程变量 V 的。

如果你需要子线程继承父线程的线程变量,那该怎么办呢?其实很简单,Java 提供了 InheritableThreadLocal 来支持这种特性,InheritableThreadLocalThreadLocal 子类,所以用法和 ThreadLocal 相同。与 ThreadLocal 不同的是,InheritableThreadLocal 允许一个线程以及该线程创建的所有子线程都可以访问它保存的数据。

不过,完全不建议你在线程池中使用 InheritableThreadLocal,不仅仅是因为它具有 ThreadLocal 相同的缺点——可能导致内存泄露,更重要的原因是:线程池中线程的创建是动态的,很容易导致继承关系错乱,如果你的业务逻辑依赖 InheritableThreadLocal,那么很可能导致业务逻辑计算错误,而这个错误往往比内存泄露更要命。

原理参考:Java 多线程:InheritableThreadLocal 实现原理

Immutability 模式

解决并发问题,其实最简单的办法就是让共享变量只有读操作,而没有写操作。这个办法如此重要,以至于被上升到了一种解决并发问题的设计模式:不变性(Immutability)模式。所谓不变性,是指:一旦创建,状态不再变化。换句话说,就是变量一旦被赋值,就不允许修改了(没有写操作);没有修改操作,也就是保持了不变性。

快速实现具备不可变性的类

将一个类所有的属性都设置成 final 的,并且只允许存在只读方法,那么这个类基本上就具备不可变性了。更严格的做法是这个类本身也是 final 的,也就是不允许继承。因为子类可以覆盖父类的方法,有可能改变不可变性,所以推荐你在实际工作中,使用这种更严格的做法。

在 Java 中,经常用到的 StringLongIntegerDouble 等基础类型的包装类都具备不可变性,这些对象的线程安全性都是靠不可变性来保证的。如果你仔细翻看这些类的声明、属性和方法,你会发现它们都严格遵守不可变类的三点要求:类和属性都是 final 的,所有方法均是只读的

String 这个类虽然有替换操作,但实际仍是只读的。阅读 String 源码可以发现:String 这个类以及它的属性 value[] 都是 final 的;而 replace() 方法的实现,就的确没有修改 value[],而是将替换后的字符串作为返回值返回了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public final class String {
private final char value[];
// 字符替换
String replace(char oldChar,
char newChar) {
//无需替换,直接返回 this
if (oldChar == newChar){
return this;
}

int len = value.length;
int i = -1;
/* avoid getfield opcode */
char[] val = value;
//定位到需要替换的字符位置
while (++i < len) {
if (val[i] == oldChar) {
break;
}
}
//未找到 oldChar,无需替换
if (i >= len) {
return this;
}
//创建一个 buf[],这是关键
//用来保存替换后的字符串
char buf[] = new char[len];
for (int j = 0; j < i; j++) {
buf[j] = val[j];
}
while (i < len) {
char c = val[i];
buf[i] = (c == oldChar) ?
newChar : c;
i++;
}
//创建一个新的字符串返回
//原字符串不会发生任何变化
return new String(buf, true);
}
}

通过分析 String 的实现,你可能已经发现了,如果具备不可变性的类,需要提供类似修改的功能,具体该怎么操作呢?做法很简单,那就是创建一个新的不可变对象,这是与可变对象的一个重要区别,可变对象往往是修改自己的属性。

使用 Immutability 模式的注意事项

在使用 Immutability 模式的时候,需要注意以下两点:

  1. 对象的所有属性都是 final 的,并不能保证不可变性;
  2. 不可变对象也需要正确发布。

在 Java 语言中,final 修饰的属性一旦被赋值,就不可以再修改,但是如果属性的类型是普通对象,那么这个普通对象的属性是可以被修改的。例如下面的代码中,Bar 的属性 foo 虽然是 final 的,依然可以通过 setAge() 方法来设置 foo 的属性 age。所以,在使用 Immutability 模式的时候一定要确认保持不变性的边界在哪里,是否要求属性对象也具备不可变性

1
2
3
4
5
6
7
8
9
10
class Foo{
int age=0;
int name="abc";
}
final class Bar {
final Foo foo;
void setAge(int a){
foo.age=a;
}
}

不可变对象虽然是线程安全的,但是并不意味着引用这些不可变对象的对象就是线程安全的。例如在下面的代码中,Foo 具备不可变性,线程安全,但是类 Bar 并不是线程安全的,类 Bar 中持有对 Foo 的引用 foo,对 foo 这个引用的修改在多线程中并不能保证可见性和原子性。

1
2
3
4
5
6
7
8
9
10
11
12
//Foo 线程安全
final class Foo{
final int age=0;
final int name="abc";
}
//Bar 线程不安全
class Bar {
Foo foo;
void setFoo(Foo f){
this.foo=f;
}
}

如果你的程序仅仅需要 foo 保持可见性,无需保证原子性,那么可以将 foo 声明为 volatile 变量,这样就能保证可见性。如果你的程序需要保证原子性,那么可以通过原子类来实现。下面的示例代码是合理库存的原子化实现,你应该很熟悉了,其中就是用原子类解决了不可变对象引用的原子性问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class SafeWM {

class WMRange {
final int upper;
final int lower;
WMRange(int upper, int lower) {
//省略构造函数实现
}
}

final AtomicReference<WMRange> rf = new AtomicReference<>(new WMRange(0, 0));

// 设置库存上限
void setUpper(int v) {
while (true) {
WMRange or = rf.get();
// 检查参数合法性
if (v < or.lower) {
throw new IllegalArgumentException();
}
WMRange nr = new WMRange(v, or.lower);
if (rf.compareAndSet(or, nr)) {
return;
}
}
}
}

Copy-on-Write 模式

所谓 Copy-on-Write,经常被缩写为 CoW,顾名思义就是写时复制

Java 支持 CopyOnWriteArrayListCopyOnWriteArraySet 两种并发容器,其设计思想就是 CoW;通过 Copy-on-Write 这两个容器实现的读操作是无锁的,由于无锁,所以将读操作的性能发挥到了极致。

CoW 是一项非常通用的技术方案,在很多领域都有着广泛的应用。不过,它也有缺点的,那就是消耗内存,每次修改都需要复制一个新的副本出来。

参考资料

Java 并发之同步工具

Semaphore

Semaphore 译为信号量,是一种同步机制,用于控制多线程对共享资源的访问。信号量是由计算机科学家 Edsger Dijkstra 于 1965 年提出的,用于解决所谓的“临界区”问题,即多个进程或线程试图同时访问共享资源(如打印机、内存缓冲区等)时可能出现的问题。

信号量模型

信号量模型还是很简单的,可以简单概括为:一个计数器,一个等待队列,三个方法。在信号量模型里,计数器和等待队列对外是透明的,所以只能通过信号量模型提供的三个方法来访问它们,这三个方法分别是:init()、down() 和 up()。

  • 这三个方法详细的语义具体如下所示。

    • init():设置计数器的初始值。
    • down():计数器的值减 1;如果此时计数器的值小于 0,则当前线程将被阻塞,否则当前线程可以继续执行。
    • up():计数器的值加 1;如果此时计数器的值小于或者等于 0,则唤醒等待队列中的一个线程,并将其从等待队列中移除。

    这里提到的 init()、down() 和 up() 三个方法都是原子性的,并且这个原子性是由信号量模型的实现方保证的。在 Java 中,信号量模型是由 java.util.concurrent.Semaphore 实现的,Semaphore 这个类能够保证这三个方法都是原子操作。

    信号量模型里面,down()、up() 这两个操作历史上最早称为 P 操作和 V 操作,所以信号量模型也被称为 PV 原语。

Semaphore 使用

Semaphore 提供了 2 个构造方法:

1
2
3
4
// 参数 permits 表示许可数目,即同时可以允许多少线程进行访问
public Semaphore(int permits) {}
// 参数 fair 表示是否是公平的,即等待时间越久的越先获取许可
public Semaphore(int permits, boolean fair) {}

说明:

  • permits - 初始化固定数量的 permit。
  • fair - 设置是否为公平模式。所谓公平,是指等待久的优先获取 permit。

Semaphore 的重要方法:

1
2
3
4
5
6
7
8
// 获取 1 个许可
public void acquire() throws InterruptedException {}
//获取 permits 个许可
public void acquire(int permits) throws InterruptedException {}
// 释放 1 个许可
public void release() {}
//释放 permits 个许可
public void release(int permits) {}

说明:

  • acquire() - 获取 1 个 permit。
  • acquire(int permits) - 获取 permits 数量的 permit。
  • release() - 释放 1 个 permit。
  • release(int permits) - 释放 permits 数量的 permit。

img

【示例】Semaphore 使用示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class SemaphoreDemo {

private static final int THREAD_COUNT = 30;

private static ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT);

private static Semaphore semaphore = new Semaphore(10);

public static void main(String[] args) {
for (int i = 0; i < THREAD_COUNT; i++) {
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
semaphore.acquire();
System.out.println("save data");
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}

threadPool.shutdown();
}

}

Semaphore 原理

Semaphore 是共享锁的一种实现,它默认构造 AQS 的 state 值为 permits,你可以将 permits 的值理解为许可证的数量,只有拿到许可证的线程才能执行。

调用semaphore.acquire() ,线程尝试获取许可证,如果 state >= 0 的话,则表示可以获取成功。如果获取成功的话,使用 CAS 操作去修改 state 的值 state=state-1。如果 state<0 的话,则表示许可证数量不足。此时会创建一个 Node 节点加入阻塞队列,挂起当前线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* 获取 1 个许可证
*/
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
/**
* 共享模式下获取许可证,获取成功则返回,失败则加入阻塞队列,挂起线程
*/
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 尝试获取许可证,arg 为获取许可证个数,当可用许可证数减当前获取的许可证数结果小于 0, 则创建一个节点加入阻塞队列,挂起当前线程。
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}

调用semaphore.release(); ,线程尝试释放许可证,并使用 CAS 操作去修改 state 的值 state=state+1。释放许可证成功之后,同时会唤醒同步队列中的一个线程。被唤醒的线程会重新尝试去修改 state 的值 state=state-1 ,如果 state>=0 则获取令牌成功,否则重新进入阻塞队列,挂起线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 释放一个许可证
public void release() {
sync.releaseShared(1);
}

// 释放共享锁,同时会唤醒同步队列中的一个线程。
public final boolean releaseShared(int arg) {
//释放共享锁
if (tryReleaseShared(arg)) {
//唤醒同步队列中的一个线程
doReleaseShared();
return true;
}
return false;
}

实现一个限流器

Semaphore 最重要的特性是:Semaphore 可以允许多个线程访问一个临界区

Semaphore 在现实中有很多应用场景:

  • 各种池化资源,例如连接池、对象池、线程池等;
  • 信号量限流(例如 Hystrix 就支持信号量限流模式);

【示例】一个基于信号量实现的简单对象限流器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public class SemaphoreRateLimit {

public static void main(String[] args) {
// 创建对象池,大小为 10
ObjectPool<Long, String> pool = new ObjectPool<>(10, 2L);
// 通过对象池获取 t,之后执行
pool.exec(t -> {
System.out.println(t);
return t.toString();
});
}

static class ObjectPool<T, R> {

final List<T> pool;
// 用信号量实现限流器
final Semaphore sem;

// 构造函数
ObjectPool(int size, T t) {
pool = new Vector<T>() { };
for (int i = 0; i < size; i++) {
pool.add(t);
}
sem = new Semaphore(size);
}

// 利用对象池的对象,调用 func
R exec(Function<T, R> func) {
T t = null;
try {
sem.acquire();
t = pool.remove(0);
return func.apply(t);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
pool.add(t);
sem.release();
return null;
}
}

}

}

在这个方法里面,我们首先调用 acquire() 方法(与之匹配的是在 finally 里面调用 release() 方法),假设对象池的大小是 10,信号量的计数器初始化为 10,那么前 10 个线程调用 acquire() 方法,都能继续执行,相当于通过了信号量,而其他线程则会阻塞在 acquire() 方法上。对于通过信号量的线程,我们为每个线程分配了一个对象 t(这个分配工作是通过 pool.remove(0) 实现的),分配完之后会执行一个回调函数 func,而函数的参数正是前面分配的对象 t ;执行完回调函数之后,它们就会释放对象(这个释放工作是通过 pool.add(t) 实现的),同时调用 release() 方法来更新信号量的计数器。如果此时信号量里计数器的值小于等于 0,那么说明有线程在等待,此时会自动唤醒等待的线程。

CountDownLatch

CountDownLatch 字面意思为递减计数锁。用于控制一个线程等待多个线程

CountDownLatch 内部维护了一个计数器,表示需要等待的事件数量。countDown 方法递减计数器,表示有一个事件已经发生。调用 await 方法的线程会一直阻塞直到计数器为零,或者等待中的线程中断,或者等待超时。CountDownLatch 是一次性的,计数器的值只能在构造方法中初始化一次,之后没有任何机制再次对其设置值,当 CountDownLatch 使用完毕后,它不能再次被使用。

img

CountDownLatch 是共享锁的一种实现,它默认构造 AQS 的 state 值为 count。当线程使用 countDown() 方法时,其实使用了tryReleaseShared方法以 CAS 的操作来减少 state,直至 state 为 0 。当调用 await() 方法的时候,如果 state 不为 0,那就证明任务还没有执行完毕,await() 方法就会一直阻塞,也就是说 await() 方法之后的语句不会被执行。直到count 个线程调用了countDown()使 state 值被减为 0,或者调用await()的线程被中断,该线程才会从阻塞中被唤醒,await() 方法之后的语句得到执行。

CountDownLatch 唯一的构造方法:

1
2
// 初始化计数器
public CountDownLatch(int count) {};

CountDownLatch 的重要方法:

1
2
3
public void await() throws InterruptedException { };
public boolean await(long timeout, TimeUnit unit) throws InterruptedException { };
public void countDown() { };

说明:

  • await() - 调用 await() 方法的线程会被挂起,它会等待直到 count 值为 0 才继续执行。
  • await(long timeout, TimeUnit unit) - 和 await() 类似,只不过等待一定的时间后 count 值还没变为 0 的话就会继续执行
  • countDown() - 将统计值 count 减 1

【示例】CountDownLatch 使用示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class CountDownLatchDemo {

public static void main(String[] args) {
final CountDownLatch latch = new CountDownLatch(2);

new Thread(new MyThread(latch)).start();

try {
System.out.println("等待 2 个子线程执行完毕。..");
latch.await();
System.out.println("2 个子线程已经执行完毕");
System.out.println("继续执行主线程");
} catch (InterruptedException e) {
e.printStackTrace();
}
}

static class MyThread implements Runnable {

private CountDownLatch latch;

public MyThread(CountDownLatch latch) {
this.latch = latch;
}

@Override
public void run() {
System.out.println("子线程" + Thread.currentThread().getName() + "正在执行");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("子线程" + Thread.currentThread().getName() + "执行完毕");
latch.countDown();
}

}

}

CyclicBarrier

CyclicBarrier 字面意思是循环栅栏。**CyclicBarrier 可以让一组线程等待至某个状态(遵循字面意思,不妨称这个状态为栅栏)之后再全部同时执行。之所以叫循环栅栏是因为:当所有等待线程都被释放以后,CyclicBarrier 可以被重用**。

CyclicBarrier 是基于 ReentrantLockReentrantLock 底层也是基于 AQS 实现的)和 Condition 实现的。CyclicBarrier 内部维护一个计数器,每次执行 await 方法之后,计数器加 1,直到计数器的值和设置的值相等,等待的所有线程才会继续执行。

CyclicBarrier 在并行迭代算法中非常有用。

img

CyclicBarrier 提供了 2 个构造方法

1
2
public CyclicBarrier(int parties) {}
public CyclicBarrier(int parties, Runnable barrierAction) {}

说明:

  • parties - parties 数相当于一个阈值,当有 parties 数量的线程在等待时, CyclicBarrier 处于栅栏状态。
  • barrierAction - 当 CyclicBarrier 处于栅栏状态时执行的动作。

CyclicBarrier 的重要方法:

1
2
3
4
5
6
7
public int await() throws InterruptedException, BrokenBarrierException {}
public int await(long timeout, TimeUnit unit)
throws InterruptedException,
BrokenBarrierException,
TimeoutException {}
// 将屏障重置为初始状态
public void reset() {}

说明:

  • await() - 等待调用 await() 的线程数达到屏障数。如果当前线程是最后一个到达的线程,并且在构造函数中提供了非空屏障操作,则当前线程在允许其他线程继续之前运行该操作。如果在屏障动作期间发生异常,那么该异常将在当前线程中传播并且屏障被置于断开状态。
  • await(long timeout, TimeUnit unit) - 相比于 await() 方法,这个方法让这些线程等待至一定的时间,如果还有线程没有到达栅栏状态就直接让到达栅栏状态的线程执行后续任务。
  • reset() - 将屏障重置为初始状态。

【示例】CyclicBarrier 使用示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public class CyclicBarrierDemo {

final static int N = 4;

public static void main(String[] args) {
CyclicBarrier barrier = new CyclicBarrier(N,
new Runnable() {
@Override
public void run() {
System.out.println("当前线程" + Thread.currentThread().getName());
}
});

for (int i = 0; i < N; i++) {
MyThread myThread = new MyThread(barrier);
new Thread(myThread).start();
}
}

static class MyThread implements Runnable {

private CyclicBarrier cyclicBarrier;

MyThread(CyclicBarrier cyclicBarrier) {
this.cyclicBarrier = cyclicBarrier;
}

@Override
public void run() {
System.out.println("线程" + Thread.currentThread().getName() + "正在写入数据。..");
try {
Thread.sleep(3000); // 以睡眠来模拟写入数据操作
System.out.println("线程" + Thread.currentThread().getName() + "写入数据完毕,等待其他线程写入完毕");
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}

}

}

小结

  • CountDownLatchCyclicBarrier 都能够实现线程之间的等待,只不过它们侧重点不同:
    • CountDownLatch 一般用于某个线程 A 等待若干个其他线程执行完任务之后,它才执行;
    • CyclicBarrier 一般用于一组线程互相等待至某个状态,然后这一组线程再同时执行;
    • 另外,CountDownLatch 是不可以重用的,而 CyclicBarrier 是可以重用的。
  • Semaphore 其实和锁有点类似,它一般用于控制对某组资源的访问权限。

参考资料

Java 并发之线程

线程简介

  • 进程(Process) - 进程是具有一定独立功能的程序关于某个数据集合上的一次运行活动。进程是操作系统进行资源分配的基本单位。进程可视为一个正在运行的程序
  • 线程(Thread) - 线程是操作系统进行调度的基本单位
  • 管程(Monitor) - 管程是指管理共享变量以及对共享变量的操作过程,让他们支持并发
    • Java 通过 synchronized 关键字及 wait()、notify()、notifyAll() 这三个方法来实现管程技术。
    • 管程和信号量是等价的,所谓等价指的是用管程能够实现信号量,也能用信号量实现管程
  • 协程(Coroutine) - 协程可以理解为一种轻量级的线程
    • 从操作系统的角度来看,线程是在内核态中调度的,而协程是在用户态调度的,所以相对于线程来说,协程切换的成本更低。
    • 协程虽然也有自己的栈,但是相比线程栈要小得多,典型的线程栈大小差不多有 1M,而协程栈的大小往往只有几 K 或者几十 K。所以,无论是从时间维度还是空间维度来看,协程都比线程轻量得多。
    • Go、Python、Lua、Kotlin 等语言都支持协程;Java OpenSDK 中的 Loom 项目目标就是支持协程。

进程和线程的差异:

  • 一个程序至少有一个进程,一个进程至少有一个线程。
  • 线程比进程划分更细,所以执行开销更小,并发性更高
  • 进程是一个实体,拥有独立的资源;而同一个进程中的多个线程共享进程的资源。

img

JVM 在单个进程中运行,JVM 中的线程共享属于该进程的堆。这就是为什么几个线程可以访问同一个对象。线程共享堆并拥有自己的堆栈空间。这是一个线程如何调用一个方法以及它的局部变量是如何保持线程安全的。但是堆不是线程安全的并且为了线程安全必须进行同步。

线程创建

一般来说,创建线程有很多种方式,例如:

  • 实现 Runnable 接口
  • 实现 Callable 接口
  • 继承 Thread
  • 通过线程池创建线程
  • 使用 CompletableFuture 创建线程

下面是几种创建线程的示例:

::: tabs#创建线程

@tab Thread

Thread

【示例】继承 Thread 类创建线程

  1. 定义 Thread 类的子类,并覆写该类的 run 方法。run 方法的方法体就代表了线程要完成的任务,因此把 run 方法称为执行体。
  2. 创建 Thread 子类的实例,即创建了线程对象。
  3. 调用线程对象的 start 方法来启动该线程。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class ThreadDemo {

public static void main(String[] args) {
// 实例化对象
MyThread tA = new MyThread("Thread 线程-A");
MyThread tB = new MyThread("Thread 线程-B");
// 调用线程主体
tA.start();
tB.start();
}

static class MyThread extends Thread {

private int ticket = 5;

MyThread(String name) {
super(name);
}

@Override
public void run() {
while (ticket > 0) {
System.out.println(Thread.currentThread().getName() + " 卖出了第 " + ticket + " 张票");
ticket--;
}
}

}

}

@tab Runnable

Runnable

实现 Runnable 接口优于继承 Thread,因为:

  • Java 不支持多重继承,所有的类都只允许继承一个父类,但可以实现多个接口。如果继承了 Thread 类就无法继承其它类,这不利于扩展。
  • 类可能只要求可执行就行,继承整个 Thread 类开销过大。

【示例】实现 Runnable 接口创建线程

  1. 定义 Runnable 接口的实现类,并覆写该接口的 run 方法。该 run 方法的方法体同样是该线程的线程执行体。
  2. 创建 Runnable 实现类的实例,并以此实例作为 Thread 的 target 来创建 Thread 对象,该 Thread 对象才是真正的线程对象。
  3. 调用线程对象的 start 方法来启动该线程。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class RunnableDemo {

public static void main(String[] args) {
// 实例化对象
Thread tA = new Thread(new MyThread(), "Runnable 线程-A");
Thread tB = new Thread(new MyThread(), "Runnable 线程-B");
// 调用线程主体
tA.start();
tB.start();
}

static class MyThread implements Runnable {

private int ticket = 5;

@Override
public void run() {
while (ticket > 0) {
System.out.println(Thread.currentThread().getName() + " 卖出了第 " + ticket + " 张票");
ticket--;
}
}

}

}

@tab Callable

Callable、Future、FutureTask

继承 Thread 类和实现 Runnable 接口这两种创建线程的方式都没有返回值。所以,线程执行完后,无法得到执行结果。但如果期望得到执行结果该怎么做?

为了解决这个问题,Java 1.5 后,提供了 Callable 接口和 Future 接口,通过它们,可以在线程执行结束后,返回执行结果。

Callable

Callable 接口只声明了一个 call 方法:

1
2
3
4
5
6
7
8
9
public interface Callable<V> {
/**
* Computes a result, or throws an exception if unable to do so.
*
* @return computed result
* @throws Exception if unable to compute a result
*/
V call() throws Exception;
}

那么怎么使用 Callable 呢?一般情况下是配合 ExecutorService 来使用的,在 ExecutorService 接口中声明了若干个 submit 方法的重载版本:

1
2
3
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);

第一个 submit 方法里面的参数类型就是 Callable

Future

Future 就是对于具体的 Callable 任务的执行结果进行取消、查询是否完成、获取结果。必要时可以通过 get 方法获取执行结果,该方法会阻塞直到任务返回结果。

1
2
3
4
5
6
7
8
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}

FutureTask

FutureTask 类实现了 RunnableFuture 接口,RunnableFuture 继承了 Runnable 接口和 Future 接口。

所以,FutureTask 既可以作为 Runnable 被线程执行,又可以作为 Future 得到 Callable 的返回值。

1
2
3
4
5
6
7
8
9
public class FutureTask<V> implements RunnableFuture<V> {
// ...
public FutureTask(Callable<V> callable) {}
public FutureTask(Runnable runnable, V result) {}
}

public interface RunnableFuture<V> extends Runnable, Future<V> {
void run();
}

事实上,FutureTaskFuture 接口的一个唯一实现类。

Callable + Future + FutureTask 示例

通过实现 Callable 接口创建线程的步骤:

  1. 创建 Callable 接口的实现类,并实现 call 方法。该 call 方法将作为线程执行体,并且有返回值。
  2. 创建 Callable 实现类的实例,使用 FutureTask 类来包装 Callable 对象,该 FutureTask 对象封装了该 Callable 对象的 call 方法的返回值。
  3. 使用 FutureTask 对象作为 Thread 对象的 target 创建并启动新线程。
  4. 调用 FutureTask 对象的 get 方法来获得线程执行结束后的返回值。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class CallableDemo {

public static void main(String[] args) {
Callable<Long> callable = new MyThread();
FutureTask<Long> future = new FutureTask<>(callable);
new Thread(future, "Callable 线程").start();
try {
System.out.println("任务耗时:" + (future.get() / 1000000) + "毫秒");
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}

static class MyThread implements Callable<Long> {

private int ticket = 10000;

@Override
public Long call() {
long begin = System.nanoTime();
while (ticket > 0) {
System.out.println(Thread.currentThread().getName() + " 卖出了第 " + ticket + " 张票");
ticket--;
}

long end = System.nanoTime();
return (end - begin);
}

}

}

:::

虽然,看似有多种多样的创建线程方式。但是,从本质上来说,Java 就只有一种方式可以创建线程,那就是通过 new Thread().start() 创建。不管是哪种方式,最终还是依赖于 new Thread().start()

👉 扩展阅读:大家都说 Java 有三种创建线程的方式!并发编程中的惊天骗局!

线程终止

如何正确停止线程

通常情况下,我们不会手动停止一个线程,而是允许线程运行到结束,然后让它自然停止。但是依然会有许多特殊的情况需要我们提前停止线程,比如:用户突然关闭程序,或程序运行出错重启等。

对于 Java 而言,最正确的停止线程的方式是:通过 Thread.interruptThread.isInterrupted 配合来控制线程终止。但 Thread.interrupt 仅仅起到通知被停止线程的作用。而对于被停止的线程而言,它拥有完全的自主权,它既可以选择立即停止,也可以选择一段时间后停止,也可以选择压根不停止。

事实上,Java 希望程序间能够相互通知、相互协作地管理线程,因为如果不了解对方正在做的工作,贸然强制停止线程就可能会造成一些安全的问题,为了避免造成问题就需要给对方一定的时间来整理收尾工作。比如:线程正在写入一个文件,这时收到终止信号,它就需要根据自身业务判断,是选择立即停止,还是将整个文件写入成功后停止,而如果选择立即停止就可能造成数据不完整,不管是中断命令发起者,还是接收者都不希望数据出现问题。

一旦调用某个线程的 Thread.interrupt 之后,这个线程的中断标记位就会被设置成 true。每个线程都有这样的标记位,当线程执行时,应该定期检查这个标记位,如果标记位被设置成 true,就说明有程序想终止该线程。回到源码,可以看到在 while 循环体判断语句中,首先通过 Thread.currentThread().isInterrupt() 判断线程是否被中断,随后检查是否还有工作要做。&& 逻辑表示只有当两个判断条件同时满足的情况下,才会去执行下面的工作。

需要留意一个特殊场景:**Thread.sleep 后,线程依然可以感知 Thread.interrupt**。

【示例】正确停止线程的方式——Thread.interrupt

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class ThreadStopDemo {

public static void main(String[] args) throws Exception {
Thread thread = new Thread(new MyTask(), "MyTask");
thread.start();
TimeUnit.MILLISECONDS.sleep(10);
thread.interrupt();
}

private static class MyTask implements Runnable {

private long count = 0L;

@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " 线程启动");
// 通过 Thread.interrupted 和 interrupt 配合来控制线程终止
while (!Thread.currentThread().isInterrupted() && count < 10000) {
System.out.println("count = " + count++);
}
System.out.println(Thread.currentThread().getName() + " 线程终止");
}

}

}
// 输出(count 未到 10000,线程就主动结束):
// MyTask 线程启动
// count = 0
// count = 1
// ...
// count = 840
// count = 841
// count = 842
// MyTask 线程终止

可以使用 Thread.stopThread.suspendThread.resume 停止线程吗?

Thread.stopThread.suspendThread.resume 方法已经被 Java 标记为 @Deprecated。为什么废弃呢?

  • Thread.stop 会直接把线程停止,这样就没有给线程足够的时间来处理想要在停止前保存数据的逻辑,任务戛然而止,会导致出现数据完整性等问题
  • 而对于Thread.suspendThread.resume 而言,它们的问题在于:如果线程调用 Thread.suspend,它并不会释放锁,就开始进入休眠,但此时有可能仍持有锁,这样就容易导致死锁问题。因为这把锁在线程被 Thread.resume 之前,是不会被释放的。假设线程 A 调用了 Thread.suspend 方法让线程 B 挂起,线程 B 进入休眠,而线程 B 又刚好持有一把锁,此时假设线程 A 想访问线程 B 持有的锁,但由于线程 B 并没有释放锁就进入休眠了,所以对于线程 A 而言,此时拿不到锁,也会陷入阻塞,那么线程 A 和线程 B 就都无法继续向下执行。

【示例】Thread.stop 终止线程,导致线程任务戛然而止

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public class ThreadStopErrorDemo {

public static void main(String[] args) {
MyTask thread = new MyTask();
thread.start();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 终止线程
thread.stop();
// 确保线程终止后,才执行下面的代码
while (thread.isAlive()) { }
// 输出两个计数器的最终状态
thread.print();
}

/**
* 持有两个计数器,run 方法中每次执行都会使计数器自增
*/
private static class MyTask extends Thread {

private int i = 0;

private int j = 0;

@Override
public void run() {
synchronized (this) {
++i;
try {
// 模拟耗时操作
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
++j;
}
}

public void print() {
System.out.println("i=" + i + " j=" + j);
}

}

}

使用 volatile 标记方式停止线程正确吗?

使用 volatile 标记方式停止线程并不总是正确的。虽然 volatile 变量可以确保可见性,即当一个线程修改了 volatile 变量的值,其他线程能够立即看到最新的值,但它并不能保证原子性,也就是说并不能保证多个线程对 volatile 变量的操作是互斥的。

当我们使用 volatile 变量来控制线程的停止,通常是通过设置一个 volatile 标志位来告诉线程停止执行。例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
public class MyTask extends Thread {
private volatile boolean canceled = false;

public void run() {
while (!canceled) {
// 执行任务
}
}

public void stopTask() {
canceled = true;
}
}

在上述例子中,canceled 是一个 volatile 变量,用来控制线程的停止。虽然这种方式在某些情况下可以工作,但它并不是一个可靠的停止线程的方式,因为在多线程环境中,其他线程修改 canceled 的值时,可能会出现竞态条件,导致线程无法正确停止

线程基本方法

线程(Thread)基本方法清单:

方法 描述
run 线程的执行实体。
start 线程的启动方法。
currentThread 返回对当前正在执行的线程对象的引用。
setName 设置线程名称。
getName 获取线程名称。
setPriority 设置线程优先级。Java 中的线程优先级的范围是 [1,10],一般来说,高优先级的线程在运行时会具有优先权。可以通过 thread.setPriority(Thread.MAX_PRIORITY) 的方式设置,默认优先级为 5。
getPriority 获取线程优先级。
setDaemon 设置线程为守护线程。
isDaemon 判断线程是否为守护线程。
isAlive 判断线程是否启动。
interrupt 中断另一个线程的运行状态。
interrupted 测试当前线程是否已被中断。通过此方法可以清除线程的中断状态。换句话说,如果要连续调用此方法两次,则第二次调用将返回 false(除非当前线程在第一次调用清除其中断状态之后且在第二次调用检查其状态之前再次中断)。
join 可以使一个线程强制运行,线程强制运行期间,其他线程无法运行,必须等待此线程完成之后才可以继续执行。
Thread.sleep 静态方法。将当前正在执行的线程休眠。
Thread.yield 静态方法。将当前正在执行的线程暂停,让其他线程执行。

线程休眠

使用 Thread.sleep 方法可以使得当前正在执行的线程进入休眠状态。

使用 Thread.sleep 需要向其传入一个整数值,这个值表示线程将要休眠的毫秒数。

Thread.sleep 方法可能会抛出 InterruptedException,因为异常不能跨线程传播回 main 中,因此必须在本地进行处理。线程中抛出的其它异常也同样需要在本地进行处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class ThreadSleepDemo {

public static void main(String[] args) {
new Thread(new MyThread("线程 A", 500)).start();
new Thread(new MyThread("线程 B", 1000)).start();
new Thread(new MyThread("线程 C", 1500)).start();
}

static class MyThread implements Runnable {

/** 线程名称 */
private String name;

/** 休眠时间 */
private int time;

private MyThread(String name, int time) {
this.name = name;
this.time = time;
}

@Override
public void run() {
try {
// 休眠指定的时间
Thread.sleep(this.time);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(this.name + "休眠" + this.time + "毫秒。");
}

}

}

线程礼让

Thread.yield 方法的调用声明了当前线程已经完成了生命周期中最重要的部分,可以切换给其它线程来执行 。该方法只是对线程调度器的一个建议,而且也只是建议具有相同优先级的其它线程可以运行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class ThreadYieldDemo {

public static void main(String[] args) {
MyThread t = new MyThread();
new Thread(t, "线程 A").start();
new Thread(t, "线程 B").start();
}

static class MyThread implements Runnable {

@Override
public void run() {
for (int i = 0; i < 5; i++) {
try {
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "运行,i = " + i);
if (i == 2) {
System.out.print("线程礼让:");
Thread.yield();
}
}
}
}
}

守护线程

什么是守护线程?

  • 守护线程(Daemon Thread)是在后台执行并且不会阻止 JVM 终止的线程当所有非守护线程结束时,程序也就终止,同时会杀死所有守护线程
  • 与守护线程(Daemon Thread)相反的,叫用户线程(User Thread),也就是非守护线程。

为什么需要守护线程?

  • 守护线程的优先级比较低,用于为系统中的其它对象和线程提供服务。典型的应用就是垃圾回收器。

如何使用守护线程?

  • 可以使用 isDaemon 方法判断线程是否为守护线程。
  • 可以使用 setDaemon 方法设置线程为守护线程。
    • 正在运行的用户线程无法设置为守护线程,所以 setDaemon 必须在 thread.start 方法之前设置,否则会抛出 llegalThreadStateException 异常;
    • 一个守护线程创建的子线程依然是守护线程。
    • 不要认为所有的应用都可以分配给守护线程来进行服务,比如读写操作或者计算逻辑。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class ThreadDaemonDemo {

public static void main(String[] args) {
Thread t = new Thread(new MyThread(), "线程");
t.setDaemon(true); // 此线程在后台运行
System.out.println("线程 t 是否是守护进程:" + t.isDaemon());
t.start(); // 启动线程
}

static class MyThread implements Runnable {

@Override
public void run() {
while (true) {
System.out.println(Thread.currentThread().getName() + "在运行。");
}
}
}
}

参考阅读:Java 中守护线程的总结

线程通信

当多个线程可以一起工作去解决某个问题时,如果某些部分必须在其它部分之前完成,那么就需要对线程进行协调。

wait/notify/notifyAll

  • wait - wait 会自动释放当前线程占有的对象锁,并请求操作系统挂起当前线程,让线程从 RUNNING 状态转入 WAITING 状态,等待 notify / notifyAll 来唤醒。如果没有释放锁,那么其它线程就无法进入对象的同步方法或者同步控制块中,那么就无法执行 notify 或者 notifyAll 来唤醒挂起的线程,造成死锁。
  • notify - 唤醒一个正在 WAITING 状态的线程,并让它拿到对象锁,具体唤醒哪一个线程由 JVM 控制 。
  • notifyAll - 唤醒所有正在 WAITING 状态的线程,接下来它们需要竞争对象锁。

注意:

  • waitnotifynotifyAll 都是 Object 类中的方法,而非 Thread
  • **waitnotifynotifyAll 只能用在 synchronized 方法或者 synchronized 代码块中使用,否则会在运行时抛出 IllegalMonitorStateException**。

生产者、消费者模式是 waitnotifynotifyAll 的一个经典使用案例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
public class ThreadWaitNotifyDemo02 {

private static final int QUEUE_SIZE = 10;
private static final PriorityQueue<Integer> queue = new PriorityQueue<>(QUEUE_SIZE);

public static void main(String[] args) {
new Producer("生产者 A").start();
new Producer("生产者 B").start();
new Consumer("消费者 A").start();
new Consumer("消费者 B").start();
}

static class Consumer extends Thread {

Consumer(String name) {
super(name);
}

@Override
public void run() {
while (true) {
synchronized (queue) {
while (queue.size() == 0) {
try {
System.out.println("队列空,等待数据");
queue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
queue.notifyAll();
}
}
queue.poll(); // 每次移走队首元素
queue.notifyAll();
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " 从队列取走一个元素,队列当前有:" + queue.size() + "个元素");
}
}
}
}

static class Producer extends Thread {

Producer(String name) {
super(name);
}

@Override
public void run() {
while (true) {
synchronized (queue) {
while (queue.size() == QUEUE_SIZE) {
try {
System.out.println("队列满,等待有空余空间");
queue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
queue.notifyAll();
}
}
queue.offer(1); // 每次插入一个元素
queue.notifyAll();
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " 向队列取中插入一个元素,队列当前有:" + queue.size() + "个元素");
}
}
}
}
}

join

在线程操作中,可以使用 join 方法让一个线程强制运行,线程强制运行期间,其他线程无法运行,必须等待此线程完成之后才可以继续执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class ThreadJoinDemo {

public static void main(String[] args) {
MyThread mt = new MyThread(); // 实例化 Runnable 子类对象
Thread t = new Thread(mt, "mythread"); // 实例化 Thread 对象
t.start(); // 启动线程
for (int i = 0; i < 50; i++) {
if (i > 10) {
try {
t.join(); // 线程强制运行
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("Main 线程运行 --> " + i);
}
}

static class MyThread implements Runnable {

@Override
public void run() {
for (int i = 0; i < 50; i++) {
System.out.println(Thread.currentThread().getName() + " 运行,i = " + i); // 取得当前线程的名字
}
}
}
}

管道

管道输入/输出流和普通的文件输入/输出流或者网络输入/输出流不同之处在于,它主要用于线程之间的数据传输,而传输的媒介为内存。
管道输入/输出流主要包括了如下 4 种具体实现:PipedOutputStreamPipedInputStreamPipedReaderPipedWriter,前两种面向字节,而后两种面向字符。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class Piped {

public static void main(String[] args) throws Exception {
PipedWriter out = new PipedWriter();
PipedReader in = new PipedReader();
// 将输出流和输入流进行连接,否则在使用时会抛出 IOException
out.connect(in);
Thread printThread = new Thread(new Print(in), "PrintThread");
printThread.start();
int receive = 0;
try {
while ((receive = System.in.read()) != -1) {
out.write(receive);
}
} finally {
out.close();
}
}

static class Print implements Runnable {

private PipedReader in;

Print(PipedReader in) {
this.in = in;
}

public void run() {
int receive = 0;
try {
while ((receive = in.read()) != -1) {
System.out.print((char) receive);
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}

线程生命周期

java.lang.Thread.State 中定义了 6 种不同的线程状态,在给定的一个时刻,线程只能处于其中的一个状态。

以下是各状态的说明,以及状态间的联系:

  • 开始(NEW) - 尚未调用 start 方法的线程处于此状态。此状态意味着:创建的线程尚未启动
  • 可运行(RUNNABLE) - 已经调用了 start 方法的线程处于此状态。此状态意味着,线程已经准备好了,一旦被线程调度器分配了 CPU 时间片,就可以运行线程。
    • 在操作系统层面,线程有 READY 和 RUNNING 状态;而在 JVM 层面,只能看到 RUNNABLE 状态,所以 Java 系统一般将这两个状态统称为 RUNNABLE(运行中) 状态 。
  • 阻塞(BLOCKED) - 此状态意味着:线程处于被阻塞状态。表示线程在等待 synchronized 的隐式锁(Monitor lock)。synchronized 修饰的方法、代码块同一时刻只允许一个线程执行,其他线程只能等待,即处于阻塞状态。当占用 synchronized 隐式锁的线程释放锁,并且等待的线程获得 synchronized 隐式锁时,就又会从 BLOCKED 转换到 RUNNABLE 状态。
  • 等待(WAITING) - 此状态意味着:线程无限期等待,直到被其他线程显式地唤醒。 阻塞和等待的区别在于,阻塞是被动的,它是在等待获取 synchronized 的隐式锁。而等待是主动的,通过调用 Object.wait 等方法进入。
    • 进入:Object.wait();退出:Object.notify / Object.notifyAll
    • 进入:Thread.join();退出:被调用的线程执行完毕
    • 进入:LockSupport.park();退出:LockSupport.unpark
  • 定时等待(TIMED_WAITING) - 等待指定时间的状态。一个线程处于定时等待状态,是由于执行了以下方法中的任意方法:
    • 进入:Thread.sleep(long);退出:时间结束
    • 进入:Object.wait(long);退出:时间结束 / Object.notify / Object.notifyAll
    • 进入:Thread.join(long);退出:时间结束 / 被调用的线程执行完毕
    • 进入:LockSupport.parkNanos(long);退出:LockSupport.unpark
    • 进入:LockSupport.parkUntil(long);退出:LockSupport.unpark
  • 终止 (TERMINATED) - 线程 run() 方法执行结束,或者因异常退出了 run() 方法,则该线程结束生命周期。死亡的线程不可再次复生。

👉 扩展阅读:

线程常见问题

线程启动

典型问题

(1)Thread.start()Thread.run() 有什么区别?

(2)可以直接调用 Thread.run() 方法么?

(3)一个线程两次调用 Thread.start() 方法会怎样

知识点

(1)Thread.start()Thread.run() 的区别:

  • run() 方法是线程的执行体。
  • start() 方法负责启动线程,然后 JVM 会让这个线程去执行 run() 方法。

(2)可以直接调用 Thread.run() 方法,但是它的行为和普通方法一样,不会启动新线程去执行。调用 start() 方法方可启动线程并使线程进入就绪状态,直接执行 run() 方法的话不会以多线程的方式执行。

(3)Java 的线程是不允许启动两次的,第二次调用必然会抛出 IllegalThreadStateException

线程等待

典型问题

(1)Thread.sleep()Thread.yield()Thread.join()Object.wait() 方法有什么区别?

(2)为什么 Thread.sleep()Thread.yield() 设计为静态方法?

知识点

(1)Thread.sleep()Thread.yield()Thread.join() 方法的区别:

  • Thread.sleep()
    • Thread.sleep() 方法需要指定等待的时间,它可以让当前正在执行的线程在指定的时间内暂停执行,进入 TIMED_WAITING 状态。
    • 该方法既可以让其他同优先级或者高优先级的线程得到执行的机会,也可以让低优先级的线程得到执行机会。
    • 但是,Thread.sleep() 方法不会释放“锁标志”,也就是说如果有 synchronized 同步块,其他线程仍然不能访问共享数据。
  • Thread.yield()
    • Thread.yield() 方法可以让当前正在执行的线程暂停,但它不会阻塞该线程,它只是将该线程从 RUNNING 状态转入 RUNNABLE 状态。
    • 当某个线程调用了 Thread.yield() 方法暂停之后,只有优先级大于等于当前线程的处于就绪状态的线程才会获得执行的机会。
  • Thread.join()
    • Thread.join() 方法会使当前线程转入 WAITINGTIMED_WAITING 状态,等待调用 Thread.join() 方法的线程结束后才能继续执行。
  • Object.wait()
    • Object.wait() 用于使当前线程等待,直到其他线程调用相同对象的 Object.notify()Object.notifyAll() 方法唤醒它。
    • 调用 Object.wait() 时,线程会释放对象锁,并进入等待状态。

(2)为什么 Thread.sleep()Thread.yield() 设计为静态方法?

Thread.sleep()Thread.yield() 针对的是 RUNNING 状态的线程,也就是说在非 RUNNING 状态的线程上执行这两个方法没有意义。这就是为什么这两个方法被设计为静态的。它们只针对正在 RUNNING 状态的线程工作,避免程序员错误的认为可以在其他非 RUNNING 状态线程上调用。

👉 扩展阅读:Java 线程中 yield 与 join 方法的区别
👉 扩展阅读:sleep(),wait(),yield() 和 join() 方法的区别

线程通信

线程间通信是线程间共享资源的一种方式。Object.wait(), Object.notify()Object.notifyAll() 是用于线程之间协作和通信的方法,它们通常与synchronized 关键字一起使用来实现线程的同步。

典型问题

(1)为什么线程通信的方法 Object.wait()Object.notify()Object.notifyAll() 被定义在 Object 类里?

(2)为什么 Object.wait()Object.notify()Object.notifyAll() 必须在 synchronized 方法/块中被调用?

(3) Object.wait()Thread.sleep 有什么区别?

知识点

(1)为什么线程通信的方法 Object.wait()Object.notify()Object.notifyAll() 被定义在 Object 类里?

Java 的每个对象中都有一个称之为 monitor 监视器的锁,由于每个对象都可以上锁,这就要求在对象头中有一个用来保存锁信息的位置。这个锁是对象级别的,而非线程级别的,wait/notify/notifyAll 也都是锁级别的操作,它们的锁属于对象,所以把它们定义在 Object 类中是最合适,因为 Object 类是所有对象的父类。

如果把 wait/notify/notifyAll 方法定义在 Thread 类中,会带来很大的局限性,比如一个线程可能持有多把锁,以便实现相互配合的复杂逻辑,假设此时 wait 方法定义在 Thread 类中,如何实现让一个线程持有多把锁呢?又如何明确线程等待的是哪把锁呢?既然我们是让当前线程去等待某个对象的锁,自然应该通过操作对象来实现,而不是操作线程。

  • Object.wait()
    • Object.wait() 方法用于使当前线程进入等待状态,直到其他线程调用相同对象的 notify()notifyAll() 方法唤醒它。
    • 在调用 wait() 方法时,线程会释放对象的锁,并进入等待状态。通常在使用 wait() 方法时需要放在一个循环中,以避免虚假唤醒(spurious wakeups)。
  • Object.notify()
    • Object.notify() 方法用于唤醒正在等待该对象的锁的一个线程。
    • 被唤醒的线程将会尝试重新获取对象的锁,一旦获取到锁,它将继续执行。
  • Object.notifyAll()
    • Object.notifyAll() 方法用于唤醒正在等待该对象的锁的所有线程。
    • 所有被唤醒的线程将会竞争对象的锁,一旦获取到锁,它们将继续执行。

(2)为什么 Object.wait()Object.notify()Object.notifyAll() 必须在 synchronized 方法/块中被调用?

当一个线程需要调用对象的 wait() 方法的时候,这个线程必须拥有该对象的锁,接着它就会释放这个对象锁并进入等待状态直到其他线程调用这个对象上的 notify() 方法。同样的,当一个线程需要调用对象的 notify() 方法时,它会释放这个对象的锁,以便其他在等待的线程就可以得到这个对象锁。

由于所有的这些方法都需要线程持有对象的锁,这样就只能通过 synchronized 来实现,所以他们只能在 synchronized 方法/块中被调用。

(3) Object.wait()Thread.sleep 有什么区别?

相同点:

  1. 它们都可以让线程阻塞。
  2. 它们都可以响应 interrupt 中断:在等待的过程中如果收到中断信号,都可以进行响应,并抛出 InterruptedException 异常。

不同点:

  1. wait 方法必须在 synchronized 保护的代码中使用,而 sleep 方法并没有这个要求。
  2. 在同步代码中执行 sleep 方法时,并不会释放 monitor 锁,但执行 wait 方法时会主动释放 monitor 锁。
  3. sleep 方法中会要求必须定义一个时间,时间到期后会主动恢复,而对于没有参数的 wait 方法而言,意味着永久等待,直到被中断或被唤醒才能恢复,它并不会主动恢复。
  4. wait/notify 是 Object 类的方法,而 sleep 是 Thread 类的方法。

👉 扩展阅读:Java 并发编程:线程间协作的两种方式:wait、notify、notifyAll 和 Condition

线程优先级

典型问题

(1)Java 的线程优先级如何控制?

(2)高优先级的 Java 线程一定先执行吗?

知识点

(1)Java 中的线程优先级的范围是 [1,10],一般来说,高优先级的线程在运行时会具有优先权。可以通过 thread.setPriority(Thread.MAX_PRIORITY) 的方式设置,默认优先级为 5

(2)即使设置了线程的优先级,也无法保证高优先级的线程一定先执行

这是因为 Java 线程优先级依赖于操作系统的支持,然而,不同的操作系统支持的线程优先级并不相同,不能很好的和 Java 中线程优先级一一对应。因此,Java 线程优先级控制并不可靠。

守护线程

典型问题

(1)什么是守护线程?

(2)如何创建守护线程?

知识点

(1)什么是守护线程?

守护线程(Daemon Thread)是在后台执行并且不会阻止 JVM 终止的线程。与守护线程(Daemon Thread)相反的,叫用户线程(User Thread),也就是非守护线程。

守护线程的优先级比较低,一般用于为系统中的其它对象和线程提供服务。典型的应用就是垃圾回收器。

(2)创建守护线程的方式:

  • 使用 thread.setDaemon(true) 可以设置 thread 线程为守护线程。
  • 正在运行的用户线程无法设置为守护线程,所以 thread.setDaemon(true) 必须在 thread.start() 之前设置,否则会抛出 llegalThreadStateException 异常;
  • 一个守护线程创建的子线程依然是守护线程。
  • 不要认为所有的应用都可以分配给守护线程来进行服务,比如读写操作或者计算逻辑。

👉 扩展阅读:Java 中守护线程的总结

线程数

典型问题

(1)线程数是不是越多越好?

(2)创建多少线程才合适?

知识点

使用多线程,初衷是为了提升程序性能。度量性能的核心指标是延迟吞吐量。所谓提升性能,从度量的角度,主要是降低延迟,提高吞吐量。在并发编程领域,提升性能本质上就是提升硬件的利用率,再具体点来说,就是提升 I/O 的利用率和 CPU 的利用率。

多线程并非越多越好,过多的线程可能会导致过多的上下文切换,反而降低系统性能。 通常需要根据服务器硬件资源和预期负载来合理设定线程数大小。

程序一般都是 CPU 计算和 I/O 操作交叉执行的,由于 I/O 设备的速度相对于 CPU 来说都很慢,所以大部分情况下,I/O 操作执行的时间相对于 CPU 计算来说都非常长,这种场景我们一般都称为 I/O 密集型计算;和 I/O 密集型计算相对的就是 CPU 密集型计算了,CPU 密集型计算大部分场景下都是纯 CPU 计算。I/O 密集型程序和 CPU 密集型程序,计算最佳线程数的方法是不同的。

对于 CPU 密集型的计算场景,理论上“线程的数量=CPU 核数”就是最合适的。不过在工程上,线程的数量一般会设置为“CPU 核数+1”,这样的话,当线程因为偶尔的内存页失效或其他原因导致阻塞时,这个额外的线程可以顶上,从而保证 CPU 的利用率。

对于 I/O 密集型计算场景,最佳的线程数是与程序中 CPU 计算和 I/O 操作的耗时比相关的,我们可以总结出这样一个公式:

最佳线程数=1 +(I/O 耗时 / CPU 耗时)

参考资料

Java 并发之线程池

线程池简介

线程池就是管理一系列线程的资源池,其提供了一种限制和管理线程资源的方式。每个线程池还维护一些基本统计信息,例如已完成任务的数量。

如果并发请求数量很多,但每个线程执行的时间很短,就会出现频繁的创建和销毁线程。如此一来,会大大降低系统的效率,可能频繁创建和销毁线程的时间、资源开销要大于实际工作的所需。

使用 线程池的好处 有以下几点:

  • 降低资源消耗 - 通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
  • 提高响应速度 - 当任务到达时,任务可以不需要等到线程创建就能立即执行。
  • 提高线程的可管理性 - 线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

Executor 框架

Executor 框架是一个根据一组执行策略调用,调度,执行和控制的异步任务的框架,目的是提供一种将”任务提交”与”任务如何运行”分离开来的机制。

通过 Executor 来启动线程比使用 Threadstart 方法更好,除了更易管理,效率更好(用线程池实现,节约开销)外,还有关键的一点:有助于避免 this 逃逸问题。

this 逃逸是指在构造函数返回之前其他线程就持有该对象的引用,调用尚未构造完全的对象的方法可能引发令人疑惑的错误。

核心 API 概述

Executor 框架核心 API 如下:

  • Executor - 运行任务的接口。
  • ExecutorService - 扩展了 Executor 接口。扩展能力:
    • 支持有返回值的线程;
    • 支持管理线程的生命周期。
  • ScheduledExecutorService - 扩展了 ExecutorService 接口,支持定时调度任务。
  • AbstractExecutorService - ExecutorService 接口的默认实现。
  • ThreadPoolExecutor - Executor 框架最核心的类,它继承了 AbstractExecutorService 类。
  • ScheduledThreadPoolExecutor - ScheduledExecutorService 接口的实现,一个可定时调度任务的线程池。
  • Executors - 可以通过调用 Executors 的静态工厂方法来创建线程池并返回一个 ExecutorService 对象。

img

Executor

Executor 接口中只定义了一个 execute 方法,用于接收一个 Runnable 对象。

1
2
3
public interface Executor {
void execute(Runnable command);
}

ExecutorService

ExecutorService 接口继承了 Executor 接口,它还提供了 invokeAllinvokeAnyshutdownsubmit 等方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public interface ExecutorService extends Executor {

void shutdown();

List<Runnable> shutdownNow();

boolean isShutdown();

boolean isTerminated();

boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;

<T> Future<T> submit(Callable<T> task);

<T> Future<T> submit(Runnable task, T result);

Future<?> submit(Runnable task);

<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;

<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;

<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;

<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}

从其支持的方法定义,不难看出:相比于 Executor 接口,ExecutorService 接口主要的扩展是:

  • 支持有返回值的线程 - sumbitinvokeAllinvokeAny 方法中都支持传入Callable 对象。
  • 支持管理线程生命周期 - shutdownshutdownNowisShutdown 等方法。

ScheduledExecutorService

ScheduledExecutorService 接口扩展了 ExecutorService 接口。

它除了支持前面两个接口的所有能力以外,还支持定时调度线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public interface ScheduledExecutorService extends ExecutorService {

public ScheduledFuture<?> schedule(Runnable command,
long delay, TimeUnit unit);

public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay, TimeUnit unit);

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit);

public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit);

}

其扩展的接口提供以下能力:

  • schedule 方法可以在指定的延时后执行一个 Runnable 或者 Callable 任务。
  • scheduleAtFixedRate 方法和 scheduleWithFixedDelay 方法可以按照指定时间间隔,定期执行任务。

ThreadPoolExecutor

java.uitl.concurrent.ThreadPoolExecutor 类是 Executor 框架中最核心的类。

构造方法

ThreadPoolExecutor 有四个构造方法,前三个都是基于第四个实现。第四个构造方法定义如下:

1
2
3
4
5
6
7
8
public ThreadPoolExecutor(int corePoolSize,// 线程池的核心线程数量
int maximumPoolSize,// 线程池的最大线程数
long keepAliveTime,// 当线程数大于核心线程数时,多余的空闲线程存活的最长时间
TimeUnit unit,// 时间单位
BlockingQueue<Runnable> workQueue,// 任务队列,用来储存等待执行任务的队列
ThreadFactory threadFactory,// 线程工厂,用来创建线程,一般默认即可
RejectedExecutionHandler handler// 拒绝策略,当提交的任务过多而不能及时处理时,我们可以定制策略来处理任务
) {// 略}

参数说明:

  • corePoolSize - 表示线程池保有的最小线程数
  • maximumPoolSize - 表示线程池允许创建的最大线程数
    • 如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。
    • 值得注意的是:如果使用了无界的任务队列这个参数就没什么效果。
  • keepAliveTime & unit - 表示线程保持活动的时间。如果一个线程空闲了keepAliveTime & unit 这么久,而且线程池的线程数大于 corePoolSize ,那么这个空闲的线程就要被回收了。
  • workQueue - 等待执行的任务队列。用于保存等待执行的任务的阻塞队列。 可以选择以下几个阻塞队列。
    • ArrayBlockingQueue - 有界阻塞队列
    • LinkedBlockingQueue - 无界阻塞队列
    • SynchronousQueue - 不会保存提交的任务,而是将直接新建一个线程来执行新来的任务
    • DelayedWorkQueue - 延迟阻塞队列。
    • PriorityBlockingQueue - 具有优先级的无界阻塞队列
  • threadFactory - 线程工厂。线程工程用于自定义如何创建线程。
  • handler - 拒绝策略。它是 RejectedExecutionHandler 类型的变量。当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。线程池支持以下策略:
    • AbortPolicy - 丢弃任务并抛出异常。这也是默认策略,会抛出 RejectedExecutionException
    • DiscardPolicy - 丢弃任务但不抛出异常
    • DiscardOldestPolicy - 丢弃队列最老的任务,其实就是把最早进入工作队列的任务丢弃,然后把新任务加入到工作队列。
    • CallerRunsPolicy - 提交任务的线程自己去执行该任务。
    • 如果以上策略都不能满足需要,也可以通过实现 RejectedExecutionHandler 接口来定制处理策略。如记录日志或持久化不能处理的任务。

重要字段

ThreadPoolExecutor 有以下重要字段:

1
2
3
4
5
6
7
8
9
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;

参数说明:

  • ctl - 用于控制线程池的运行状态和线程池中的有效线程数量。它包含两部分的信息:
    • 线程池的运行状态 (runState)
    • 线程池内有效线程的数量 (workerCount)
    • 可以看到,ctl 使用了 Integer 类型来保存,高 3 位保存 runState,低 29 位保存 workerCountCOUNT_BITS 就是 29,CAPACITY 就是 1 左移 29 位减 1(29 个 1),这个常量表示 workerCount 的上限值,大约是 5 亿。
  • 运行状态 - 线程池一共有五种运行状态:
    • RUNNING - 运行状态。接受新任务,并且也能处理阻塞队列中的任务。
    • SHUTDOWN - 关闭状态。不接受新任务,但可以处理阻塞队列中的任务。
      • 在线程池处于 RUNNING 状态时,调用 shutdown 方法会使线程池进入到该状态。
      • finalize 方法在执行过程中也会调用 shutdown 方法进入该状态。
    • STOP - 停止状态。不接受新任务,也不处理队列中的任务。会中断正在处理任务的线程。在线程池处于 RUNNINGSHUTDOWN 状态时,调用 shutdownNow 方法会使线程池进入到该状态。
    • TIDYING - 整理状态。如果所有的任务都已终止了,workerCount (有效线程数) 为 0,线程池进入该状态后会调用 terminated 方法进入 TERMINATED 状态。
    • TERMINATED - 已终止状态。在 terminated 方法执行完后进入该状态。默认 terminated 方法中什么也没有做。进入 TERMINATED 的条件如下:
      • 线程池不是 RUNNING 状态;
      • 线程池状态不是 TIDYING 状态或 TERMINATED 状态;
      • 如果线程池状态是 SHUTDOWN 并且 workerQueue 为空;
      • workerCount 为 0;
      • 设置 TIDYING 状态成功。

其他重要方法

ThreadPoolExecutor 类中还有一些重要的方法:

  • submit - 类似于 execute,但是针对的是有返回值的线程。submit 方法是在 ExecutorService 中声明的方法,在 AbstractExecutorService 就已经有了具体的实现。ThreadPoolExecutor 直接复用 AbstractExecutorServicesubmit 方法。
  • shutdown - 不会立即终止线程池,而是要等所有任务缓存队列中的任务都执行完后才终止,但再也不会接受新的任务。
    • 将线程池切换到 SHUTDOWN 状态;
    • 并调用 interruptIdleWorkers 方法请求中断所有空闲的 worker;
    • 最后调用 tryTerminate 尝试结束线程池。
  • shutdownNow - 立即终止线程池,并尝试打断正在执行的任务,并且清空任务缓存队列,返回尚未执行的任务。与 shutdown 方法类似,不同的地方在于:
    • 设置状态为 STOP
    • 中断所有工作线程,无论是否是空闲的;
    • 取出阻塞队列中没有被执行的任务并返回。
  • isShutdown - 调用了 shutdownshutdownNow 方法后,isShutdown 方法就会返回 true。
  • isTerminaed - 当所有的任务都已关闭后,才表示线程池关闭成功,这时调用 isTerminaed 方法会返回 true。
  • setCorePoolSize - 设置核心线程数大小。
  • setMaximumPoolSize - 设置最大线程数大小。
  • getTaskCount - 线程池已经执行的和未执行的任务总数;
  • getCompletedTaskCount - 线程池已完成的任务数量,该值小于等于 taskCount
  • getLargestPoolSize - 线程池曾经创建过的最大线程数量。通过这个数据可以知道线程池是否满过,也就是达到了 maximumPoolSize
  • getPoolSize - 线程池当前的线程数量;
  • getActiveCount - 当前线程池中正在执行任务的线程数量。

使用示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class ThreadPoolExecutorDemo {

public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10, 500, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());

for (int i = 0; i < 100; i++) {
threadPoolExecutor.execute(new MyThread());
String info = String.format("线程池中线程数目:%s,队列中等待执行的任务数目:%s,已执行玩别的任务数目:%s",
threadPoolExecutor.getPoolSize(),
threadPoolExecutor.getQueue().size(),
threadPoolExecutor.getCompletedTaskCount());
System.out.println(info);
}
threadPoolExecutor.shutdown();
}

static class MyThread implements Runnable {

@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " 执行");
}

}

}

线程池原理

默认情况下,创建线程池之后,线程池中是没有线程的,需要提交任务之后才会创建线程。提交任务可以使用 execute 方法,它是 ThreadPoolExecutor 的核心方法,通过这个方法可以向线程池提交一个任务,交由线程池去执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// 用于控制线程池的运行状态和线程池中的有效线程数量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();

// 获取 ctl 中存储的线程池状态信息
int c = ctl.get();

// 线程池执行可以分为 3 个步骤
// 1. 若工作线程数小于核心线程数,则尝试启动一个新的线程来执行任务
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}

// 2. 如果任务可以成功地加入队列,还需要再次确认是否需要添加新的线程(因为可能自从上次检查以来已经有线程死亡)或者检查线程池是否已经关闭
// -> 如果是后者,则可能需要回滚入队操作;
// -> 如果是前者,则可能需要启动新的线程
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (!isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 如果任务无法加入队列,则尝试添加一个新的线程
// 如果添加新线程失败,说明线程池已经关闭或者达到了容量上限,此时将拒绝该任务
else if (!addWorker(command, false))
reject(command);
}

execute 方法工作流程如下:

  1. 如果 workerCount < corePoolSize,则创建并启动一个线程来执行新提交的任务;
  2. 如果 workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中;
  3. 如果 workerCount >= corePoolSize && workerCount < maximumPoolSize,且线程池内的阻塞队列已满,则创建并启动一个线程来执行新提交的任务;
  4. 如果workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满,则根据拒绝策略来处理该任务,默认的处理方式是直接抛异常。

execute 方法中,多次调用 addWorker 方法。addWorker 这个方法主要用来创建新的工作线程,如果返回 true 说明创建和启动工作线程成功,否则的话返回的就是 false。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
// 全局锁,并发操作必备
private final ReentrantLock mainLock = new ReentrantLock();
// 跟踪线程池的最大大小,只有在持有全局锁 mainLock 的前提下才能访问此集合
private int largestPoolSize;
// 工作线程集合,存放线程池中所有的(活跃的)工作线程,只有在持有全局锁 mainLock 的前提下才能访问此集合
private final HashSet<Worker> workers = new HashSet<>();
//获取线程池状态
private static int runStateOf(int c) { return c & ~CAPACITY; }
//判断线程池的状态是否为 Running
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}

/**
* 添加新的工作线程到线程池
* @param firstTask 要执行
* @param core 参数为 true 的话表示使用线程池的基本大小,为 false 使用线程池最大大小
* @return 添加成功就返回 true 否则返回 false
*/
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
//这两句用来获取线程池的状态
int c = ctl.get();
int rs = runStateOf(c);

// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;

for (;;) {
//获取线程池中工作的线程的数量
int wc = workerCountOf(c);
// core 参数为 false 的话表明队列也满了,线程池大小变为 maximumPoolSize
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//原子操作将 workcount 的数量加 1
if (compareAndIncrementWorkerCount(c))
break retry;
// 如果线程的状态改变了就再次执行上述操作
c = ctl.get();
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
// 标记工作线程是否启动成功
boolean workerStarted = false;
// 标记工作线程是否创建成功
boolean workerAdded = false;
Worker w = null;
try {

w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
// 加锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//获取线程池状态
int rs = runStateOf(ctl.get());
//rs < SHUTDOWN 如果线程池状态依然为 RUNNING, 并且线程的状态是存活的话,就会将工作线程添加到工作线程集合中
//(rs=SHUTDOWN && firstTask == null) 如果线程池状态小于 STOP,也就是 RUNNING 或者 SHUTDOWN 状态下,同时传入的任务实例 firstTask 为 null,则需要添加到工作线程集合和启动新的 Worker
// firstTask == null 证明只新建线程而不执行任务
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
//更新当前工作线程的最大容量
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
// 工作线程是否启动成功
workerAdded = true;
}
} finally {
// 释放锁
mainLock.unlock();
}
//// 如果成功添加工作线程,则调用 Worker 内部的线程实例 t 的 Thread#start() 方法启动真实的线程实例
if (workerAdded) {
t.start();
/// 标记线程启动成功
workerStarted = true;
}
}
} finally {
// 线程启动失败,需要从工作线程中移除对应的 Worker
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}

Executors

Executors 类中提供了几种内置的 ThreadPoolExecutor 实现:

  • FixedThreadPool:固定线程数量的线程池。该线程池中的线程数量始终不变。当有一个新的任务提交时,线程池中若有空闲线程,则立即执行。若没有,则新的任务会被暂存在一个任务队列中,待有线程空闲时,便处理在任务队列中的任务。
  • SingleThreadExecutor: 只有一个线程的线程池。若多余一个任务被提交到该线程池,任务会被保存在一个任务队列中,待线程空闲,按先入先出的顺序执行队列中的任务。
  • CachedThreadPool: 可根据实际情况调整线程数量的线程池。线程池的线程数量不确定,但若有空闲线程可以复用,则会优先使用可复用的线程。若所有线程均在工作,又有新的任务提交,则会创建新的线程处理任务。所有线程在当前任务执行完毕后,将返回线程池进行复用。
  • ScheduledThreadPool:给定的延迟后运行任务或者定期执行任务的线程池。

注意:

《阿里巴巴 Java 开发手册》中明确要求不要使用 Executors 中的内置化线程池。

【强制】线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。

说明:Executors 返回的线程池对象的弊端如下:

  1. FixedThreadPoolSingleThreadPool: 允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM。
  2. CachedThreadPool:允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM。
  3. ScheduledThreadPool: 允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM。

FixedThreadPool

FixedThreadPool 是一个可重用的、线程数固定的线程池Executors 类中的相关源码:

1
2
3
4
5
6
7
8
9
10
11
12
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}

FixedThreadPoolcorePoolSizemaximumPoolSize 都被设置为 nThreads,这个 nThreads 参数是我们使用的时候自己传递的。

即使 maximumPoolSize 的值比 corePoolSize 大,也至多只会创建 corePoolSize 个线程。这是因为FixedThreadPool 使用的是容量为 Integer.MAX_VALUELinkedBlockingQueue(无界队列),队列永远不会被放满。

FixedThreadPool 的问题:

FixedThreadPool 使用无界队列 LinkedBlockingQueue(队列的容量为 Integer.MAX_VALUE)作为线程池的工作队列会对线程池带来如下影响:

  1. 当线程池中的线程数达到 corePoolSize 后,新任务将在无界队列中等待,因此线程池中的线程数不会超过 corePoolSize
  2. 由于使用无界队列时 maximumPoolSize 将是一个无效参数,因为不可能存在任务队列满的情况。所以,通过创建 FixedThreadPool的源码可以看出创建的 FixedThreadPoolcorePoolSizemaximumPoolSize 被设置为同一个值。
  3. 由于 1 和 2,使用无界队列时 keepAliveTime 将是一个无效参数;
  4. 运行中的 FixedThreadPool(未执行 shutdown()shutdownNow())不会拒绝任务,在任务比较多的时候会导致 OOM(内存溢出)。

SingleThreadExecutor

SingleThreadExecutor 是只有一个线程的线程池。SingleThreadExecutor 只会创建唯一的工作线程来执行任务,保证所有任务按照指定顺序 (FIFO, LIFO, 优先级)执行。 如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它

Executors 类中的相关源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}

public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}

SingleThreadExecutor 的问题:

SingleThreadExecutorFixedThreadPool 一样,使用的都是容量为 Integer.MAX_VALUELinkedBlockingQueue(无界队列)作为线程池的工作队列。SingleThreadExecutor 使用无界队列作为线程池的工作队列会对线程池带来的影响与 FixedThreadPool 相同。说简单点,就是可能会导致 OOM。

CachedThreadPool

CachedThreadPool 是一个会根据需要创建新线程的线程池。

  • 如果线程池大小超过处理任务所需要的线程数,就会回收部分空闲的线程;
  • 如果长时间没有往线程池中提交任务,即如果工作线程空闲了指定的时间(默认为 1 分钟),则该工作线程将自动终止。终止后,如果你又提交了新的任务,则线程池重新创建一个工作线程。
  • 此线程池不会对线程池大小做限制,线程池大小完全依赖于操作系统(或者说 JVM)能够创建的最大线程大小。 因此,使用 CachedThreadPool 时,一定要注意控制任务的数量,否则,由于大量线程同时运行,很有会造成系统瘫痪。
1
2
3
4
5
6
7
8
9
10
11
12
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}

CachedThreadPoolcorePoolSize 被设置为空(0),maximumPoolSize被设置为 Integer.MAX.VALUE,即它是无界的,这也就意味着如果主线程提交任务的速度高于 maximumPool 中线程处理任务的速度时,CachedThreadPool 会不断创建新的线程。极端情况下,这样会导致耗尽 cpu 和内存资源。

CachedThreadPool 的执行流程:

  1. 首先执行 SynchronousQueue.offer(Runnable task) 提交任务到任务队列。如果当前 maximumPool 中有闲线程正在执行 SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS),那么主线程执行 offer 操作与空闲线程执行的 poll 操作配对成功,主线程把任务交给空闲线程执行,execute()方法执行完成,否则执行下面的步骤 2;
  2. 当初始 maximumPool 为空,或者 maximumPool 中没有空闲线程时,将没有线程执行 SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS)。这种情况下,步骤 1 将失败,此时 CachedThreadPool 会创建新线程执行任务,execute 方法执行完成;

CachedThreadPool 的问题:

CachedThreadPool 使用的是同步队列 SynchronousQueue, 允许创建的线程数量为 Integer.MAX_VALUE ,可能会创建大量线程,从而导致 OOM。

ScheduleThreadPool

ScheduledThreadPool 用来在给定的延迟后运行任务或者定期执行任务。这个在实际项目中基本不会被用到,也不推荐使用。

1
2
3
4
5
6
7
8
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}

public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}

ScheduledThreadPool 是通过 ScheduledThreadPoolExecutor 创建的,使用的DelayedWorkQueue(延迟阻塞队列)作为线程池的任务队列。

DelayedWorkQueue 的内部元素并不是按照放入的时间排序,而是会按照延迟的时间长短对任务进行排序,内部采用的是“堆”的数据结构,可以保证每次出队的任务都是当前队列中执行时间最靠前的。DelayedWorkQueue 添加元素满了之后会自动扩容原来容量的 1/2,即永远不会阻塞,最大扩容可达 Integer.MAX_VALUE,所以最多只能创建核心线程数的线程。

ScheduledThreadPoolExecutor 继承了 ThreadPoolExecutor,所以创建 ScheduledThreadExecutor 本质也是创建一个 ThreadPoolExecutor 线程池,只是传入的参数不相同。

ScheduledThreadPoolExecutor 和 Timer 对比

  • Timer 对系统时钟的变化敏感,ScheduledThreadPoolExecutor不是;
  • Timer 只有一个执行线程,因此长时间运行的任务可以延迟其他任务。 ScheduledThreadPoolExecutor 可以配置任意数量的线程。 此外,如果你想(通过提供 ThreadFactory),你可以完全控制创建的线程;
  • TimerTask 中抛出的运行时异常会杀死一个线程,从而导致 Timer 死机即计划任务将不再运行。ScheduledThreadExecutor 不仅捕获运行时异常,还允许您在需要时处理它们(通过重写 afterExecute 方法ThreadPoolExecutor)。抛出异常的任务将被取消,但其他任务将继续运行。

WorkStealingPool

WorkStealingPool 是 JDK8 才引入的。

其内部会构建 ForkJoinPool,利用 Work-Stealing 算法,并行地处理任务,不保证处理顺序。

线程池最佳实践

计算线程数量

一般多线程执行的任务类型可以分为 CPU 密集型和 I/O 密集型,根据不同的任务类型,我们计算线程数的方法也不一样。

CPU 密集型任务:这种任务消耗的主要是 CPU 资源,可以将线程数设置为 N(CPU 核心数)+1,比 CPU 核心数多出来的一个线程是为了防止线程偶发的缺页中断,或者其它原因导致的任务暂停而带来的影响。一旦任务暂停,CPU 就会处于空闲状态,而在这种情况下多出来的一个线程就可以充分利用 CPU 的空闲时间。

I/O 密集型任务:这种任务应用起来,系统会用大部分的时间来处理 I/O 交互,而线程在处理 I/O 的时间段内不会占用 CPU 来处理,这时就可以将 CPU 交出给其它线程使用。因此在 I/O 密集型任务的应用中,我们可以多配置一些线程,具体的计算方法是 2N。

建议使用有界阻塞队列

不建议使用 Executors 的最重要的原因是:Executors 提供的很多方法默认使用的都是无界的 LinkedBlockingQueue,高负载情境下,无界队列很容易导致 OOM,而 OOM 会导致所有请求都无法处理,这是致命问题。所以强烈建议使用有界队列

《阿里巴巴 Java 开发手册》中提到,禁止使用这些方法来创建线程池,而应该手动 new ThreadPoolExecutor 来创建线程池。制订这条规则是因为容易导致生产事故,最典型的就是 newFixedThreadPoolnewCachedThreadPool,可能因为资源耗尽导致 OOM 问题。

【示例】newFixedThreadPool OOM

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
ThreadPoolExecutor threadPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
printStats(threadPool);
for (int i = 0; i < 100000000; i++) {
threadPool.execute(() -> {
String payload = IntStream.rangeClosed(1, 1000000)
.mapToObj(__ -> "a")
.collect(Collectors.joining("")) + UUID.randomUUID().toString();
try {
TimeUnit.HOURS.sleep(1);
} catch (InterruptedException e) {
}
log.info(payload);
});
}

threadPool.shutdown();
threadPool.awaitTermination(1, TimeUnit.HOURS);

newFixedThreadPool 使用的工作队列是 LinkedBlockingQueue ,而默认构造方法的 LinkedBlockingQueue 是一个 Integer.MAX_VALUE 长度的队列,可以认为是无界的。如果任务较多并且执行较慢的话,队列可能会快速积压,撑爆内存导致 OOM。

【示例】newCachedThreadPool OOM

1
2
3
4
5
6
7
8
9
10
11
12
13
14
ThreadPoolExecutor threadPool = (ThreadPoolExecutor) Executors.newCachedThreadPool();
printStats(threadPool);
for (int i = 0; i < 100000000; i++) {
threadPool.execute(() -> {
String payload = UUID.randomUUID().toString();
try {
TimeUnit.HOURS.sleep(1);
} catch (InterruptedException e) {
}
log.info(payload);
});
}
threadPool.shutdown();
threadPool.awaitTermination(1, TimeUnit.HOURS);

newCachedThreadPool 的最大线程数是 Integer.MAX_VALUE,可以认为是没有上限的,而其工作队列 SynchronousQueue 是一个没有存储空间的阻塞队列。这意味着,只要有请求到来,就必须找到一条工作线程来处理,如果当前没有空闲的线程就再创建一条新的。

如果大量的任务进来后会创建大量的线程。我们知道线程是需要分配一定的内存空间作为线程栈的,比如 1MB,因此无限制创建线程必然会导致 OOM。

监测线程池运行状态

可以通过一些手段来检测线程池的运行状态比如 SpringBoot 中的 Actuator 组件。

除此之外,我们还可以利用 ThreadPoolExecutor 的相关 API 做一个简陋的监控。从下图可以看出, ThreadPoolExecutor提供了获取线程池当前的线程数和活跃线程数、已经执行完成的任务数、正在排队中的任务数等等。

下面是一个简单的 Demo。printThreadPoolStatus()会每隔一秒打印出线程池的线程数、活跃线程数、完成的任务数、以及队列中的任务数。

1
2
3
4
5
6
7
8
9
10
11
public static void printThreadPoolStatus(ThreadPoolExecutor threadPool) {
ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1, createThreadFactory("print-images/thread-pool-status", false));
scheduledExecutorService.scheduleAtFixedRate(() -> {
log.info("=========================");
log.info("ThreadPool Size: [{}]", threadPool.getPoolSize());
log.info("Active Threads: {}", threadPool.getActiveCount());
log.info("Number of Tasks : {}", threadPool.getCompletedTaskCount());
log.info("Number of Tasks in Queue: {}", threadPool.getQueue().size());
log.info("=========================");
}, 0, 1, TimeUnit.SECONDS);
}

线程池和 ThreadLocal

线程池和 ThreadLocal共用,可能会导致线程从ThreadLocal获取到的是旧值/脏数据。这是因为线程池会复用线程对象,与线程对象绑定的类的静态属性 ThreadLocal 变量也会被重用,这就导致一个线程可能获取到其他线程的ThreadLocal 值。

不要以为代码中没有显示使用线程池就不存在线程池了,像常用的 Web 服务器 Tomcat 处理任务为了提高并发量,就使用到了线程池,并且使用的是基于原生 Java 线程池改进完善得到的自定义线程池。

当然了,你可以将 Tomcat 设置为单线程处理任务。不过,这并不合适,会严重影响其处理任务的速度。

1
server.tomcat.max-threads=1

解决上述问题比较建议的办法是使用阿里巴巴开源的 TransmittableThreadLocal(TTL)。TransmittableThreadLocal类继承并加强了 JDK 内置的InheritableThreadLocal类,在使用线程池等会池化复用线程的执行组件情况下,提供ThreadLocal值的传递功能,解决异步执行时上下文传递的问题。

TransmittableThreadLocal 项目地址:https://github.com/alibaba/transmittable-thread-localopen in new window

重要任务应该自定义拒绝策略

使用有界队列,当任务过多时,线程池会触发执行拒绝策略,线程池默认的拒绝策略会 throw RejectedExecutionException 这是个运行时异常,对于运行时异常编译器并不强制 catch 它,所以开发人员很容易忽略。因此默认拒绝策略要慎重使用。如果线程池处理的任务非常重要,建议自定义自己的拒绝策略;并且在实际工作中,自定义的拒绝策略往往和降级策略配合使用。

动态线程池

美团技术团队在 《Java 线程池实现原理及其在美团业务中的实践》 这篇文章中介绍到对线程池参数实现可自定义配置的思路和方法。

美团技术团队的思路是主要对线程池的核心参数实现自定义可配置。这三个核心参数是:

  • corePoolSize - 核心线程数线程数定义了最小可以同时运行的线程数量。
  • maximumPoolSize - 当队列中存放的任务达到队列容量的时候,当前可以同时运行的线程数量变为最大线程数。
  • workQueue - 当新任务来的时候会先判断当前运行的线程数量是否达到核心线程数,如果达到的话,新任务就会被存放在队列中。

JDK 原生线程池 ThreadPoolExecutor 提供了如下几个 public 的 setter 方法,如下图所示:

图 19 JDK 线程池参数设置接口

JDK 允许线程池使用方通过 ThreadPoolExecutor 的实例来动态设置线程池的核心策略。

重点是基于这几个 public 方法,我们只需要维护 ThreadPoolExecutor 的实例,并且在需要修改的时候拿到实例修改其参数即可。基于以上的思路,美团实现了线程池参数的动态化、线程池参数在管理平台可配置可修改,其效果图如下图所示:

图 21 可动态修改线程池参数

如果我们的项目也想要实现这种效果的话,可以借助现成的开源项目:

  • Hippo4jopen - 异步线程池框架,支持线程池动态变更&监控&报警,无需修改代码轻松引入。支持多种使用模式,轻松引入,致力于提高系统运行保障能力。
  • Dynamic TPopen - 轻量级动态线程池,内置监控告警功能,集成三方中间件线程池管理,基于主流配置中心(已支持 Nacos、Apollo,Zookeeper、Consul、Etcd,可通过 SPI 自定义实现)。

参考资料

Maven 插件之代码检查

maven-checkstyle-plugin

maven-checkstyle-plugin,用于检测代码中不符合规范的地方。

定义 checkstyle.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
<!DOCTYPE module PUBLIC
"-//Checkstyle//DTD Checkstyle Configuration 1.3//EN"
"https://checkstyle.org/dtds/configuration_1_3.dtd">

<!-- Generated by RHY @will_awoke -->

<module name="Checker">

<property name="charset" value="UTF-8"/>
<property name="severity" value="warning"/>

<!-- Checks for Size Violations. -->
<!-- 检查文件的长度(行) default max=2000 -->
<module name="FileLength">
<property name="max" value="2500"/>
</module>

<!-- Checks that property files contain the same keys. -->
<!-- 检查**.properties配置文件 是否有相同的key
<module name="Translation">
</module>
-->

<module name="TreeWalker">

<!-- Checks for imports -->
<!-- 必须导入类的完整路径,即不能使用*导入所需的类 -->
<module name="AvoidStarImport"/>

<!-- 检查是否从非法的包中导入了类 illegalPkgs: 定义非法的包名称-->
<module name="IllegalImport"/> <!-- defaults to sun.* packages -->

<!-- 检查是否导入了不必显示导入的类-->
<module name="RedundantImport"/>

<!-- 检查是否导入的包没有使用-->
<module name="UnusedImports"/>

<!-- Checks for whitespace
<module name="EmptyForIteratorPad"/>
<module name="MethodParamPad"/>
<module name="NoWhitespaceAfter"/>
<module name="NoWhitespaceBefore"/>
<module name="OperatorWrap"/>
<module name="ParenPad"/>
<module name="TypecastParenPad"/>
<module name="WhitespaceAfter"/>
<module name="WhitespaceAround"/>
-->

<!-- 检查类和接口的javadoc 默认不检查author 和version tags
authorFormat: 检查author标签的格式
versionFormat: 检查version标签的格式
scope: 可以检查的类的范围,例如:public只能检查public修饰的类,private可以检查所有的类
excludeScope: 不能检查的类的范围,例如:public,public的类将不被检查,但访问权限小于public的类仍然会检查,其他的权限以此类推
tokens: 该属性适用的类型,例如:CLASS_DEF,INTERFACE_DEF -->
<module name="JavadocType">
<property name="authorFormat" value="\S"/>
<property name="scope" value="protected"/>
<property name="tokens" value="CLASS_DEF,INTERFACE_DEF"/>
</module>

<!-- 检查方法的javadoc的注释
scope: 可以检查的方法的范围,例如:public只能检查public修饰的方法,private可以检查所有的方法
allowMissingParamTags: 是否忽略对参数注释的检查
allowMissingThrowsTags: 是否忽略对throws注释的检查
allowMissingReturntags: 是否忽略对return注释的检查 -->
<module name="JavadocMethod">
<property name="scope" value="private"/>
<property name="allowMissingParamTags" value="false"/>
<property name="allowMissingThrowsTags" value="false"/>
<property name="allowMissingReturnTag" value="false"/>
<property name="tokens" value="METHOD_DEF"/>
<property name="allowUndeclaredRTE" value="true"/>
<property name="allowThrowsTagsForSubclasses" value="true"/>
<!--允许get set 方法没有注释-->
<property name="allowMissingPropertyJavadoc" value="true"/>
</module>

<!-- 检查类变量的注释
scope: 检查变量的范围,例如:public只能检查public修饰的变量,private可以检查所有的变量 -->
<module name="JavadocVariable">
<property name="scope" value="private"/>
</module>

<!--option: 定义左大括号'{'显示位置,eol在同一行显示,nl在下一行显示
maxLineLength: 大括号'{'所在行行最多容纳的字符数
tokens: 该属性适用的类型,例:CLASS_DEF,INTERFACE_DEF,METHOD_DEF,CTOR_DEF -->
<module name="LeftCurly">
<property name="option" value="nl"/>
</module>

<!-- NeedBraces 检查是否应该使用括号的地方没有加括号
tokens: 定义检查的类型 -->
<module name="NeedBraces"/>

<!-- Checks the placement of right curly braces ('}') for else, try, and catch tokens. The policy to verify is specified using property option.
option: 右大括号是否单独一行显示
tokens: 定义检查的类型 -->
<module name="RightCurly">
<property name="option" value="alone"/>
</module>

<!-- 检查在重写了equals方法后是否重写了hashCode方法 -->
<module name="EqualsHashCode"/>

<!-- Checks for illegal instantiations where a factory method is preferred.
Rationale: Depending on the project, for some classes it might be preferable to create instances through factory methods rather than calling the constructor.
A simple example is the java.lang.Boolean class. In order to save memory and CPU cycles, it is preferable to use the predefined constants TRUE and FALSE. Constructor invocations should be replaced by calls to Boolean.valueOf().
Some extremely performance sensitive projects may require the use of factory methods for other classes as well, to enforce the usage of number caches or object pools. -->
<module name="IllegalInstantiation">
<property name="classes" value="java.lang.Boolean"/>
</module>

<!-- Checks for Naming Conventions. 命名规范 -->
<!-- local, final variables, including catch parameters -->
<module name="LocalFinalVariableName"/>

<!-- local, non-final variables, including catch parameters-->
<module name="LocalVariableName"/>

<!-- static, non-final fields -->
<module name="StaticVariableName">
<property name="format" value="(^[A-Z0-9_]{0,19}$)"/>
</module>

<!-- packages -->
<module name="PackageName">
<property name="format" value="^[a-z]+(\.[a-z][a-z0-9]*)*$"/>
</module>

<!-- classes and interfaces -->
<module name="TypeName">
<property name="format" value="(^[A-Z][a-zA-Z0-9]{0,19}$)"/>
</module>

<!-- methods -->
<module name="MethodName">
<property name="format" value="(^[a-z][a-zA-Z0-9]{0,19}$)"/>
</module>

<!-- non-static fields -->
<module name="MemberName">
<property name="format" value="(^[a-z][a-z0-9][a-zA-Z0-9]{0,19}$)"/>
</module>

<!-- parameters -->
<module name="ParameterName">
<property name="format" value="(^[a-z][a-zA-Z0-9_]{0,19}$)"/>
</module>

<!-- constants (static, final fields) -->
<module name="ConstantName">
<property name="format" value="(^[A-Z0-9_]{0,19}$)"/>
</module>

<!-- 代码缩进 -->
<module name="Indentation">
</module>

<!-- Checks for redundant exceptions declared in throws clause such as duplicates, unchecked exceptions or subclasses of another declared exception.
检查是否抛出了多余的异常
<module name="RedundantThrows">
<property name="logLoadErrors" value="true"/>
<property name="suppressLoadErrors" value="true"/>
</module>
-->

<!-- Checks for overly complicated boolean expressions. Currently finds code like if (b == true), b || true, !false, etc.
检查boolean值是否冗余的地方
Rationale: Complex boolean logic makes code hard to understand and maintain. -->
<module name="SimplifyBooleanExpression"/>

<!-- Checks for overly complicated boolean return statements. For example the following code
检查是否存在过度复杂的boolean返回值
if (valid())
return false;
else
return true;
could be written as
return !valid();
The Idea for this Check has been shamelessly stolen from the equivalent PMD rule. -->
<module name="SimplifyBooleanReturn"/>

<!-- Checks that a class which has only private constructors is declared as final.只有私有构造器的类必须声明为final-->
<module name="FinalClass"/>

<!-- Make sure that utility classes (classes that contain only static methods or fields in their API) do not have a public constructor.
确保Utils类(只提供static方法和属性的类)没有public构造器。
Rationale: Instantiating utility classes does not make sense. Hence the constructors should either be private or (if you want to allow subclassing) protected. A common mistake is forgetting to hide the default constructor.
If you make the constructor protected you may want to consider the following constructor implementation technique to disallow instantiating subclasses:
public class StringUtils // not final to allow subclassing
{
protected StringUtils() {
throw new UnsupportedOperationException(); // prevents calls from subclass
}
public static int count(char c, String s) {
// ...
}
}
<module name="HideUtilityClassConstructor"/>
-->

<!-- Checks visibility of class members. Only static final members may be public; other class members must be private unless property protectedAllowed or packageAllowed is set.
检查class成员属性可见性。只有static final 修饰的成员是可以public的。其他的成员属性必需是private的,除非属性protectedAllowed或者packageAllowed设置了true.
Public members are not flagged if the name matches the public member regular expression (contains "^serialVersionUID$" by default). Note: Checkstyle 2 used to include "^f[A-Z][a-zA-Z0-9]*$" in the default pattern to allow CMP for EJB 1.1 with the default settings. With EJB 2.0 it is not longer necessary to have public access for persistent fields, hence the default has been changed.
Rationale: Enforce encapsulation. 强制封装 -->
<module name="VisibilityModifier"/>

<!-- 每一行只能定义一个变量 -->
<module name="MultipleVariableDeclarations">
</module>

<!-- Checks the style of array type definitions. Some like Java-style: public static void main(String[] args) and some like C-style: public static void main(String args[])
检查再定义数组时,采用java风格还是c风格,例如:int[] num是java风格,int num[]是c风格。默认是java风格-->
<module name="ArrayTypeStyle">
</module>

<!-- Checks that there are no "magic numbers", where a magic number is a numeric literal that is not defined as a constant. By default, -1, 0, 1, and 2 are not considered to be magic numbers.
<module name="MagicNumber">
</module>
-->

<!-- A check for TODO: comments. Actually it is a generic regular expression matcher on Java comments. To check for other patterns in Java comments, set property format.
检查是否存在TODO(待处理) TODO是javaIDE自动生成的。一般代码写完后要去掉。
-->
<module name="TodoComment"/>

<!-- Checks that long constants are defined with an upper ell. That is ' L' and not 'l'. This is in accordance to the Java Language Specification, Section 3.10.1.
检查是否在long类型是否定义了大写的L.字母小写l和数字1(一)很相似。
looks a lot like 1. -->
<module name="UpperEll"/>

<!-- Checks that switch statement has "default" clause. 检查switch语句是否有‘default’从句
Rationale: It's usually a good idea to introduce a default case in every switch statement.
Even if the developer is sure that all currently possible cases are covered, this should be expressed in the default branch,
e.g. by using an assertion. This way the code is protected aginst later changes, e.g. introduction of new types in an enumeration type. -->
<module name="MissingSwitchDefault"/>

<!--检查switch中case后是否加入了跳出语句,例如:return、break、throw、continue -->
<module name="FallThrough"/>

<!-- Checks the number of parameters of a method or constructor. max default 7个. -->
<module name="ParameterNumber">
<property name="max" value="5"/>
</module>

<!-- 每行字符数 -->
<module name="LineLength">
<property name="max" value="200"/>
</module>

<!-- Checks for long methods and constructors. max default 150行. max=300 设置长度300 -->
<module name="MethodLength">
<property name="max" value="300"/>
</module>

<!-- ModifierOrder 检查修饰符的顺序,默认是 public,protected,private,abstract,static,final,transient,volatile,synchronized,native -->
<module name="ModifierOrder">
</module>

<!-- 检查是否有多余的修饰符,例如:接口中的方法不必使用public、abstract修饰 -->
<module name="RedundantModifier">
</module>

<!--- 字符串比较必须使用 equals() -->
<module name="StringLiteralEquality">
</module>

<!-- if-else嵌套语句个数 最多4层 -->
<module name="NestedIfDepth">
<property name="max" value="3"/>
</module>

<!-- try-catch 嵌套语句个数 最多2层 -->
<module name="NestedTryDepth">
<property name="max" value="2"/>
</module>

<!-- 返回个数 -->
<module name="ReturnCount">
<property name="max" value="5"/>
<property name="format" value="^$"/>
</module>

</module>
</module>

配置 pom.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42

<project>
...
<properties>
<checkstyle.config.location>config/maven_checks.xml</checkstyle.config.location>
</properties>
...
<reporting>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
<version>3.0</version>
<executions>
<execution>
<!-- 绑定pmd:pmd到validate生命周期,在validate时会自动进行代码规范检查 -->
<id>validate</id>
<phase>validate</phase>
<configuration>
<!-- 配置文件的路径,在style文件夹下 -->
<configLocation>style/checkstyle.xml</configLocation>
<encoding>UTF-8</encoding>
<consoleOutput>true</consoleOutput>
<failsOnError>true</failsOnError>
<includeTestSourceDirectory>false</includeTestSourceDirectory>
</configuration>
<goals>
<goal>check</goal>
</goals>
</execution>
</executions>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jxr-plugin</artifactId>
<version>2.3</version>
</plugin>
</plugins>
</reporting>
...
</project>

其中可以修改使用的检查规则文件路径,插件默认提供了四个规则文件可以直接使用,无需手动下载:

  • config/sun_checks.xml - Sun Microsystems Definition (default).
  • config/maven_checks.xml - Maven Development Definitions.
  • config/turbine_checks.xml - Turbine Development Definitions.
  • config/avalon_checks.xml - Avalon Development Definitions.

配置好后,可以执行 mvn clean checkstyle:check 检查代码。

maven-pmd-plugin

maven-pmd-plugin 是阿里编程规范检查插件。

配置 pom.xml

参考 https://github.com/alibaba/p3c/blob/master/p3c-pmd/pom.xml 配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
  <plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-pmd-plugin</artifactId>
<version>3.11.0</version>
<configuration>
<sourceEncoding>${project.build.sourceEncoding}</sourceEncoding>
<targetJdk>${maven.compiler.target}</targetJdk>
<printFailingErrors>true</printFailingErrors>
<rulesets>
<ruleset>rulesets/java/ali-comment.xml</ruleset>
<ruleset>rulesets/java/ali-concurrent.xml</ruleset>
<ruleset>rulesets/java/ali-constant.xml</ruleset>
<ruleset>rulesets/java/ali-exception.xml</ruleset>
<ruleset>rulesets/java/ali-flowcontrol.xml</ruleset>
<ruleset>rulesets/java/ali-naming.xml</ruleset>
<ruleset>rulesets/java/ali-oop.xml</ruleset>
<ruleset>rulesets/java/ali-orm.xml</ruleset>
<ruleset>rulesets/java/ali-other.xml</ruleset>
<ruleset>rulesets/java/ali-set.xml</ruleset>
</rulesets>
<printFailingErrors>true</printFailingErrors>
</configuration>
<executions>
<execution>
<phase>verify</phase>
<goals>
<goal>check</goal>
</goals>
</execution>
</executions>
<dependencies>
<dependency>
<groupId>com.alibaba.p3c</groupId>
<artifactId>p3c-pmd</artifactId>
<version>2.0.0</version>
</dependency>
</dependencies>
</plugin>
</plugins>

配置好后,可以执行 mvn clean pmd:check 检查代码。

参考资料

系统测试架构

软件测试描述一种用来促进鉴定软件的正确性、完整性、安全性和质量的过程。软件测试的经典定义是:在规定的条件下对程序进行操作,以发现程序错误,衡量软件质量,并对其是否能满足设计要求进行评估的过程。

现代软件开发项目中,分工明确,基本上都会有研发、测试、QA 等角色。不同角色由于关注的视角不同,测试目标和测试方法也不完全相同。本文主要从研发、测试的视角去考量软件测试技术。

注意:

  • 为了方便,只有测试人员需要关注的测试点用【测试】标注;
  • 而只有研发人员需要关注的测试点用【研发】标注;
  • 都需要关注的测试点则不作标注。

测试方法分类

从测试设计方法分类

  • 黑盒测试【测试】 - 把软件系统当作一个“黑箱”,无法了解或使用系统的内部结构及知识。从软件的行为,而不是内部结构出发来设计测试。
  • 白盒测试【研发】 - 设计者可以看到软件系统的内部结构,并且使用软件的内部知识来指导测试数据及方法的选择。
  • 灰盒测试 - 介于黑盒和白盒之间。

小结:

  • 黑河测试通常针对的是软件的行为或功能,一般是测试人员主要关注的。
  • 白盒测试通常则需要对软件有一定程度的了解,一般是开发人员所关注的。
  • 灰盒测试通常是为了测试软件在特定的场景下的表现,而非主场景。

从测试的目的分类

功能测试

  • 单元测试(Unit Test) - 在最低粒度的功能/参数上验证程序的准确性,比如测试一个函数的正确性。【研发】
  • 功能测试(Functional Test) - 验证模块的功能。【测试】
  • 集成测试(Integration Test) - 验证几个互相有依赖关系的模块的功能。【测试】
  • 场景测试(Scenario Test)- 验证几个模块是否能完成一个用户场景。【测试】
  • 系统测试(System Test) - 对于整个系统功能的测试。【测试】
  • Alpha 测试 - 软件测试人员在真实用户环境中对软件进行全面的测试。【测试】
  • Beta 测试 - 也叫公测,是真实的用户在真实的环境中进行的测试。

非功能测试

  • 压力测试(Stress test) - 验证软件在超过负载设计的情况下仍能返回正确的结果,没有崩溃
  • 负载测试(Load test) - 测试软件在负载情况下能否正常工作
  • 性能测试(Performance test) - 测试软件的效能,是否提供满意的服务质量。
    • 常用技术:JMeter、JMH。
  • 软件辅助功能测试(Accessibility test) - 测试软件是否向残疾用户提供足够的辅助功能
  • 本地化/全球化测试(Localization/Globalization
  • 兼容性测试(Compatibility Test)
  • 配置测试(Configuration Test) - 测试软件在各种配置下能否正常工作
  • 可用性测试(Usability Test) – 测试软件是否好用
  • 安全性测试(Security Test)

参考资料

Intellij IDEA 快速入门

快捷键

核心快捷键

IntelliJ IDEA 作为一个以快捷键为中心的 IDE,为大多数操作建议了键盘快捷键。在这个主题中,您可以找到最不可缺少的列表,使 IntelliJ IDEA 轻松实现第一步。

核心快捷键表:

操作 快捷键
根据名称查找操作 Ctrl+Shift+A
显示可用 意图操作 列表 Alt+Enter
切换视图 (Project,Structure, etc.). Alt+F1
切换工具窗口和在编辑器中打开的文件 Ctrl+Tab
显示 导航栏. Alt+Home
插入代码模板. Ctrl+J
在周围插入代码模板. Ctrl+Alt+J
Edit an item from the Project or another tree view. F4
注释 Ctrl+/ Ctrl+Shift+/
根据名称查找类或文件. Ctrl+N Ctrl+Shift+N
拷贝当前行或指定的行. Ctrl+D
增加或减少选中的表达式. Ctrl+W and Ctrl+Shift+W
在当前文件查找或替换. Ctrl+F Ctrl+R
在项目中或指定的目录中查找或替换 Ctrl+Shift+F Ctrl+Shift+R
全局搜索 双击 Shift
快速查看选中对象的引用. Ctrl+Shift+F7
展开或折叠编辑器中的代码块. Ctrl+NumPad Plus Ctrl+NumPad -
调用代码完成. Ctrl+Space
智能声明完成. Ctrl+Shift+Enter
智能补全代码 Ctrl+Shift+Space
显示可用的重构方法列表 Ctrl+Shift+Alt+T

快捷键分类

Tradition

快捷键 介绍
Ctrl + Z 撤销
Ctrl + Shift + Z 取消撤销
Ctrl + X 剪切
Ctrl + C 复制
Ctrl + S 保存
Tab 缩进
Shift + Tab 取消缩进
Shift + Home/End 选中光标到当前行头位置/行尾位置
Ctrl + Home/End 跳到文件头/文件尾

Editing

快捷键 介绍
Ctrl + Space 基础代码补全,默认在 Windows 系统上被输入法占用,需要进行修改,建议修改为 Ctrl + 逗号(必备)
Ctrl + Alt + Space 类名自动完成
Ctrl + Shift + Enter 自动结束代码,行末自动添加分号(必备)
Ctrl + P 方法参数提示显示
Ctrl + Q 光标所在的变量/类名/方法名等上面(也可以在提示补充的时候按),显示文档内容
Shift + F1 如果有外部文档可以连接外部文档
Ctrl + F1 在光标所在的错误代码处显示错误信息(必备)
Alt + Insert 代码自动生成,如生成对象的 set/get 方法,构造函数,toString() 等(必备)
Ctrl + O 选择可重写的方法
Ctrl + I 选择可继承的方法
Ctrl + Alt + T 对选中的代码弹出环绕选项弹出层(必备)
Ctrl + / 注释光标所在行代码,会根据当前不同文件类型使用不同的注释符号(必备)
Ctrl + Shift + / 代码块注释(必备)
Ctrl + W 递进式选择代码块。可选中光标所在的单词或段落,连续按会在原有选中的基础上再扩展选中范围(必备)
Ctrl + Shift + W 递进式取消选择代码块。可选中光标所在的单词或段落,连续按会在原有选中的基础上再扩展取消选中范围(必备)
Alt + Q 弹出一个提示,显示当前类的声明/上下文信息
Alt + Enter IntelliJ IDEA 根据光标所在问题,提供快速修复选择,光标放在的位置不同提示的结果也不同(必备)
Ctrl + Alt + L 格式化代码,可以对当前文件和整个包目录使用(必备)
Ctrl + Alt + O 优化导入的类,可以对当前文件和整个包目录使用(必备)
Ctrl + Alt + I 光标所在行 或 选中部分进行自动代码缩进,有点类似格式化
Ctrl + Shift + C 复制当前文件磁盘路径到剪贴板(必备)
Ctrl + Shift + V 弹出缓存的最近拷贝的内容管理器弹出层
Ctrl + Alt + Shift + C 复制参考信息
Ctrl + Alt + Shift + V 无格式黏贴(必备)
Ctrl + D 复制光标所在行 或 复制选择内容,并把复制内容插入光标位置下面(必备)
Ctrl + Y 删除光标所在行 或 删除选中的行(必备)
Ctrl + Shift + J 自动将下一行合并到当前行末尾(必备)
Shift + Enter 开始新一行。光标所在行下空出一行,光标定位到新行位置(必备)
Ctrl + Shift + U 对选中的代码进行大/小写轮流转换(必备)
Ctrl + Shift + ]/[ 选中从光标所在位置到它的底部/顶部的中括号位置(必备)
Ctrl + Delete 删除光标后面的单词或是中文句(必备)
Ctrl + BackSpace 删除光标前面的单词或是中文句(必备)
Ctrl + +/- 展开/折叠代码块
Ctrl + Shift + +/- 展开/折叠所有代码(必备)
Ctrl + F4 关闭当前编辑文件
Ctrl + Shift + Up/Down 光标放在方法名上,将方法移动到上一个/下一个方法前面,调整方法排序(必备)
Alt + Shift + Up/Down 移动光标所在行向上移动/向下移动(必备)
Ctrl + Shift + 左键单击 把光标放在某个类变量上,按此快捷键可以直接定位到该类中(必备)
Alt + Shift + 左键双击 选择被双击的单词/中文句,按住不放,可以同时选择其他单词/中文句(必备)
Ctrl + Shift + T 对当前类生成单元测试类,如果已经存在的单元测试类则可以进行选择(必备)

Search/Replace

快捷键 介绍
Double Shift 弹出 Search Everywhere 弹出层
F3 在查找模式下,定位到下一个匹配处
Shift + F3 在查找模式下,查找匹配上一个
Ctrl + F 在当前文件进行文本查找(必备)
Ctrl + R 在当前文件进行文本替换(必备)
Ctrl + Shift + F 根据输入内容查找整个项目 或 指定目录内文件(必备)
Ctrl + Shift + R 根据输入内容替换对应内容,范围为整个项目 或 指定目录内文件(必备)
快捷键 介绍
Alt + F7 查找光标所在的方法/变量/类被调用的地方
Ctrl + Alt + F7 显示使用的地方。寻找被该类或是变量被调用的地方,用弹出框的方式找出来
Ctrl + Shift + F7 高亮显示所有该选中文本,按 Esc 高亮消失(必备)

Compile and Run

快捷键 介绍
Ctrl + F9 执行 Make Project 操作
Ctrl + Shift + F9 编译选中的文件/包/Module
Shift + F9 Debug
Shift + F10 Run
Alt + Shift + F9 弹出 Debug 的可选择菜单
Alt + Shift + F10 弹出 Run 的可选择菜单

Debugging

快捷键 介绍
F7 在 Debug 模式下,进入下一步,如果当前行断点是一个方法,则进入当前方法体内,如果该方法体还有方法,则不会进入该内嵌的方法中
F8 在 Debug 模式下,进入下一步,如果当前行断点是一个方法,则不进入当前方法体内
Shift + F7 在 Debug 模式下,智能步入。断点所在行上有多个方法调用,会弹出进入哪个方法
Shift + F8 在 Debug 模式下,跳出,表现出来的效果跟 F9 一样
Alt + F8 在 Debug 模式下,选中对象,弹出可输入计算表达式调试框,查看该输入内容的调试结果
Alt + F9 在 Debug 模式下,执行到光标处
F9 在 Debug 模式下,恢复程序运行,但是如果该断点下面代码还有断点则停在下一个断点上
Ctrl + F8 在 Debug 模式下,设置光标当前行为断点,如果当前已经是断点则去掉断点
Ctrl + Shift + F8 在 Debug 模式下,指定断点进入条件
快捷键 介绍
Ctrl + N 跳转到类(必备)
Ctrl + Shift + N 跳转到文件(必备)
Ctrl + Alt + Shift + N 跳转到符号(必备)
Alt + Left/Right 切换当前已打开的窗口中的子视图,比如 Debug 窗口中有 Output、Debugger 等子视图,用此快捷键就可以在子视图中切换(必备)
F12 回到前一个工具窗口(必备)
ESC 从工具窗口进入代码文件窗口(必备)
Shift + ESC 隐藏当前 或 最后一个激活的工具窗口
Ctrl + G 跳转到当前文件的指定行处
Ctrl + E 显示最近打开的文件记录列表(必备)
Ctrl + Shift + E 显示最近编辑的文件记录列表(必备)
Ctrl + Alt + Left/Right 跳转到上一个/下一个操作的地方(必备)
Ctrl + Shift + Backspace 退回到上次修改的地方(必备)
Alt + F1 显示当前文件选择目标弹出层,弹出层中有很多目标可以进行选择(必备)
Ctrl + B/Ctrl + 左键单击 跳转到声明处
Ctrl + Alt + B 在某个调用的方法名上使用会跳到具体的实现处,可以跳过接口
Ctrl + Shift + B 跳转到类型声明处(必备)
Ctrl + Shift + I 快速查看光标所在的方法 或 类的定义
Ctrl + U 前往当前光标所在的方法的父类的方法/接口定义(必备)
Alt + Up/Down 跳转到当前文件的前一个/后一个方法(必备)
Ctrl + ]/[ 跳转到当前所在代码的花括号结束位置/开始位置
Ctrl + F12 弹出当前文件结构层,可以在弹出的层上直接输入,进行筛选
Ctrl + H 显示当前类的层次结构
Ctrl + Shift + H 显示方法层次结构
Ctrl + Alt + H 调用层次
F2/Shift + F2 跳转到下一个/上一个高亮错误 或 警告位置(必备)
F4 编辑源(必备)
Alt + Home 定位/显示到当前文件的 Navigation Bar
F11 添加书签(必备)
Ctrl + F11 选中文件/文件夹,使用助记符设定/取消书签(必备)
Shift + F11 弹出书签显示层(必备)
Alt + 1,2,3…9 显示对应数值的选项卡,其中 1 是 Project 用得最多(必备)
Ctrl + 1,2,3…9 定位到对应数值的书签位置(必备)

Refactoring

快捷键 介绍
Shift + F6 对文件/文件夹 重命名(必备)
Ctrl + Alt + Shift + T 打开重构菜单(必备)

VCS/Local History

快捷键 介绍
Ctrl + K 版本控制提交项目,需要此项目有加入到版本控制才可用
Ctrl + T 版本控制更新项目,需要此项目有加入到版本控制才可用
`Alt + `
Alt + Shift + C 查看最近操作项目的变化情况列表
Alt + Shift + N 选择/添加 task(必备)

Live Templates

快捷键 介绍
Ctrl + J 插入自定义动态代码模板(必备)
Ctrl + Alt + J 弹出模板选择窗口,将选定的代码加入动态模板中

General

快捷键 介绍
Ctrl + Tab 编辑窗口切换,如果在切换的过程又加按上 delete,则是关闭对应选中的窗口
Ctrl + Alt + Y 同步、刷新
Ctrl + Alt + S 打开 IntelliJ IDEA 系统设置(必备)
Ctrl + Alt + Shift + S 打开当前项目设置(必备)
Ctrl + Shift + A 查找动作/设置(必备)
Ctrl + Shift + F12 编辑器最大化(必备)
Alt + Shift + F 显示添加到收藏夹弹出层/添加到收藏夹
Alt + Shift + I 查看项目当前文件

Intellij IDEA 官方快捷键表

img

插件

推荐几个比较好用的插件

个性化

颜色主题

intellij-colors-solarized 个人觉得这种色彩搭配十分优雅

下载地址

FAQ

(1)运行时报错

Error running XXX. Command line is too long. Shorten the command line via JAR manifest or via a classpath file and rerun

解决方案:

找到 .idea/libraies/workspace.xml 中的 <component name="PropertiesComponent">

添加一行配置:

1
<property name="dynamic.classpath" value="true" />

参考资料