Dunwu Blog

大道至简,知易行难

Java 并发之锁

本文先阐述 Java 中各种锁的概念。

然后,重点介绍 Lock 和 Condition 两个接口及其实现。并发编程有两个核心问题:同步和互斥。

互斥,即同一时刻只允许一个线程访问共享资源;

同步,即线程之间如何通信、协作。

这两大问题,管程(sychronized)都是能够解决的。J.U.C 包还提供了 Lock 和 Condition 两个接口来实现管程,其中 Lock 用于解决互斥问题,Condition 用于解决同步问题

并发锁简介

确保线程安全最常见的做法是利用锁机制(Locksychronized)来对共享数据做互斥同步,这样在同一个时刻,只有一个线程可以执行某个方法或者某个代码块,那么操作必然是原子性的,线程安全的。

在工作、面试中,经常会听到各种五花八门的锁,听的人云里雾里。锁的概念术语很多,它们是针对不同的问题所提出的,通过简单的梳理,也不难理解。

可重入锁

可重入锁,顾名思义,指的是线程可以重复获取同一把锁。即同一个线程在外层方法获取了锁,在进入内层方法会自动获取锁。

可重入锁可以在一定程度上避免死锁

  • ReentrantLockReentrantReadWriteLock 是可重入锁。这点,从其命名也不难看出。
  • synchronized 也是一个可重入锁

【示例】synchronized 的可重入示例

1
2
3
4
5
6
7
8
synchronized void setA() throws Exception{
Thread.sleep(1000);
setB();
}

synchronized void setB() throws Exception{
Thread.sleep(1000);
}

上面的代码就是一个典型场景:如果使用的锁不是可重入锁的话,setB 可能不会被当前线程执行,从而造成死锁。

【示例】ReentrantLock 的可重入示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
class Task {

private int value;
private final Lock lock = new ReentrantLock();

public Task() {
this.value = 0;
}

public int get() {
// 获取锁
lock.lock();
try {
return value;
} finally {
// 保证锁能释放
lock.unlock();
}
}

public void addOne() {
// 获取锁
lock.lock();
try {
// 注意:此处已经成功获取锁,进入 get 方法后,又尝试获取锁,
// 如果锁不是可重入的,会导致死锁
value = 1 + get();
} finally {
// 保证锁能释放
lock.unlock();
}
}

}

公平锁与非公平锁

  • 公平锁 - 公平锁是指 多线程按照申请锁的顺序来获取锁
  • 非公平锁 - 非公平锁是指 多线程不按照申请锁的顺序来获取锁 。这就可能会出现优先级反转(后来者居上)或者饥饿现象(某线程总是抢不过别的线程,导致始终无法执行)。

公平锁为了保证线程申请顺序,势必要付出一定的性能代价,因此其吞吐量一般低于非公平锁。

公平锁与非公平锁 在 Java 中的典型实现:

  • synchronized 只支持非公平锁
  • ReentrantLockReentrantReadWriteLock,默认是非公平锁,但支持公平锁

独占锁与共享锁

独占锁与共享锁是一种广义上的说法,从实际用途上来看,也常被称为互斥锁与读写锁。

  • 独占锁 - 独占锁是指 锁一次只能被一个线程所持有
  • 共享锁 - 共享锁是指 锁可被多个线程所持有

独占锁与共享锁在 Java 中的典型实现:

  • synchronizedReentrantLock 只支持独占锁
  • ReentrantReadWriteLock 其写锁是独占锁,其读锁是共享锁。读锁是共享锁使得并发读是非常高效的,读写,写读 ,写写的过程是互斥的。

悲观锁与乐观锁

乐观锁与悲观锁不是指具体的什么类型的锁,而是处理并发同步的策略

悲观锁(Pessimistic Lock)

  • 总是假设最坏的情况,认为:不加锁的并发操作一定会出问题
  • 悲观锁在 Java 中的应用就是通过使用 synchronizedLock 显示加锁来进行互斥同步,这是一种阻塞同步。
  • 悲观锁适合写操作频繁的场景。高并发的场景下,激烈的锁竞争会造成线程阻塞,大量阻塞线程会导致系统的上下文切换,增加系统的性能开销。并且,悲观锁还可能会存在死锁问题,影响代码的正常运行。

【示例】悲观锁示例

1
2
3
4
5
6
7
8
9
10
11
12
13
public void syncTask() {
synchronized (this) {
// 需要同步的操作
}
}

private Lock lock = new ReentrantLock();
lock.lock();
try {
// 需要同步的操作
} finally {
lock.unlock();
}

乐观锁(OptimisticLock)

  • 乐观锁总是假设最好的情况,认为:不加锁的并发操作也没什么问题。每次访问数据时,都假设数据不会被其他线程修改,不必加锁。虽然不加锁,但不意味着什么都不做,而是在更新的时候,判断一下在此期间是否有其他线程更新该数据。
  • 乐观锁最常见的实现方式,是使用版本号机制或 CAS 算法(Compare And Swap)去实现。Java 中的原子类就是基于 CAS 实现的。
  • 乐观锁的优点是:减少锁竞争,提高并发度。
  • 乐观锁的缺点是:
    • 存在 ABA 问题。所谓的 ABA 问题是指在并发编程中,如果一个变量初次读取的时候是 A 值,它的值被改成了 B,然后又其他线程把 B 值改成了 A,而另一个早期线程在对比值时会误以为此值没有发生改变,但其实已经发生变化了
    • 如果乐观锁所检查的数据存在大量锁竞争,会由于不断循环重试,产生大量的 CPU 开销
  • 乐观锁适合读多写少的场景。高并发的场景下,乐观锁相比悲观锁来说,不存在锁竞争造成线程阻塞,也不会有死锁的问题,在性能上往往会更胜一筹。但是,如果冲突频繁发生(写占比非常多的情况),会频繁失败和重试,这样同样会非常影响性能,导致 CPU 飙升。

【示例】乐观锁示例

1
2
3
4
5
6
7
8
9
10
// AtomicInteger 的 getAndAccumulate 方法采用了自旋 + CAS 的乐观锁模式
public final int getAndAccumulate(int x,
IntBinaryOperator accumulatorFunction) {
int prev, next;
do {
prev = get();
next = accumulatorFunction.applyAsInt(prev, x);
} while (!compareAndSet(prev, next));
return prev;
}

乐观锁也是一种通用的锁机制,不仅在 Java 中,在其他很多软件领域,也存在乐观锁机制。比如下面的示例是 MySQL 中的乐观锁示例。

假设,order 表中有一个字段 status,表示订单状态:status 为 1 代表订单未支付;status 为 2 代表订单已支付。现在,要将 id 为 1 的订单状态置为已支付,则操作如下:

1
2
3
4
5
select status, version from order where id=#{id}

update order
set status=2, version=version+1
where id=#{id} and version=#{version};

偏向锁、轻量级锁、重量级锁

所谓轻量级锁与重量级锁,指的是锁控制粒度的粗细。显然,控制粒度越细,阻塞开销越小,并发性也就越高。

Java 1.6 以前,重量级锁一般指的是 synchronized ,而轻量级锁指的是 volatile

Java 1.6 以后,针对 synchronized 做了大量优化,引入 4 种锁状态: 无锁状态、偏向锁、轻量级锁和重量级锁。锁可以单向的从偏向锁升级到轻量级锁,再从轻量级锁升级到重量级锁 。

  • 偏向锁 - 偏向锁是指一段同步代码一直被一个线程所访问,那么该线程会自动获取锁。降低获取锁的代价。
  • 轻量级锁 - 是指当锁是偏向锁的时候,被另一个线程所访问,偏向锁就会升级为轻量级锁,其他线程会通过自旋的形式尝试获取锁,不会阻塞,提高性能。
  • 重量级锁 - 是指当锁为轻量级锁的时候,另一个线程虽然是自旋,但自旋不会一直持续下去,当自旋一定次数的时候,还没有获取到锁,就会进入阻塞,该锁膨胀为重量级锁。重量级锁会让其他申请的线程进入阻塞,性能降低。

分段锁

分段锁其实是一种锁的设计,并不是具体的一种锁。所谓分段锁,就是把锁的对象分成多段,每段独立控制,使得锁粒度更细,减少阻塞开销,从而提高并发性。这其实很好理解,就像高速公路上的收费站,如果只有一个收费口,那所有的车只能排成一条队缴费;如果有多个收费口,就可以分流了。

Hashtable 使用 synchronized 修饰方法来保证线程安全性,那么面对线程的访问,Hashtable 就会锁住整个对象,所有的其它线程只能等待,这种阻塞方式的吞吐量显然很低。

Java 1.7 以前的 ConcurrentHashMap 就是分段锁的典型案例。ConcurrentHashMap 维护了一个 Segment 数组,一般称为分段桶。

1
final Segment<K,V>[] segments;

当有线程访问 ConcurrentHashMap 的数据时,ConcurrentHashMap 会先根据 hashCode 计算出数据在哪个桶(即哪个 Segment),然后锁住这个 Segment

内置锁和显示锁

Java 1.5 之前,协调对共享对象的访问时可以使用的机制只有 synchronizedvolatile。这两个都属于内置锁,即锁的申请和释放都是由 JVM 所控制。

Java 1.5 之后,增加了新的机制:ReentrantLockReentrantReadWriteLock ,这类锁的申请和释放都可以由程序所控制,所以常被称为显示锁。

💡 synchronized 的用法和原理可以参考:Java 并发基础机制 - synchronized

:bell: 注意:如果不需要 ReentrantLockReentrantReadWriteLock 所提供的高级同步特性,**应该优先考虑使用 synchronized**。理由如下:

  • Java 1.6 以后,synchronized 做了大量的优化,其性能已经与 ReentrantLockReentrantReadWriteLock 基本上持平。
  • 从趋势来看,Java 未来更可能会优化 synchronized ,而不是 ReentrantLockReentrantReadWriteLock ,因为 synchronized 是 JVM 内置属性,它能执行一些优化。
  • ReentrantLockReentrantReadWriteLock 申请和释放锁都是由程序控制,如果使用不当,可能造成死锁,这是很危险的。

以下对比一下显示锁和内置锁的差异:

  • 主动获取锁和释放锁
    • synchronized 不能主动获取锁和释放锁。获取锁和释放锁都是 JVM 控制的。
    • ReentrantLock 可以主动获取锁和释放锁。(如果忘记释放锁,就可能产生死锁)。
  • 响应中断
    • synchronized 不能响应中断。
    • ReentrantLock 可以响应中断。
  • 超时机制
    • synchronized 没有超时机制。
    • ReentrantLock 有超时机制。ReentrantLock 可以设置超时时间,超时后自动释放锁,避免一直等待。
  • 支持公平锁
    • synchronized 只支持非公平锁。
    • ReentrantLock 支持非公平锁和公平锁。
  • 是否支持共享
    • synchronized 修饰的方法或代码块,只能被一个线程访问(独享)。如果这个线程被阻塞,其他线程也只能等待
    • ReentrantLock 可以基于 Condition 灵活的控制同步条件。
  • 是否支持读写分离
    • synchronized 不支持读写锁分离;
    • ReentrantReadWriteLock 支持读写锁,从而使阻塞读写的操作分开,有效提高并发性。

Lock 和 Condition

为何引入 Lock 和 Condition

并发编程领域,有两大核心问题:一个是互斥,即同一时刻只允许一个线程访问共享资源;另一个是同步,即线程之间如何通信、协作。这两大问题,管程都是能够解决的。Java SDK 并发包通过 Lock 和 Condition 两个接口来实现管程,其中 Lock 用于解决互斥问题,Condition 用于解决同步问题

synchronized 是管程的一种实现,既然如此,何必再提供 Lock 和 Condition。

JDK 1.6 以前,synchronized 还没有做优化,性能远低于 Lock。但是,性能不是引入 Lock 的最重要因素。真正关键在于:synchronized 使用不当,可能会出现死锁。synchronized 无法通过破坏不可抢占条件来避免死锁。原因是 synchronized 申请资源的时候,如果申请不到,线程直接进入阻塞状态了,而线程进入阻塞状态,啥都干不了,也释放不了线程已经占有的资源。

与内置锁 synchronized 不同的是,**Lock 提供了一组无条件的、可轮询的、定时的以及可中断的锁操作**,所有获取锁、释放锁的操作都是显式的操作。

  • 能够响应中断。synchronized 的问题是,持有锁 A 后,如果尝试获取锁 B 失败,那么线程就进入阻塞状态,一旦发生死锁,就没有任何机会来唤醒阻塞的线程。但如果阻塞状态的线程能够响应中断信号,也就是说当我们给阻塞的线程发送中断信号的时候,能够唤醒它,那它就有机会释放曾经持有的锁 A。这样就破坏了不可抢占条件了。
  • 支持超时。如果线程在一段时间之内没有获取到锁,不是进入阻塞状态,而是返回一个错误,那这个线程也有机会释放曾经持有的锁。这样也能破坏不可抢占条件。
  • 非阻塞地获取锁。如果尝试获取锁失败,并不进入阻塞状态,而是直接返回,那这个线程也有机会释放曾经持有的锁。这样也能破坏不可抢占条件。

Lock 接口

Lock 的接口定义如下:

1
2
3
4
5
6
7
8
public interface Lock {
void lock();
void lockInterruptibly() throws InterruptedException;
boolean tryLock();
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
void unlock();
Condition newCondition();
}
  • lock() - 获取锁。
  • unlock() - 释放锁。
  • tryLock() - 尝试获取锁,仅在调用时锁未被另一个线程持有的情况下,才获取该锁。
  • tryLock(long time, TimeUnit unit) - 和 tryLock() 类似,区别仅在于限定时间,如果限定时间内未获取到锁,视为失败。
  • lockInterruptibly() - 锁未被另一个线程持有,且线程没有被中断的情况下,才能获取锁。
  • newCondition() - 返回一个绑定到 Lock 对象上的 Condition 实例。

Condition

Condition 实现了管程模型里面的条件变量

前文中提过 Lock 接口中 有一个 newCondition() 方法用于返回一个绑定到 Lock 对象上的 Condition 实例。Condition 是什么?有什么作用?本节将一一讲解。

在单线程中,一段代码的执行可能依赖于某个状态,如果不满足状态条件,代码就不会被执行(典型的场景,如:if ... else ...)。在并发环境中,当一个线程判断某个状态条件时,其状态可能是由于其他线程的操作而改变,这时就需要有一定的协调机制来确保在同一时刻,数据只能被一个线程锁修改,且修改的数据状态被所有线程所感知。

Java 1.5 之前,主要是利用 Object 类中的 waitnotifynotifyAll 配合 synchronized 来进行线程间通信。waitnotifynotifyAll 需要配合 synchronized 使用,不适用于 Lock。而使用 Lock 的线程,彼此间通信应该使用 Condition 。这可以理解为,什么样的锁配什么样的钥匙。内置锁(synchronized)配合内置条件队列(waitnotifynotifyAll ),显式锁(Lock)配合显式条件队列(Condition

Condition 的特性

Condition 接口定义如下:

1
2
3
4
5
6
7
8
9
public interface Condition {
void await() throws InterruptedException;
void awaitUninterruptibly();
long awaitNanos(long nanosTimeout) throws InterruptedException;
boolean await(long time, TimeUnit unit) throws InterruptedException;
boolean awaitUntil(Date deadline) throws InterruptedException;
void signal();
void signalAll();
}

其中,awaitsignalsignalAllwaitnotifynotifyAll 相对应,功能也相似。除此以外,Condition 相比内置条件队列( waitnotifynotifyAll ),提供了更为丰富的功能:

  • 每个锁(Lock)上可以存在多个 Condition,这意味着锁的状态条件可以有多个。
  • 支持公平的或非公平的队列操作。
  • 支持可中断的条件等待,相关方法:awaitUninterruptibly()
  • 支持可定时的等待,相关方法:awaitNanos(long)await(long, TimeUnit)awaitUntil(Date)

Condition 的用法

这里以 Condition 来实现一个消费者、生产者模式。

:bell: 注意:事实上,解决此类问题使用 CountDownLatchSemaphore 等工具更为便捷、安全。想了解详情,可以参考 Java 并发工具类

产品类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
class Message {

private final Lock lock = new ReentrantLock();

private final Condition producedMsg = lock.newCondition();

private final Condition consumedMsg = lock.newCondition();

private String message;

private boolean state;

private boolean end;

public void consume() {
//lock
lock.lock();
try {
// no new message wait for new message
while (!state) { producedMsg.await(); }

System.out.println("consume message : " + message);
state = false;
// message consumed, notify waiting thread
consumedMsg.signal();
} catch (InterruptedException ie) {
System.out.println("Thread interrupted - viewMessage");
} finally {
lock.unlock();
}
}

public void produce(String message) {
lock.lock();
try {
// last message not consumed, wait for it be consumed
while (state) { consumedMsg.await(); }

System.out.println("produce msg: " + message);
this.message = message;
state = true;
// new message added, notify waiting thread
producedMsg.signal();
} catch (InterruptedException ie) {
System.out.println("Thread interrupted - publishMessage");
} finally {
lock.unlock();
}
}

public boolean isEnd() {
return end;
}

public void setEnd(boolean end) {
this.end = end;
}

}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
class MessageConsumer implements Runnable {

private Message message;

public MessageConsumer(Message msg) {
message = msg;
}

@Override
public void run() {
while (!message.isEnd()) { message.consume(); }
}

}

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
class MessageProducer implements Runnable {

private Message message;

public MessageProducer(Message msg) {
message = msg;
}

@Override
public void run() {
produce();
}

public void produce() {
List<String> msgs = new ArrayList<>();
msgs.add("Begin");
msgs.add("Msg1");
msgs.add("Msg2");

for (String msg : msgs) {
message.produce(msg);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

message.produce("End");
message.setEnd(true);
}

}

测试

1
2
3
4
5
6
7
8
9
10
public class LockConditionDemo {

public static void main(String[] args) {
Message msg = new Message();
Thread producer = new Thread(new MessageProducer(msg));
Thread consumer = new Thread(new MessageConsumer(msg));
producer.start();
consumer.start();
}
}

ReentrantLock

ReentrantLock 类是 Lock 接口的具体实现,与内置锁 synchronized 相同的是,它是一个可重入锁

ReentrantLock 的特性如下:

  • ReentrantLock 提供了与 synchronized 相同的互斥性、内存可见性和可重入性
  • ReentrantLock 支持公平锁和非公平锁(默认)两种模式。
  • ReentrantLock 实现了 Lock 接口,支持了 synchronized 所不具备的灵活性,增加了轮询、超时、中断等功能。
    • synchronized 无法中断一个正在等待获取锁的线程
    • synchronized 无法在请求获取一个锁时无休止地等待

ReentrantLock 的用法

前文了解了 ReentrantLock 的特性,接下来,我们要讲述其具体用法。

ReentrantLock 的构造方法

ReentrantLock 有两个构造方法:

1
2
public ReentrantLock() {}
public ReentrantLock(boolean fair) {}
  • ReentrantLock() - 默认构造方法会初始化一个非公平锁(NonfairSync)
  • ReentrantLock(boolean) - new ReentrantLock(true) 会初始化一个公平锁(FairSync)

lock 和 unlock 方法

  • lock() - 无条件获取锁。如果当前线程无法获取锁,则当前线程进入休眠状态不可用,直至当前线程获取到锁。如果该锁没有被另一个线程持有,则获取该锁并立即返回,将锁的持有计数设置为 1。
  • unlock() - 用于释放锁

:bell: 注意:请务必牢记,获取锁操作 lock() 必须在 try catch 块中进行,并且将释放锁操作 unlock() 放在 finally 块中进行,以保证锁一定被被释放,防止死锁的发生

示例:ReentrantLock 的基本操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
public class ReentrantLockDemo {

public static void main(String[] args) {
Task task = new Task();
MyThread tA = new MyThread("Thread-A", task);
MyThread tB = new MyThread("Thread-B", task);
MyThread tC = new MyThread("Thread-C", task);
tA.start();
tB.start();
tC.start();
}

static class MyThread extends Thread {

private Task task;

public MyThread(String name, Task task) {
super(name);
this.task = task;
}

@Override
public void run() {
task.execute();
}

}

static class Task {

private ReentrantLock lock = new ReentrantLock();

public void execute() {
lock.lock();
try {
for (int i = 0; i < 3; i++) {
System.out.println(lock.toString());

// 查询当前线程 hold 住此锁的次数
System.out.println("\t holdCount: " + lock.getHoldCount());

// 查询正等待获取此锁的线程数
System.out.println("\t queuedLength: " + lock.getQueueLength());

// 是否为公平锁
System.out.println("\t isFair: " + lock.isFair());

// 是否被锁住
System.out.println("\t isLocked: " + lock.isLocked());

// 是否被当前线程持有锁
System.out.println("\t isHeldByCurrentThread: " + lock.isHeldByCurrentThread());

try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} finally {
lock.unlock();
}
}

}

}

输出结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
java.util.concurrent.locks.ReentrantLock@64fcd88a[Locked by thread Thread-A]
holdCount: 1
queuedLength: 2
isFair: false
isLocked: true
isHeldByCurrentThread: true
java.util.concurrent.locks.ReentrantLock@64fcd88a[Locked by thread Thread-C]
holdCount: 1
queuedLength: 1
isFair: false
isLocked: true
isHeldByCurrentThread: true
// ...

tryLock 方法

与无条件获取锁相比,tryLock 有更完善的容错机制。

  • tryLock() - 可轮询获取锁。如果成功,则返回 true;如果失败,则返回 false。也就是说,这个方法无论成败都会立即返回,获取不到锁(锁已被其他线程获取)时不会一直等待。
  • tryLock(long, TimeUnit) - 可定时获取锁。和 tryLock() 类似,区别仅在于这个方法在获取不到锁时会等待一定的时间,在时间期限之内如果还获取不到锁,就返回 false。如果如果一开始拿到锁或者在等待期间内拿到了锁,则返回 true。

示例:ReentrantLocktryLock() 操作

修改上个示例中的 execute() 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
public void execute() {
if (lock.tryLock()) {
try {
for (int i = 0; i < 3; i++) {
// 略。..
}
} finally {
lock.unlock();
}
} else {
System.out.println(Thread.currentThread().getName() + " 获取锁失败");
}
}

示例:ReentrantLocktryLock(long, TimeUnit) 操作

修改上个示例中的 execute() 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public void execute() {
try {
if (lock.tryLock(2, TimeUnit.SECONDS)) {
try {
for (int i = 0; i < 3; i++) {
// 略。..
}
} finally {
lock.unlock();
}
} else {
System.out.println(Thread.currentThread().getName() + " 获取锁失败");
}
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + " 获取锁超时");
e.printStackTrace();
}
}

lockInterruptibly 方法

  • lockInterruptibly() - 可中断获取锁。可中断获取锁可以在获得锁的同时保持对中断的响应。可中断获取锁比其它获取锁的方式稍微复杂一些,需要两个 try-catch 块(如果在获取锁的操作中抛出了 InterruptedException ,那么可以使用标准的 try-finally 加锁模式)。
    • 举例来说:假设有两个线程同时通过 lock.lockInterruptibly() 获取某个锁时,若线程 A 获取到了锁,则线程 B 只能等待。若此时对线程 B 调用 threadB.interrupt() 方法能够中断线程 B 的等待过程。由于 lockInterruptibly() 的声明中抛出了异常,所以 lock.lockInterruptibly() 必须放在 try 块中或者在调用 lockInterruptibly() 的方法外声明抛出 InterruptedException

:bell: 注意:当一个线程获取了锁之后,是不会被 interrupt() 方法中断的。单独调用 interrupt() 方法不能中断正在运行状态中的线程,只能中断阻塞状态中的线程。因此当通过 lockInterruptibly() 方法获取某个锁时,如果未获取到锁,只有在等待的状态下,才可以响应中断。

示例:ReentrantLocklockInterruptibly() 操作

修改上个示例中的 execute() 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public void execute() {
try {
lock.lockInterruptibly();

for (int i = 0; i < 3; i++) {
// 略。..
}
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + "被中断");
e.printStackTrace();
} finally {
lock.unlock();
}
}

newCondition 方法

newCondition() - 返回一个绑定到 Lock 对象上的 Condition 实例。Condition 的特性和具体方法请阅读下文 [Condition](#五 condition)。

ReentrantLock 的原理

ReentrantLock 的可见性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class X {
private final Lock rtl =
new ReentrantLock();
int value;
public void addOne() {
// 获取锁
rtl.lock();
try {
value+=1;
} finally {
// 保证锁能释放
rtl.unlock();
}
}
}

ReentrantLock,内部持有一个 volatile 的成员变量 state,获取锁的时候,会读写 state 的值;解锁的时候,也会读写 state 的值(简化后的代码如下面所示)。也就是说,在执行 value+=1 之前,程序先读写了一次 volatile 变量 state,在执行 value+=1 之后,又读写了一次 volatile 变量 state。根据相关的 Happens-Before 规则:

  1. 顺序性规则:对于线程 T1,value+=1 Happens-Before 释放锁的操作 unlock();
  2. volatile 变量规则:由于 state = 1 会先读取 state,所以线程 T1 的 unlock() 操作 Happens-Before 线程 T2 的 lock() 操作;
  3. 传递性规则:线程 T1 的 value+=1 Happens-Before 线程 T2 的 lock() 操作。

ReentrantLock 的数据结构

阅读 ReentrantLock 的源码,可以发现它有一个核心字段:

1
private final Sync sync;
  • sync - 内部抽象类 ReentrantLock.Sync 对象,Sync 继承自 AQS。它有两个子类:
  • ReentrantLock.FairSync - 公平锁。
  • ReentrantLock.NonfairSync - 非公平锁。

查看源码可以发现,ReentrantLock 实现 Lock 接口其实是调用 ReentrantLock.FairSyncReentrantLock.NonfairSync 中各自的实现,这里不一一列举。

ReentrantLock 的获取锁和释放锁

ReentrantLock 获取锁和释放锁的接口,从表象看,是调用 ReentrantLock.FairSyncReentrantLock.NonfairSync 中各自的实现;从本质上看,是基于 AQS 的实现。

仔细阅读源码很容易发现:

  • void lock() 调用 Sync 的 lock() 方法。

  • void lockInterruptibly() 直接调用 AQS 的 [获取可中断的独占锁](#获取可中断的独占锁) 方法 lockInterruptibly()

  • boolean tryLock() 调用 Sync 的 nonfairTryAcquire()

  • boolean tryLock(long time, TimeUnit unit) 直接调用 AQS 的 [获取超时等待式的独占锁](#获取超时等待式的独占锁) 方法 tryAcquireNanos(int arg, long nanosTimeout)

  • void unlock() 直接调用 AQS 的 [释放独占锁](#释放独占锁) 方法 release(int arg)

直接调用 AQS 接口的方法就不再赘述了,其原理在 [AQS 的原理](#AQS 的原理) 中已经用很大篇幅进行过讲解。

nonfairTryAcquire 方法源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 公平锁和非公平锁都会用这个方法区尝试获取锁
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
// 如果同步状态为 0,将其设为 acquires,并设置当前线程为排它线程
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

处理流程很简单:

  • 如果同步状态为 0,设置同步状态设为 acquires,并设置当前线程为排它线程,然后返回 true,获取锁成功。
  • 如果同步状态不为 0 且当前线程为排它线程,设置同步状态为当前状态值+acquires 值,然后返回 true,获取锁成功。
  • 否则,返回 false,获取锁失败。

公平锁和非公平锁

ReentrantLock 这个类有两个构造函数,一个是无参构造函数,一个是传入 fair 参数的构造函数。fair 参数代表的是锁的公平策略,如果传入 true 就表示需要构造一个公平锁,反之则表示要构造一个非公平锁。

锁都对应着一个等待队列,如果一个线程没有获得锁,就会进入等待队列,当有线程释放锁的时候,就需要从等待队列中唤醒一个等待的线程。如果是公平锁,唤醒的策略就是谁等待的时间长,就唤醒谁,很公平;如果是非公平锁,则不提供这个公平保证,有可能等待时间短的线程反而先被唤醒。

lock 方法在公平锁和非公平锁中的实现:

二者的区别仅在于申请非公平锁时,如果同步状态为 0,尝试将其设为 1,如果成功,直接将当前线程置为排它线程;否则和公平锁一样,调用 AQS 获取独占锁方法 acquire

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 非公平锁实现
final void lock() {
if (compareAndSetState(0, 1))
// 如果同步状态为 0,将其设为 1,并设置当前线程为排它线程
setExclusiveOwnerThread(Thread.currentThread());
else
// 调用 AQS 获取独占锁方法 acquire
acquire(1);
}

// 公平锁实现
final void lock() {
// 调用 AQS 获取独占锁方法 acquire
acquire(1);
}

ReentrantReadWriteLock

ReadWriteLock 适用于读多写少的场景

ReentrantReadWriteLock 类是 ReadWriteLock 接口的具体实现,它是一个可重入的读写锁。ReentrantReadWriteLock 维护了一对读写锁,将读写锁分开,有利于提高并发效率。

读写锁,并不是 Java 语言特有的,而是一个广为使用的通用技术,所有的读写锁都遵守以下三条基本原则:

  • 允许多个线程同时读共享变量;
  • 只允许一个线程写共享变量;
  • 如果一个写线程正在执行写操作,此时禁止读线程读共享变量。

读写锁与互斥锁的一个重要区别就是读写锁允许多个线程同时读共享变量,而互斥锁是不允许的,这是读写锁在读多写少场景下性能优于互斥锁的关键。但读写锁的写操作是互斥的,当一个线程在写共享变量的时候,是不允许其他线程执行写操作和读操作。

ReentrantReadWriteLock 的特性

ReentrantReadWriteLock 的特性如下:

  • ReentrantReadWriteLock 适用于读多写少的场景。如果是写多读少的场景,由于 ReentrantReadWriteLock 其内部实现比 ReentrantLock 复杂,性能可能反而要差一些。如果存在这样的问题,需要具体问题具体分析。由于 ReentrantReadWriteLock 的读写锁(ReadLockWriteLock)都实现了 Lock 接口,所以要替换为 ReentrantLock 也较为容易。
  • ReentrantReadWriteLock 实现了 ReadWriteLock 接口,支持了 ReentrantLock 所不具备的读写锁分离。ReentrantReadWriteLock 维护了一对读写锁(ReadLockWriteLock)。将读写锁分开,有利于提高并发效率。ReentrantReadWriteLock 的加锁策略是:允许多个读操作并发执行,但每次只允许一个写操作
  • ReentrantReadWriteLock 为读写锁都提供了可重入的加锁语义。
  • ReentrantReadWriteLock 支持公平锁和非公平锁(默认)两种模式。

ReadWriteLock 接口定义如下:

1
2
3
4
public interface ReadWriteLock {
Lock readLock();
Lock writeLock();
}
  • readLock - 返回用于读操作的锁(ReadLock)。
  • writeLock - 返回用于写操作的锁(WriteLock)。

在读写锁和写入锁之间的交互可以采用多种实现方式,ReadWriteLock 的一些可选实现包括:

  • 释放优先 - 当一个写入操作释放写锁,并且队列中同时存在读线程和写线程,那么应该优先选择读线程、写线程,还是最先发出请求的线程?
  • 读线程插队 - 如果锁是由读线程持有,但有写线程正在等待,那么新到达的读线程能否立即获得访问权,还是应该在写线程后面等待?如果允许读线程插队到写线程之前,那么将提高并发性,但可能造成线程饥饿问题。
  • 重入性 - 读锁和写锁是否是可重入的?
  • 降级 - 如果一个线程持有写入锁,那么它能否在不释放该锁的情况下获得读锁?这可能会使得写锁被降级为读锁,同时不允许其他写线程修改被保护的资源。
  • 升级 - 读锁能否优先于其他正在等待的读线程和写线程而升级为一个写锁?在大多数的读写锁实现中并不支持升级,因为如果没有显式的升级操作,那么很容易造成死锁。

ReentrantReadWriteLock 的用法

前文了解了 ReentrantReadWriteLock 的特性,接下来,我们要讲述其具体用法。

ReentrantReadWriteLock 的构造方法

ReentrantReadWriteLockReentrantLock 一样,也有两个构造方法,且用法相似。

1
2
public ReentrantReadWriteLock() {}
public ReentrantReadWriteLock(boolean fair) {}
  • ReentrantReadWriteLock() - 默认构造方法会初始化一个非公平锁(NonfairSync)。在非公平的锁中,线程获得锁的顺序是不确定的。写线程降级为读线程是可以的,但读线程升级为写线程是不可以的(这样会导致死锁)。
  • ReentrantReadWriteLock(boolean) - new ReentrantLock(true) 会初始化一个公平锁(FairSync)。对于公平锁,等待时间最长的线程将优先获得锁。如果这个锁是读线程持有,则另一个线程请求写锁,那么其他读线程都不能获得读锁,直到写线程释放写锁。

ReentrantReadWriteLock 的使用实例

在 [ReentrantReadWriteLock 的特性](#reentrantreadwritelock-的特性) 中已经介绍过,ReentrantReadWriteLock 的读写锁(ReadLockWriteLock) 都实现了 Lock 接口,所以其各自独立的使用方式与 ReentrantLock 一样,这里不再赘述。

ReentrantReadWriteLockReentrantLock 用法上的差异,主要在于读写锁的配合使用。本文以一个典型使用场景来进行讲解。

【示例】基于 ReadWriteLock 实现一个简单的泛型无界缓存

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
/**
* 简单的无界缓存实现
* <p>
* 使用 WeakHashMap 存储键值对。WeakHashMap 中存储的对象是弱引用,JVM GC 时会自动清除没有被引用的弱引用对象。
*/
static class UnboundedCache<K, V> {

private final Map<K, V> cacheMap = new WeakHashMap<>();

private final ReadWriteLock cacheLock = new ReentrantReadWriteLock();

public V get(K key) {
cacheLock.readLock().lock();
V value;
try {
value = cacheMap.get(key);
String log = String.format("%s 读数据 %s:%s", Thread.currentThread().getName(), key, value);
System.out.println(log);
} finally {
cacheLock.readLock().unlock();
}
return value;
}

public V put(K key, V value) {
cacheLock.writeLock().lock();
try {
cacheMap.put(key, value);
String log = String.format("%s 写入数据 %s:%s", Thread.currentThread().getName(), key, value);
System.out.println(log);
} finally {
cacheLock.writeLock().unlock();
}
return value;
}

public V remove(K key) {
cacheLock.writeLock().lock();
try {
return cacheMap.remove(key);
} finally {
cacheLock.writeLock().unlock();
}
}

public void clear() {
cacheLock.writeLock().lock();
try {
this.cacheMap.clear();
} finally {
cacheLock.writeLock().unlock();
}
}

}

说明:

  • 使用 WeakHashMap 而不是 HashMap 来存储键值对。WeakHashMap 中存储的对象是弱引用,JVM GC 时会自动清除没有被引用的弱引用对象。
  • Map 写数据前加写锁,写完后,释放写锁。
  • Map 读数据前加读锁,读完后,释放读锁。

测试其线程安全性:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
* @author <a href="mailto:forbreak@163.com">Zhang Peng</a>
* @since 2020-01-01
*/
public class ReentrantReadWriteLockDemo {

static UnboundedCache<Integer, Integer> cache = new UnboundedCache<>();

public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < 20; i++) {
executorService.execute(new MyThread());
cache.get(0);
}
executorService.shutdown();
}

/** 线程任务每次向缓存中写入 3 个随机值,key 固定 */
static class MyThread implements Runnable {

@Override
public void run() {
Random random = new Random();
for (int i = 0; i < 3; i++) {
cache.put(i, random.nextInt(100));
}
}

}

}

说明:示例中,通过线程池启动 20 个并发任务。任务每次向缓存中写入 3 个随机值,key 固定;然后主线程每次固定读取缓存中第一个 key 的值。

输出结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
main 读数据 0:null
pool-1-thread-1 写入数据 0:16
pool-1-thread-1 写入数据 1:58
pool-1-thread-1 写入数据 2:50
main 读数据 0:16
pool-1-thread-1 写入数据 0:85
pool-1-thread-1 写入数据 1:76
pool-1-thread-1 写入数据 2:46
pool-1-thread-2 写入数据 0:21
pool-1-thread-2 写入数据 1:41
pool-1-thread-2 写入数据 2:63
main 读数据 0:21
// ...

ReentrantReadWriteLock 的原理

前面了解了 ReentrantLock 的原理,理解 ReentrantReadWriteLock 就容易多了。

ReentrantReadWriteLock 的数据结构

阅读 ReentrantReadWriteLock 的源码,可以发现它有三个核心字段:

1
2
3
4
5
6
7
8
9
/** Inner class providing readlock */
private final ReentrantReadWriteLock.ReadLock readerLock;
/** Inner class providing writelock */
private final ReentrantReadWriteLock.WriteLock writerLock;
/** Performs all synchronization mechanics */
final Sync sync;

public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; }
  • sync - 内部类 ReentrantReadWriteLock.Sync 对象。与 ReentrantLock 类似,它有两个子类:ReentrantReadWriteLock.FairSyncReentrantReadWriteLock.NonfairSync ,分别表示公平锁和非公平锁的实现。
  • readerLock - 内部类 ReentrantReadWriteLock.ReadLock 对象,这是一把读锁。
  • writerLock - 内部类 ReentrantReadWriteLock.WriteLock 对象,这是一把写锁。

ReentrantReadWriteLock 的获取锁和释放锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public static class ReadLock implements Lock, java.io.Serializable {

// 调用 AQS 获取共享锁方法
public void lock() {
sync.acquireShared(1);
}

// 调用 AQS 释放共享锁方法
public void unlock() {
sync.releaseShared(1);
}
}

public static class WriteLock implements Lock, java.io.Serializable {

// 调用 AQS 获取独占锁方法
public void lock() {
sync.acquire(1);
}

// 调用 AQS 释放独占锁方法
public void unlock() {
sync.release(1);
}
}

StampedLock

ReadWriteLock 支持两种模式:一种是读锁,一种是写锁。而 StampedLock 支持三种模式,分别是:写锁悲观读锁乐观读。其中,写锁、悲观读锁的语义和 ReadWriteLock 的写锁、读锁的语义非常类似,允许多个线程同时获取悲观读锁,但是只允许一个线程获取写锁,写锁和悲观读锁是互斥的。不同的是:StampedLock 里的写锁和悲观读锁加锁成功之后,都会返回一个 stamp;然后解锁的时候,需要传入这个 stamp。

注意这里,用的是“乐观读”这个词,而不是“乐观读锁”,是要提醒你,乐观读这个操作是无锁的,所以相比较 ReadWriteLock 的读锁,乐观读的性能更好一些。

StampedLock 的性能之所以比 ReadWriteLock 还要好,其关键是 StampedLock 支持乐观读的方式。

  • ReadWriteLock 支持多个线程同时读,但是当多个线程同时读的时候,所有的写操作会被阻塞;
  • 而 StampedLock 提供的乐观读,是允许一个线程获取写锁的,也就是说不是所有的写操作都被阻塞。

对于读多写少的场景 StampedLock 性能很好,简单的应用场景基本上可以替代 ReadWriteLock,但是,StampedLock 的功能仅仅是 ReadWriteLock 的子集,在使用的时候,还是有几个地方需要注意一下。

  • StampedLock 不支持重入
  • StampedLock 的悲观读锁、写锁都不支持条件变量。
  • 如果线程阻塞在 StampedLock 的 readLock() 或者 writeLock() 上时,此时调用该阻塞线程的 interrupt() 方法,会导致 CPU 飙升。**使用 StampedLock 一定不要调用中断操作,如果需要支持中断功能,一定使用可中断的悲观读锁 readLockInterruptibly() 和写锁 writeLockInterruptibly()**。

【示例】StampedLock 阻塞时,调用 interrupt() 导致 CPU 飙升

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
final StampedLock lock = new StampedLock();
Thread T1 = new Thread(() -> {
// 获取写锁
lock.writeLock();
// 永远阻塞在此处,不释放写锁
LockSupport.park();
});
T1.start();
// 保证 T1 获取写锁
Thread.sleep(100);
Thread T2 = new Thread(() ->
// 阻塞在悲观读锁
lock.readLock()
);
T2.start();
// 保证 T2 阻塞在读锁
Thread.sleep(100);
// 中断线程 T2
// 会导致线程 T2 所在 CPU 飙升
T2.interrupt();
T2.join();

【示例】StampedLock 读模板:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
final StampedLock sl = new StampedLock();

// 乐观读
long stamp = sl.tryOptimisticRead();
// 读入方法局部变量
// ......
// 校验 stamp
if (!sl.validate(stamp)) {
// 升级为悲观读锁
stamp = sl.readLock();
try {
// 读入方法局部变量
// .....
} finally {
// 释放悲观读锁
sl.unlockRead(stamp);
}
}
// 使用方法局部变量执行业务操作
// ......

【示例】StampedLock 写模板:

1
2
3
4
5
6
7
long stamp = sl.writeLock();
try {
// 写共享变量
......
} finally {
sl.unlockWrite(stamp);
}

参考资料

Java 并发之无锁

并发安全需要保证几个基本特性:

  • 可见性 - 是一个线程修改了某个共享变量,其状态能够立即被其他线程知晓,通常被解释为将线程本地状态反映到主内存上,volatile 就是负责保证可见性的。
  • 有序性 - 是保证线程内串行语义,避免指令重排等。
  • 原子性 - 简单说就是相关操作不会中途被其他线程干扰,一般通过互斥机制(加锁:sychronizedLock)实现。

互斥同步是最常见的原子性保障手段。互斥同步最主要的问题是线程阻塞和唤醒所带来的性能问题。因此,互斥同步也被称为阻塞同步。互斥同步属于一种悲观的并发策略,总是认为只要不去做正确的同步措施,那就肯定会出现问题。无论共享数据是否真的会出现竞争,它都要进行加锁(这里讨论的是概念模型,实际上虚拟机会优化掉很大一部分不必要的加锁)、用户态核心态转换、维护锁计数器和检查是否有被阻塞的线程需要唤醒等操作。

解决并发安全问题,还可以采用无锁方案。无锁方案相对互斥锁方案,最大的好处就是性能。互斥锁方案为了保证互斥性,需要执行加锁、解锁操作,而加锁、解锁操作本身就消耗性能;同时拿不到锁的线程还会进入阻塞状态,进而触发线程切换,线程切换对性能的消耗也很大。 相比之下,无锁方案则完全没有加锁、解锁的性能消耗,同时还能保证互斥性。

Java 中的无锁技术有:

  • CAS
  • 原子类
  • ThreadLocal
  • Copy-on-Write
  • 不变模式

CAS

CAS 的要点

CAS(Compare and Swap),字面意思为比较并交换。

CAS 涉及三个操作数:

  • V:需要读写的内存位置
  • A:进行比较的值
  • B:拟写入的新值

当且仅当 V 的值等于 A 时,才会通过原子方式用新值 B 来更新 A 的值,否则什么都不做

CAS 实际是乐观锁的一种实现方式,因此,CAS 只适用于线程冲突较少的情况

CAS 的应用

CAS 的典型应用场景是:

  • 原子类
  • 自旋锁

原子类

原子类是 CAS 在 Java 中最典型的应用。

我们先来看一个常见的代码片段。

1
2
3
if(a==b) {
a++;
}

如果 a++ 执行前, a 的值被修改了怎么办?还能得到预期值吗?出现该问题的原因是在并发环境下,以上代码片段不是原子操作,随时可能被其他线程所篡改。

解决这种问题的最经典方式是应用原子类的 incrementAndGet 方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class AtomicIntegerDemo {

public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(3);
final AtomicInteger count = new AtomicInteger(0);
for (int i = 0; i < 10; i++) {
executorService.execute(new Runnable() {
@Override
public void run() {
count.incrementAndGet();
}
});
}

executorService.shutdown();
executorService.awaitTermination(3, TimeUnit.SECONDS);
System.out.println("Final Count is : " + count.get());
}

}

J.U.C 包中提供了 AtomicBooleanAtomicIntegerAtomicLong 分别针对 BooleanIntegerLong 执行原子操作,操作和上面的示例大体相似,不做赘述。

自旋锁

利用原子类(本质上是 CAS),可以实现自旋锁。

所谓自旋锁,是指线程反复检查锁变量是否可用,直到成功为止。由于线程在这一过程中保持执行,因此是一种忙等待。一旦获取了自旋锁,线程会一直保持该锁,直至显式释放自旋锁。

示例:非线程安全示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class AtomicReferenceDemo {

private static int ticket = 10;

public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(3);
for (int i = 0; i < 5; i++) {
executorService.execute(new MyThread());
}
executorService.shutdown();
}

static class MyThread implements Runnable {

@Override
public void run() {
while (ticket > 0) {
System.out.println(Thread.currentThread().getName() + " 卖出了第 " + ticket + " 张票");
ticket--;
}
}

}

}

输出结果:

1
2
3
4
5
6
7
8
9
10
11
12
pool-1-thread-2 卖出了第 10 张票
pool-1-thread-1 卖出了第 10 张票
pool-1-thread-3 卖出了第 10 张票
pool-1-thread-1 卖出了第 8 张票
pool-1-thread-2 卖出了第 9 张票
pool-1-thread-1 卖出了第 6 张票
pool-1-thread-3 卖出了第 7 张票
pool-1-thread-1 卖出了第 4 张票
pool-1-thread-2 卖出了第 5 张票
pool-1-thread-1 卖出了第 2 张票
pool-1-thread-3 卖出了第 3 张票
pool-1-thread-2 卖出了第 1 张票

很明显,出现了重复售票的情况。

【示例】使用自旋锁来保证线程安全

可以通过自旋锁这种非阻塞同步来保证线程安全,下面使用 AtomicReference 来实现一个自旋锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
public class AtomicReferenceDemo2 {

private static int ticket = 10;

public static void main(String[] args) {
threadSafeDemo();
}

private static void threadSafeDemo() {
SpinLock lock = new SpinLock();
ExecutorService executorService = Executors.newFixedThreadPool(3);
for (int i = 0; i < 5; i++) {
executorService.execute(new MyThread(lock));
}
executorService.shutdown();
}

static class SpinLock {

private AtomicReference<Thread> atomicReference = new AtomicReference<>();

public void lock() {
Thread current = Thread.currentThread();
while (!atomicReference.compareAndSet(null, current)) {}
}

public void unlock() {
Thread current = Thread.currentThread();
atomicReference.compareAndSet(current, null);
}

}

static class MyThread implements Runnable {

private SpinLock lock;

public MyThread(SpinLock lock) {
this.lock = lock;
}

@Override
public void run() {
while (ticket > 0) {
lock.lock();
if (ticket > 0) {
System.out.println(Thread.currentThread().getName() + " 卖出了第 " + ticket + " 张票");
ticket--;
}
lock.unlock();
}
}

}

}

输出结果:

1
2
3
4
5
6
7
8
9
10
pool-1-thread-2 卖出了第 10 张票
pool-1-thread-1 卖出了第 9 张票
pool-1-thread-3 卖出了第 8 张票
pool-1-thread-2 卖出了第 7 张票
pool-1-thread-3 卖出了第 6 张票
pool-1-thread-1 卖出了第 5 张票
pool-1-thread-2 卖出了第 4 张票
pool-1-thread-1 卖出了第 3 张票
pool-1-thread-3 卖出了第 2 张票
pool-1-thread-1 卖出了第 1 张票

CAS 的原理

在 Java 中,主要利用 Unsafe 这个类实现 CAS

Unsafe 类位于 sun.misc 包下,是一个提供低级别、不安全操作的类。由于其强大的功能和潜在的危险性,它通常用于 JVM 内部或一些需要极高性能和底层访问的库中,而不推荐普通开发者在应用程序中使用。

Unsafe 类提供了 compareAndSwapObjectcompareAndSwapIntcompareAndSwapLong方法来实现的对 Objectintlong 类型的 CAS 操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* 以原子方式更新对象字段的值。
*/
boolean compareAndSwapObject(Object o, long offset, Object expected, Object x);

/**
* 以原子方式更新 int 类型的对象字段的值。
*/
boolean compareAndSwapInt(Object o, long offset, int expected, int x);

/**
* 以原子方式更新 long 类型的对象字段的值。
*/
boolean compareAndSwapLong(Object o, long offset, long expected, long x);

Unsafe 类中的 CAS 方法是 native 方法。native 关键字表明这些方法是用本地代码(通常是 C 或 C++)实现的,而不是用 Java 实现的。这些方法直接调用底层的、具有原子性的 CPU 指令来实现。

由于 CAS 操作可能会因为并发冲突而失败,因此通常会伴随着自旋,而所谓自旋,其实就是循环尝试。

Unsafe#getAndAddInt 源码:

1
2
3
4
5
6
7
8
9
10
// 原子地获取并增加整数值
public final int getAndAddInt(Object o, long offset, int delta) {
int v;
do {
// 以 volatile 方式获取对象 o 在内存偏移量 offset 处的整数值
v = getIntVolatile(o, offset);
} while (!compareAndSwapInt(o, offset, v, v + delta));
// 返回旧值
return v;
}

CAS 的问题

一般情况下,CAS 比锁性能更高。因为 CAS 是一种非阻塞算法,所以其避免了线程阻塞和唤醒的等待时间。但是,事物总会有利有弊,CAS 也存在三大问题:

  • ABA 问题
  • 循环时间长开销大
  • 只能保证一个共享变量的原子性

ABA 问题

如果一个变量初次读取的时候是 A 值,它的值被改成了 B,后来又被改回为 A,那 CAS 操作就会误认为它从来没有被改变过,这就是 ABA 问题

ABA 问题的解决思路是在变量前面追加上版本号或者时间戳。J.U.C 包提供了一个带有标记的原子引用类 AtomicStampedReference 来解决这个问题,它可以通过控制变量值的版本来保证 CAS 的正确性。大部分情况下 ABA 问题不会影响程序并发的正确性,如果需要解决 ABA 问题,改用传统的互斥同步可能会比原子类更高效

循环时间长开销大

自旋 CAS (不断尝试,直到成功为止)如果长时间不成功,会给 CPU 带来非常大的执行开销

如果 JVM 能支持处理器提供的 pause 指令那么效率会有一定的提升,pause 指令有两个作用:

  • 它可以延迟流水线执行指令(de-pipeline), 使 CPU 不会消耗过多的执行资源,延迟的时间取决于具体实现的版本,在一些处理器上延迟时间是零。
  • 它可以避免在退出循环的时候因内存顺序冲突(memory order violation)而引起 CPU 流水线被清空(CPU pipeline flush),从而提高 CPU 的执行效率。

比较花费 CPU 资源,即使没有任何用也会做一些无用功。

只能保证一个共享变量的原子性

当对一个共享变量执行操作时,我们可以使用循环 CAS 的方式来保证原子操作,但是对多个共享变量操作时,循环 CAS 就无法保证操作的原子性,这个时候就可以用锁。

或者有一个取巧的办法,就是把多个共享变量合并成一个共享变量来操作。比如有两个共享变量 i = 2, j = a,合并一下 ij=2a,然后用 CAS 来操作 ij。从 Java 1.5 开始 JDK 提供了 AtomicReference 类来保证引用对象之间的原子性,你可以把多个变量放在一个对象里来进行 CAS 操作。

原子类

原子类简介

原子性是确保并发安全三大特性之一。为了兼顾原子性以及锁带来的性能问题,Java 引入了 CAS (主要体现在 Unsafe 类)来实现非阻塞同步(也叫乐观锁),CAS 底层基于 CPU 指令(硬件支持)支持,具有原子性。并基于 CAS ,提供了一套原子工具类。

原子类比锁的粒度更细,更轻量级,并且对于在多处理器系统上实现高性能的并发代码来说是非常关键的。原子变量将发生竞争的范围缩小到单个变量上。

原子类相当于一种泛化的 volatile 变量,能够支持原子的、有条件的读/改/写操作。

原子类可以分为 5 个类别,这 5 个类别提供的方法基本上是相似的:

  • 基本数据类型
    • AtomicBoolean - 布尔类型原子类
    • AtomicInteger - 整型原子类
    • AtomicLong - 长整型原子类
  • 引用数据类型
    • AtomicReference - 引用类型原子类
    • AtomicMarkableReference - 带有标记位的引用类型原子类
    • AtomicStampedReference - 带有版本号的引用类型原子类
  • 数组数据类型
    • AtomicIntegerArray - 整形数组原子类
    • AtomicLongArray - 长整型数组原子类
    • AtomicReferenceArray - 引用类型数组原子类
  • 属性更新器类型
    • AtomicIntegerFieldUpdater - 整型字段的原子更新器
    • AtomicLongFieldUpdater - 长整型字段的原子更新器
    • AtomicReferenceFieldUpdater - 原子更新引用类型里的字段
  • 累加器
    • DoubleAdder - 浮点型原子累加器
    • LongAdder - 长整型原子累加器
    • DoubleAccumulator - 更复杂的浮点型原子累加器
    • LongAccumulator - 更复杂的长整型原子累加器

原子类之基本数据类型

基本数据类型原子类针对 Java 基本类型提供原子操作

  • AtomicBoolean - 布尔类型原子类
  • AtomicInteger - 整型原子类
  • AtomicLong - 长整型原子类

以上类都支持 CAS(compare-and-swap)技术,此外,AtomicIntegerAtomicLong 还支持算术运算。

:bulb: 提示:

虽然 Java 只提供了 AtomicBooleanAtomicIntegerAtomicLong,但是可以模拟其他基本类型的原子变量。要想模拟其他基本类型的原子变量,可以将 shortbyte 等类型与 int 类型进行转换,以及使用 Float.floatToIntBitsDouble.doubleToLongBits 来转换浮点数。

由于 AtomicBooleanAtomicIntegerAtomicLong 实现方式、使用方式都相近,所以本文仅针对 AtomicInteger 进行介绍。

AtomicInteger 用法

1
2
3
4
5
6
7
public final int get() // 获取当前值
public final int getAndSet(int newValue) // 获取当前值,并设置新值
public final int getAndIncrement()// 获取当前值,并自增
public final int getAndDecrement() // 获取当前值,并自减
public final int getAndAdd(int delta) // 获取当前值,并加上预期值
boolean compareAndSet(int expect, int update) // 如果输入值(update)等于预期值,将该值设置为输入值
public final void lazySet(int newValue) // 最终设置为 newValue,使用 lazySet 设置之后可能导致其他线程在之后的一小段时间内还是可以读到旧的值。

AtomicInteger 使用示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class AtomicIntegerDemo {

public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(5);
AtomicInteger count = new AtomicInteger(0);
for (int i = 0; i < 1000; i++) {
executorService.submit((Runnable) () -> {
System.out.println(Thread.currentThread().getName() + " count=" + count.get());
count.incrementAndGet();
});
}

executorService.shutdown();
executorService.awaitTermination(30, TimeUnit.SECONDS);
System.out.println("Final Count is : " + count.get());
}
}

AtomicInteger 实现

阅读 AtomicInteger 源码,可以看到如下定义:

1
2
3
4
5
6
7
8
9
10
11
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;

static {
try {
valueOffset = unsafe.objectFieldOffset
(AtomicInteger.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}

private volatile int value;

说明:

  • value - value 属性使用 volatile 修饰,使得对 value 的修改在并发环境下对所有线程可见。
  • valueOffset - value 属性的偏移量,通过这个偏移量可以快速定位到 value 字段,这个是实现 AtomicInteger 的关键。
  • unsafe - Unsafe 类型的属性,它为 AtomicInteger 提供了 CAS 操作。

原子类之引用数据类型

Java 数据类型分为 基本数据类型引用数据类型 两大类(不了解 Java 数据类型划分可以参考: Java 基本数据类型 )。

上一节中提到了针对基本数据类型的原子类,那么如果想针对引用类型做原子操作怎么办?Java 也提供了相关的原子类:

  • AtomicReference - 引用类型原子类
  • AtomicMarkableReference - 带有标记位的引用类型原子类
  • AtomicStampedReference - 带有版本号的引用类型原子类

AtomicStampedReference 类在引用类型原子类中,彻底地解决了 ABA 问题,其它的 CAS 能力与另外两个类相近,所以最具代表性。因此,本节只针对 AtomicStampedReference 进行说明。

::: tabs#原子类之引用类型示例

@tab AtomicReference 使用示例

【示例】基于 AtomicReference 实现一个简单的自旋锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
public class AtomicReferenceDemo2 {

private static int ticket = 10;

public static void main(String[] args) {
threadSafeDemo();
}

private static void threadSafeDemo() {
SpinLock lock = new SpinLock();
ExecutorService executorService = Executors.newFixedThreadPool(3);
for (int i = 0; i < 5; i++) {
executorService.execute(new MyThread(lock));
}
executorService.shutdown();
}

/**
* 基于 {@link AtomicReference} 实现的简单自旋锁
*/
static class SpinLock {

private AtomicReference<Thread> atomicReference = new AtomicReference<>();

public void lock() {
Thread current = Thread.currentThread();
while (!atomicReference.compareAndSet(null, current)) {}
}

public void unlock() {
Thread current = Thread.currentThread();
atomicReference.compareAndSet(current, null);
}

}

/**
* 利用自旋锁 {@link SpinLock} 并发处理数据
*/
static class MyThread implements Runnable {

private SpinLock lock;

public MyThread(SpinLock lock) {
this.lock = lock;
}

@Override
public void run() {
while (ticket > 0) {
lock.lock();
if (ticket > 0) {
System.out.println(Thread.currentThread().getName() + " 卖出了第 " + ticket + " 张票");
ticket--;
}
lock.unlock();
}
}

}

}

@tab AtomicMarkableReference 使用示例

【示例】AtomicMarkableReference 使用示例(解决 ABA 问题)

原子类的实现基于 CAS 机制,而 CAS 存在 ABA 问题(不了解 ABA 问题,可以参考:Java 并发基础机制 - CAS 的问题)。正是为了解决 ABA 问题,才有了 AtomicMarkableReferenceAtomicStampedReference

AtomicMarkableReference 使用一个布尔值作为标记,修改时在 true / false 之间切换。这种策略不能根本上解决 ABA 问题,但是可以降低 ABA 发生的几率。常用于缓存或者状态描述这样的场景。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class AtomicMarkableReferenceDemo {

private final static String INIT_TEXT = "abc";

public static void main(String[] args) throws InterruptedException {

final AtomicMarkableReference<String> amr = new AtomicMarkableReference<>(INIT_TEXT, false);

ExecutorService executorService = Executors.newFixedThreadPool(3);
for (int i = 0; i < 10; i++) {
executorService.submit(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(Math.abs((int) (Math.random() * 100)));
} catch (InterruptedException e) {
e.printStackTrace();
}

String name = Thread.currentThread().getName();
if (amr.compareAndSet(INIT_TEXT, name, amr.isMarked(), !amr.isMarked())) {
System.out.println(Thread.currentThread().getName() + " 修改了对象!");
System.out.println("新的对象为:" + amr.getReference());
}
}
});
}

executorService.shutdown();
executorService.awaitTermination(3, TimeUnit.SECONDS);
}

}

@tab AtomicStampedReference 使用示例

【示例】AtomicStampedReference 使用示例

AtomicStampedReference 使用一个整型值做为版本号,每次更新前先比较版本号,如果一致,才进行修改。通过这种策略,可以根本上解决 ABA 问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class AtomicStampedReferenceDemo {

private final static String INIT_REF = "pool-1-thread-3";

private final static AtomicStampedReference<String> asr = new AtomicStampedReference<>(INIT_REF, 0);

public static void main(String[] args) throws InterruptedException {

System.out.println("初始对象为:" + asr.getReference());

ExecutorService executorService = Executors.newFixedThreadPool(3);
for (int i = 0; i < 3; i++) {
executorService.execute(new MyThread());
}

executorService.shutdown();
executorService.awaitTermination(3, TimeUnit.SECONDS);
}

static class MyThread implements Runnable {

@Override
public void run() {
try {
Thread.sleep(Math.abs((int) (Math.random() * 100)));
} catch (InterruptedException e) {
e.printStackTrace();
}

final int stamp = asr.getStamp();
if (asr.compareAndSet(INIT_REF, Thread.currentThread().getName(), stamp, stamp + 1)) {
System.out.println(Thread.currentThread().getName() + " 修改了对象!");
System.out.println("新的对象为:" + asr.getReference());
}
}

}

}

:::

原子类之数组数据类型

Java 提供了以下针对数组的原子类:

  • AtomicIntegerArray - 整形数组原子类
  • AtomicLongArray - 长整型数组原子类
  • AtomicReferenceArray - 引用类型数组原子类

已经有了针对基本类型和引用类型的原子类,为什么还要提供针对数组的原子类呢?

数组类型的原子类为数组元素提供了 volatile 类型的访问语义,这是普通数组所不具备的特性——**volatile 类型的数组仅在数组引用上具有 volatile 语义**。

【示例】AtomicIntegerArray 使用示例(AtomicLongArrayAtomicReferenceArray 使用方式也类似)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
public class AtomicIntegerArrayDemo {

private static AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(10);

public static void main(final String[] arguments) throws InterruptedException {

System.out.println("Init Values: ");
for (int i = 0; i < atomicIntegerArray.length(); i++) {
atomicIntegerArray.set(i, i);
System.out.print(atomicIntegerArray.get(i) + " ");
}
System.out.println();

Thread t1 = new Thread(new Increment());
Thread t2 = new Thread(new Compare());
t1.start();
t2.start();

t1.join();
t2.join();

System.out.println("Final Values: ");
for (int i = 0; i < atomicIntegerArray.length(); i++) {
System.out.print(atomicIntegerArray.get(i) + " ");
}
System.out.println();
}

static class Increment implements Runnable {

@Override
public void run() {

for (int i = 0; i < atomicIntegerArray.length(); i++) {
int value = atomicIntegerArray.incrementAndGet(i);
System.out.println(Thread.currentThread().getName() + ", index = " + i + ", value = " + value);
}
}

}

static class Compare implements Runnable {

@Override
public void run() {
for (int i = 0; i < atomicIntegerArray.length(); i++) {
boolean swapped = atomicIntegerArray.compareAndSet(i, 2, 3);
if (swapped) {
System.out.println(Thread.currentThread().getName() + " swapped, index = " + i + ", value = 3");
}
}
}

}

}

原子类之属性更新器

属性更新器支持基于反射机制的更新字段值的原子操作

  • AtomicIntegerFieldUpdater - 整型字段的原子更新器。
  • AtomicLongFieldUpdater - 长整型字段的原子更新器。
  • AtomicReferenceFieldUpdater - 原子更新引用类型里的字段。

这些类的使用有一定限制:

  • 因为对象的属性修改类型原子类都是抽象类,所以每次使用都必须使用静态方法 newUpdater() 创建一个更新器,并且需要设置想要更新的类和属性。
  • 字段必须是 volatile 类型的;
  • 不能作用于静态变量(static);
  • 不能作用于常量(final);

【示例】AtomicReferenceFieldUpdater 使用示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
public class AtomicReferenceFieldUpdaterDemo {

static User user = new User("begin");

static AtomicReferenceFieldUpdater<User, String> updater =
AtomicReferenceFieldUpdater.newUpdater(User.class, String.class, "name");

public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(3);
for (int i = 0; i < 5; i++) {
executorService.execute(new MyThread());
}
executorService.shutdown();
}

static class MyThread implements Runnable {

@Override
public void run() {
if (updater.compareAndSet(user, "begin", "end")) {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " 已修改 name = " + user.getName());
} else {
System.out.println(Thread.currentThread().getName() + " 已被其他线程修改");
}
}

}

static class User {

volatile String name;

public User(String name) {
this.name = name;
}

public String getName() {
return name;
}

public User setName(String name) {
this.name = name;
return this;
}

}

}

原子类之累加器

DoubleAccumulatorDoubleAdderLongAccumulatorLongAdder,这四个类仅仅用来执行累加操作,相比原子化的基本数据类型,速度更快,但是不支持 compareAndSet() 方法。如果你仅仅需要累加操作,使用原子化的累加器性能会更好,代价就是会消耗更多的内存空间。

LongAdder 内部由一个 base 变量和一个 cell[] 数组组成。

  • 当只有一个写线程,没有竞争的情况下,LongAdder 会直接使用 base 变量作为原子操作变量,通过 CAS 操作修改变量;
  • 当有多个写线程竞争的情况下,除了占用 base 变量的一个写线程之外,其它各个线程会将修改的变量写入到自己的槽 cell[] 数组中。

我们可以发现,LongAdder 在操作后的返回值只是一个近似准确的数值,但是 LongAdder 最终返回的是一个准确的数值, 所以在一些对实时性要求比较高的场景下,LongAdder 并不能取代 AtomicIntegerAtomicLong

ThreadLocal

在多线程环境下,共享变量存在并发安全问题。换个思路,如果变量非共享,而是各个线程独享,就不会有并发安全问题。这种思想有个术语叫线程封闭,其本质上就是避免共享。没有共享,自然也就没有并发安全问题。在 Java 中,ThreadLocal 正是根据这个思路而设计的。

ThreadLocal 为每个线程都创建了一个本地副本,这个副本只能被当前线程访问,其他线程无法访问,那么自然是线程安全的。

ThreadLocal 的应用

ThreadLocal 的方法:

1
2
3
4
5
6
public class ThreadLocal<T> {
public T get() {}
public void set(T value) {}
public void remove() {}
public static <S> ThreadLocal<S> withInitial(Supplier<? extends S> supplier) {}
}

说明:

  • get - 用于获取 ThreadLocal 在当前线程中保存的变量副本。
  • set - 用于设置当前线程中变量的副本。
  • remove - 用于删除当前线程中变量的副本。如果此线程局部变量随后被当前线程读取,则其值将通过调用其 initialValue 方法重新初始化,除非其值由中间线程中的当前线程设置。 这可能会导致当前线程中多次调用 initialValue 方法。
  • initialValue - 为 ThreadLocal 设置默认的 get 初始值,需要重写 initialValue 方法 。

ThreadLocal 常用于防止对可变的单例(Singleton)变量或全局变量进行共享。典型应用场景有:管理数据库连接、Session 管理等。

::: tabs#ThreadLocal 应用示例

@tab 数据库连接

【示例】数据库连接

1
2
3
4
5
6
7
8
9
10
private static ThreadLocal<Connection> connectionHolder = new ThreadLocal<Connection>() {
@Override
public Connection initialValue() {
return DriverManager.getConnection(DB_URL);
}
};

public static Connection getConnection() {
return connectionHolder.get();
}

@tab Session 管理

【示例】Session 管理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private static final ThreadLocal<Session> sessionHolder = new ThreadLocal<>();

public static Session getSession() {
Session session = (Session) sessionHolder.get();
try {
if (session == null) {
session = createSession();
sessionHolder.set(session);
}
} catch (Exception e) {
e.printStackTrace();
}
return session;
}

@tab 线程安全的 SimpleDateFormat

【示例】线程安全的 SimpleDateFormat

SimpleDateFormat 不是线程安全的,如果要保证并发安全,可以使用 ThreadLocal 来解决。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class SafeDateFormat {

//定义 ThreadLocal 变量
static final ThreadLocal<DateFormat>
tl = ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));

static DateFormat get() {
return tl.get();
}

public static void main(String[] args) {
//不同线程执行下面代码
//返回的 df 是不同的
DateFormat df = SafeDateFormat.get();
}

}

@tab 完整使用 ThreadLocal 示例

【示例】完整使用 ThreadLocal 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class ThreadLocalDemo {

private static ThreadLocal<Integer> threadLocal = new ThreadLocal<Integer>() {
@Override
protected Integer initialValue() {
return 0;
}
};

public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(10);
for (int i = 0; i < 10; i++) {
executorService.execute(new MyThread());
}
executorService.shutdown();
}

static class MyThread implements Runnable {

@Override
public void run() {
int count = threadLocal.get();
for (int i = 0; i < 10; i++) {
try {
count++;
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
threadLocal.set(count);
threadLocal.remove();
System.out.println(Thread.currentThread().getName() + " : " + count);
}

}

}

:::

ThreadLocal 的原理

存储结构

Thread 类中维护着 2 个 ThreadLocal.ThreadLocalMap 类型的成员 threadLocals 和 inheritableThreadLocals 。这 2 个成员就是用来存储当前线程独占的变量副本。

ThreadLocalMapThreadLocal 的内部类,它维护着一个 Entry 数组,**Entry 继承了 WeakReference** ,所以是弱引用。 Entry 用于保存键值对,其中:

  • keyThreadLocal 对象
  • value 是传递进来的对象(变量副本)

ThreadLocal 关键源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class Thread implements Runnable {
// ...
ThreadLocal.ThreadLocalMap threadLocals = null;
// ...
ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;
}

static class ThreadLocalMap {
// ...
static class Entry extends WeakReference<ThreadLocal<?>> {
/** The value associated with this ThreadLocal. */
Object value;

Entry(ThreadLocal<?> k, Object v) {
super(k);
value = v;
}
}
// ...
}

如何解决 Hash 冲突

ThreadLocalMap 虽然是类似 Map 结构的数据结构,但它并没有实现 Map 接口。它不支持 Map 接口中的 next 方法,这意味着 ThreadLocalMap 中解决 Hash 冲突的方式并非 拉链表 方式。

实际上,**ThreadLocalMap 采用线性探测的方式来解决 Hash 冲突**。所谓线性探测,就是根据初始 key 的 hashcode 值确定元素在 table 数组中的位置,如果发现这个位置上已经被其他的 key 值占用,则利用固定的算法寻找一定步长的下个位置,依次判断,直至找到能够存放的位置。

内存泄漏问题

ThreadLocal 仅仅是一个代理工具类,内部并不持有任何与线程相关的数据,所有和线程相关的数据都存储在 Thread 里面,这样的设计容易理解。

当然还有一个更加深层次的原因,那就是不容易产生内存泄露。如果 ThreadLocal 和实际实现反其道而行之:将 Thread 的引用维护在一个 Map 中,就会出现这种情况——只要 ThreadLocal 对象存在,那么 Map 中的 Thread 对象就永远不会被回收。而 ThreadLocal 的生命周期往往都比线程要长,所以这种设计方案很容易导致内存泄露。而 Java 的实现中 Thread 持有 ThreadLocalMap,而且 ThreadLocalMap 里对 ThreadLocal 的引用还是弱引用(WeakReference),所以只要 Thread 对象可以被回收,那么 ThreadLocalMap 就能被回收。Java 的这种实现方案虽然看上去复杂一些,但是更加安全。

ThreadLocalMapEntry 继承了 WeakReference,所以它的 key (ThreadLocal 对象)是弱引用,而 value (变量副本)是强引用。如果 ThreadLocal 对象没有外部强引用来引用它,那么 ThreadLocal 对象会在下次 GC 时被回收。此时,Entry 中的 key 已经被回收,但是 value 由于是强引用不会被垃圾收集器回收。如果创建 ThreadLocal 的线程一直持续运行,那么 value 就会一直得不到回收,从而导致内存泄露

那么如何避免内存泄漏呢?方法就是:使用 ThreadLocalset 方法后,在 try {} finally {} 中显示的调用 remove 方法

1
2
3
4
5
6
7
8
9
10
11
12
ExecutorService es;
ThreadLocal tl;
es.execute(() -> {
//ThreadLocal 增加变量
tl.set(obj);
try {
// 省略业务逻辑代码
} finally {
//手动清理 ThreadLocal
tl.remove();
}
});

ThreadLocal 的误区

示例摘自:极客时间教程 - Java 业务开发常见错误 100 例

ThreadLocal 适用于变量在线程间隔离,而在方法或类间共享的场景。

前文提到,ThreadLocal 是线程隔离的,那么是不是使用 ThreadLocal 就一定高枕无忧呢?

ThreadLocal 错误案例

使用 Spring Boot 创建一个 Web 应用程序,使用 ThreadLocal 存放一个 Integer 的值,来暂且代表需要在线程中保存的用户信息,这个值初始是 null。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private ThreadLocal<Integer> currentUser = ThreadLocal.withInitial(() -> null);

@GetMapping("wrong")
public Map<String, String> wrong(@RequestParam("id") Integer userId) {
//设置用户信息之前先查询一次 ThreadLocal 中的用户信息
String before = Thread.currentThread().getName() + ":" + currentUser.get();
//设置用户信息到 ThreadLocal
currentUser.set(userId);
//设置用户信息之后再查询一次 ThreadLocal 中的用户信息
String after = Thread.currentThread().getName() + ":" + currentUser.get();
//汇总输出两次查询结果
Map<String, String> result = new HashMap<>();
result.put("before", before);
result.put("after", after);
return result;
}

【预期】从代码逻辑来看,我们预期第一次获取的值始终应该是 null。

【实际】

为了方便复现,将 Tomcat 工作线程设为 1:

1
server.tomcat.max-threads=1

当访问 id = 1 时,符合预期

img

当访问 id = 2 时,before 的应答不是 null,而是 1,不符合预期。

【分析】实际情况和预期存在偏差。Spring Boot 程序运行在 Tomcat 中,执行程序的线程是 Tomcat 的工作线程,而 Tomcat 的工作线程是基于线程池的。线程池会重用固定的几个线程,一旦线程重用,那么很可能首次从
ThreadLocal 获取的值是之前其他用户的请求遗留的值。这时,ThreadLocal 中的用户信息就是其他用户的信息

并不能认为没有显式开启多线程就不会有线程安全问题。使用类似 ThreadLocal 工具来存放一些数据时,需要特别注意在代码运行完后,显式地去清空设置的数据。

ThreadLocal 错误案例修正

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@GetMapping("right")
public Map<String, String> right(@RequestParam("id") Integer userId) {
String before = Thread.currentThread().getName() + ":" + currentUser.get();
currentUser.set(userId);
try {
String after = Thread.currentThread().getName() + ":" + currentUser.get();
Map<String, String> result = new HashMap<>();
result.put("before", before);
result.put("after", after);
return result;
} finally {
//在 finally 代码块中删除 ThreadLocal 中的数据,确保数据不串
currentUser.remove();
}
}

InheritableThreadLocal

通过 ThreadLocal 创建的线程变量,其子线程是无法继承的。也就是说你在线程中通过 ThreadLocal 创建了线程变量 V,而后该线程创建了子线程,你在子线程中是无法通过 ThreadLocal 来访问父线程的线程变量 V 的。

如果你需要子线程继承父线程的线程变量,那该怎么办呢?其实很简单,Java 提供了 InheritableThreadLocal 来支持这种特性,InheritableThreadLocalThreadLocal 子类,所以用法和 ThreadLocal 相同。与 ThreadLocal 不同的是,InheritableThreadLocal 允许一个线程以及该线程创建的所有子线程都可以访问它保存的数据。

不过,完全不建议你在线程池中使用 InheritableThreadLocal,不仅仅是因为它具有 ThreadLocal 相同的缺点——可能导致内存泄露,更重要的原因是:线程池中线程的创建是动态的,很容易导致继承关系错乱,如果你的业务逻辑依赖 InheritableThreadLocal,那么很可能导致业务逻辑计算错误,而这个错误往往比内存泄露更要命。

原理参考:Java 多线程:InheritableThreadLocal 实现原理

Immutability 模式

解决并发问题,其实最简单的办法就是让共享变量只有读操作,而没有写操作。这个办法如此重要,以至于被上升到了一种解决并发问题的设计模式:不变性(Immutability)模式。所谓不变性,是指:一旦创建,状态不再变化。换句话说,就是变量一旦被赋值,就不允许修改了(没有写操作);没有修改操作,也就是保持了不变性。

快速实现具备不可变性的类

将一个类所有的属性都设置成 final 的,并且只允许存在只读方法,那么这个类基本上就具备不可变性了。更严格的做法是这个类本身也是 final 的,也就是不允许继承。因为子类可以覆盖父类的方法,有可能改变不可变性,所以推荐你在实际工作中,使用这种更严格的做法。

在 Java 中,经常用到的 StringLongIntegerDouble 等基础类型的包装类都具备不可变性,这些对象的线程安全性都是靠不可变性来保证的。如果你仔细翻看这些类的声明、属性和方法,你会发现它们都严格遵守不可变类的三点要求:类和属性都是 final 的,所有方法均是只读的

String 这个类虽然有替换操作,但实际仍是只读的。阅读 String 源码可以发现:String 这个类以及它的属性 value[] 都是 final 的;而 replace() 方法的实现,就的确没有修改 value[],而是将替换后的字符串作为返回值返回了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public final class String {
private final char value[];
// 字符替换
String replace(char oldChar,
char newChar) {
//无需替换,直接返回 this
if (oldChar == newChar){
return this;
}

int len = value.length;
int i = -1;
/* avoid getfield opcode */
char[] val = value;
//定位到需要替换的字符位置
while (++i < len) {
if (val[i] == oldChar) {
break;
}
}
//未找到 oldChar,无需替换
if (i >= len) {
return this;
}
//创建一个 buf[],这是关键
//用来保存替换后的字符串
char buf[] = new char[len];
for (int j = 0; j < i; j++) {
buf[j] = val[j];
}
while (i < len) {
char c = val[i];
buf[i] = (c == oldChar) ?
newChar : c;
i++;
}
//创建一个新的字符串返回
//原字符串不会发生任何变化
return new String(buf, true);
}
}

通过分析 String 的实现,你可能已经发现了,如果具备不可变性的类,需要提供类似修改的功能,具体该怎么操作呢?做法很简单,那就是创建一个新的不可变对象,这是与可变对象的一个重要区别,可变对象往往是修改自己的属性。

使用 Immutability 模式的注意事项

在使用 Immutability 模式的时候,需要注意以下两点:

  1. 对象的所有属性都是 final 的,并不能保证不可变性;
  2. 不可变对象也需要正确发布。

在 Java 语言中,final 修饰的属性一旦被赋值,就不可以再修改,但是如果属性的类型是普通对象,那么这个普通对象的属性是可以被修改的。例如下面的代码中,Bar 的属性 foo 虽然是 final 的,依然可以通过 setAge() 方法来设置 foo 的属性 age。所以,在使用 Immutability 模式的时候一定要确认保持不变性的边界在哪里,是否要求属性对象也具备不可变性

1
2
3
4
5
6
7
8
9
10
class Foo{
int age=0;
int name="abc";
}
final class Bar {
final Foo foo;
void setAge(int a){
foo.age=a;
}
}

不可变对象虽然是线程安全的,但是并不意味着引用这些不可变对象的对象就是线程安全的。例如在下面的代码中,Foo 具备不可变性,线程安全,但是类 Bar 并不是线程安全的,类 Bar 中持有对 Foo 的引用 foo,对 foo 这个引用的修改在多线程中并不能保证可见性和原子性。

1
2
3
4
5
6
7
8
9
10
11
12
//Foo 线程安全
final class Foo{
final int age=0;
final int name="abc";
}
//Bar 线程不安全
class Bar {
Foo foo;
void setFoo(Foo f){
this.foo=f;
}
}

如果你的程序仅仅需要 foo 保持可见性,无需保证原子性,那么可以将 foo 声明为 volatile 变量,这样就能保证可见性。如果你的程序需要保证原子性,那么可以通过原子类来实现。下面的示例代码是合理库存的原子化实现,你应该很熟悉了,其中就是用原子类解决了不可变对象引用的原子性问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class SafeWM {

class WMRange {
final int upper;
final int lower;
WMRange(int upper, int lower) {
//省略构造函数实现
}
}

final AtomicReference<WMRange> rf = new AtomicReference<>(new WMRange(0, 0));

// 设置库存上限
void setUpper(int v) {
while (true) {
WMRange or = rf.get();
// 检查参数合法性
if (v < or.lower) {
throw new IllegalArgumentException();
}
WMRange nr = new WMRange(v, or.lower);
if (rf.compareAndSet(or, nr)) {
return;
}
}
}
}

Copy-on-Write 模式

所谓 Copy-on-Write,经常被缩写为 CoW,顾名思义就是写时复制

Java 支持 CopyOnWriteArrayListCopyOnWriteArraySet 两种并发容器,其设计思想就是 CoW;通过 Copy-on-Write 这两个容器实现的读操作是无锁的,由于无锁,所以将读操作的性能发挥到了极致。

CoW 是一项非常通用的技术方案,在很多领域都有着广泛的应用。不过,它也有缺点的,那就是消耗内存,每次修改都需要复制一个新的副本出来。

参考资料

Java 并发之同步工具

Semaphore

Semaphore 译为信号量,是一种同步机制,用于控制多线程对共享资源的访问。信号量是由计算机科学家 Edsger Dijkstra 于 1965 年提出的,用于解决所谓的“临界区”问题,即多个进程或线程试图同时访问共享资源(如打印机、内存缓冲区等)时可能出现的问题。

信号量模型

信号量模型还是很简单的,可以简单概括为:一个计数器,一个等待队列,三个方法。在信号量模型里,计数器和等待队列对外是透明的,所以只能通过信号量模型提供的三个方法来访问它们,这三个方法分别是:init()、down() 和 up()。

  • 这三个方法详细的语义具体如下所示。

    • init():设置计数器的初始值。
    • down():计数器的值减 1;如果此时计数器的值小于 0,则当前线程将被阻塞,否则当前线程可以继续执行。
    • up():计数器的值加 1;如果此时计数器的值小于或者等于 0,则唤醒等待队列中的一个线程,并将其从等待队列中移除。

    这里提到的 init()、down() 和 up() 三个方法都是原子性的,并且这个原子性是由信号量模型的实现方保证的。在 Java 中,信号量模型是由 java.util.concurrent.Semaphore 实现的,Semaphore 这个类能够保证这三个方法都是原子操作。

    信号量模型里面,down()、up() 这两个操作历史上最早称为 P 操作和 V 操作,所以信号量模型也被称为 PV 原语。

Semaphore 使用

Semaphore 提供了 2 个构造方法:

1
2
3
4
// 参数 permits 表示许可数目,即同时可以允许多少线程进行访问
public Semaphore(int permits) {}
// 参数 fair 表示是否是公平的,即等待时间越久的越先获取许可
public Semaphore(int permits, boolean fair) {}

说明:

  • permits - 初始化固定数量的 permit。
  • fair - 设置是否为公平模式。所谓公平,是指等待久的优先获取 permit。

Semaphore 的重要方法:

1
2
3
4
5
6
7
8
// 获取 1 个许可
public void acquire() throws InterruptedException {}
//获取 permits 个许可
public void acquire(int permits) throws InterruptedException {}
// 释放 1 个许可
public void release() {}
//释放 permits 个许可
public void release(int permits) {}

说明:

  • acquire() - 获取 1 个 permit。
  • acquire(int permits) - 获取 permits 数量的 permit。
  • release() - 释放 1 个 permit。
  • release(int permits) - 释放 permits 数量的 permit。

img

【示例】Semaphore 使用示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class SemaphoreDemo {

private static final int THREAD_COUNT = 30;

private static ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT);

private static Semaphore semaphore = new Semaphore(10);

public static void main(String[] args) {
for (int i = 0; i < THREAD_COUNT; i++) {
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
semaphore.acquire();
System.out.println("save data");
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}

threadPool.shutdown();
}

}

Semaphore 原理

Semaphore 是共享锁的一种实现,它默认构造 AQS 的 state 值为 permits,你可以将 permits 的值理解为许可证的数量,只有拿到许可证的线程才能执行。

调用semaphore.acquire() ,线程尝试获取许可证,如果 state >= 0 的话,则表示可以获取成功。如果获取成功的话,使用 CAS 操作去修改 state 的值 state=state-1。如果 state<0 的话,则表示许可证数量不足。此时会创建一个 Node 节点加入阻塞队列,挂起当前线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* 获取 1 个许可证
*/
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
/**
* 共享模式下获取许可证,获取成功则返回,失败则加入阻塞队列,挂起线程
*/
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 尝试获取许可证,arg 为获取许可证个数,当可用许可证数减当前获取的许可证数结果小于 0, 则创建一个节点加入阻塞队列,挂起当前线程。
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}

调用semaphore.release(); ,线程尝试释放许可证,并使用 CAS 操作去修改 state 的值 state=state+1。释放许可证成功之后,同时会唤醒同步队列中的一个线程。被唤醒的线程会重新尝试去修改 state 的值 state=state-1 ,如果 state>=0 则获取令牌成功,否则重新进入阻塞队列,挂起线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 释放一个许可证
public void release() {
sync.releaseShared(1);
}

// 释放共享锁,同时会唤醒同步队列中的一个线程。
public final boolean releaseShared(int arg) {
//释放共享锁
if (tryReleaseShared(arg)) {
//唤醒同步队列中的一个线程
doReleaseShared();
return true;
}
return false;
}

实现一个限流器

Semaphore 最重要的特性是:Semaphore 可以允许多个线程访问一个临界区

Semaphore 在现实中有很多应用场景:

  • 各种池化资源,例如连接池、对象池、线程池等;
  • 信号量限流(例如 Hystrix 就支持信号量限流模式);

【示例】一个基于信号量实现的简单对象限流器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public class SemaphoreRateLimit {

public static void main(String[] args) {
// 创建对象池,大小为 10
ObjectPool<Long, String> pool = new ObjectPool<>(10, 2L);
// 通过对象池获取 t,之后执行
pool.exec(t -> {
System.out.println(t);
return t.toString();
});
}

static class ObjectPool<T, R> {

final List<T> pool;
// 用信号量实现限流器
final Semaphore sem;

// 构造函数
ObjectPool(int size, T t) {
pool = new Vector<T>() { };
for (int i = 0; i < size; i++) {
pool.add(t);
}
sem = new Semaphore(size);
}

// 利用对象池的对象,调用 func
R exec(Function<T, R> func) {
T t = null;
try {
sem.acquire();
t = pool.remove(0);
return func.apply(t);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
pool.add(t);
sem.release();
return null;
}
}

}

}

在这个方法里面,我们首先调用 acquire() 方法(与之匹配的是在 finally 里面调用 release() 方法),假设对象池的大小是 10,信号量的计数器初始化为 10,那么前 10 个线程调用 acquire() 方法,都能继续执行,相当于通过了信号量,而其他线程则会阻塞在 acquire() 方法上。对于通过信号量的线程,我们为每个线程分配了一个对象 t(这个分配工作是通过 pool.remove(0) 实现的),分配完之后会执行一个回调函数 func,而函数的参数正是前面分配的对象 t ;执行完回调函数之后,它们就会释放对象(这个释放工作是通过 pool.add(t) 实现的),同时调用 release() 方法来更新信号量的计数器。如果此时信号量里计数器的值小于等于 0,那么说明有线程在等待,此时会自动唤醒等待的线程。

CountDownLatch

CountDownLatch 字面意思为递减计数锁。用于控制一个线程等待多个线程

CountDownLatch 内部维护了一个计数器,表示需要等待的事件数量。countDown 方法递减计数器,表示有一个事件已经发生。调用 await 方法的线程会一直阻塞直到计数器为零,或者等待中的线程中断,或者等待超时。CountDownLatch 是一次性的,计数器的值只能在构造方法中初始化一次,之后没有任何机制再次对其设置值,当 CountDownLatch 使用完毕后,它不能再次被使用。

img

CountDownLatch 是共享锁的一种实现,它默认构造 AQS 的 state 值为 count。当线程使用 countDown() 方法时,其实使用了tryReleaseShared方法以 CAS 的操作来减少 state,直至 state 为 0 。当调用 await() 方法的时候,如果 state 不为 0,那就证明任务还没有执行完毕,await() 方法就会一直阻塞,也就是说 await() 方法之后的语句不会被执行。直到count 个线程调用了countDown()使 state 值被减为 0,或者调用await()的线程被中断,该线程才会从阻塞中被唤醒,await() 方法之后的语句得到执行。

CountDownLatch 唯一的构造方法:

1
2
// 初始化计数器
public CountDownLatch(int count) {};

CountDownLatch 的重要方法:

1
2
3
public void await() throws InterruptedException { };
public boolean await(long timeout, TimeUnit unit) throws InterruptedException { };
public void countDown() { };

说明:

  • await() - 调用 await() 方法的线程会被挂起,它会等待直到 count 值为 0 才继续执行。
  • await(long timeout, TimeUnit unit) - 和 await() 类似,只不过等待一定的时间后 count 值还没变为 0 的话就会继续执行
  • countDown() - 将统计值 count 减 1

【示例】CountDownLatch 使用示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class CountDownLatchDemo {

public static void main(String[] args) {
final CountDownLatch latch = new CountDownLatch(2);

new Thread(new MyThread(latch)).start();

try {
System.out.println("等待 2 个子线程执行完毕。..");
latch.await();
System.out.println("2 个子线程已经执行完毕");
System.out.println("继续执行主线程");
} catch (InterruptedException e) {
e.printStackTrace();
}
}

static class MyThread implements Runnable {

private CountDownLatch latch;

public MyThread(CountDownLatch latch) {
this.latch = latch;
}

@Override
public void run() {
System.out.println("子线程" + Thread.currentThread().getName() + "正在执行");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("子线程" + Thread.currentThread().getName() + "执行完毕");
latch.countDown();
}

}

}

CyclicBarrier

CyclicBarrier 字面意思是循环栅栏。**CyclicBarrier 可以让一组线程等待至某个状态(遵循字面意思,不妨称这个状态为栅栏)之后再全部同时执行。之所以叫循环栅栏是因为:当所有等待线程都被释放以后,CyclicBarrier 可以被重用**。

CyclicBarrier 是基于 ReentrantLockReentrantLock 底层也是基于 AQS 实现的)和 Condition 实现的。CyclicBarrier 内部维护一个计数器,每次执行 await 方法之后,计数器加 1,直到计数器的值和设置的值相等,等待的所有线程才会继续执行。

CyclicBarrier 在并行迭代算法中非常有用。

img

CyclicBarrier 提供了 2 个构造方法

1
2
public CyclicBarrier(int parties) {}
public CyclicBarrier(int parties, Runnable barrierAction) {}

说明:

  • parties - parties 数相当于一个阈值,当有 parties 数量的线程在等待时, CyclicBarrier 处于栅栏状态。
  • barrierAction - 当 CyclicBarrier 处于栅栏状态时执行的动作。

CyclicBarrier 的重要方法:

1
2
3
4
5
6
7
public int await() throws InterruptedException, BrokenBarrierException {}
public int await(long timeout, TimeUnit unit)
throws InterruptedException,
BrokenBarrierException,
TimeoutException {}
// 将屏障重置为初始状态
public void reset() {}

说明:

  • await() - 等待调用 await() 的线程数达到屏障数。如果当前线程是最后一个到达的线程,并且在构造函数中提供了非空屏障操作,则当前线程在允许其他线程继续之前运行该操作。如果在屏障动作期间发生异常,那么该异常将在当前线程中传播并且屏障被置于断开状态。
  • await(long timeout, TimeUnit unit) - 相比于 await() 方法,这个方法让这些线程等待至一定的时间,如果还有线程没有到达栅栏状态就直接让到达栅栏状态的线程执行后续任务。
  • reset() - 将屏障重置为初始状态。

【示例】CyclicBarrier 使用示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public class CyclicBarrierDemo {

final static int N = 4;

public static void main(String[] args) {
CyclicBarrier barrier = new CyclicBarrier(N,
new Runnable() {
@Override
public void run() {
System.out.println("当前线程" + Thread.currentThread().getName());
}
});

for (int i = 0; i < N; i++) {
MyThread myThread = new MyThread(barrier);
new Thread(myThread).start();
}
}

static class MyThread implements Runnable {

private CyclicBarrier cyclicBarrier;

MyThread(CyclicBarrier cyclicBarrier) {
this.cyclicBarrier = cyclicBarrier;
}

@Override
public void run() {
System.out.println("线程" + Thread.currentThread().getName() + "正在写入数据。..");
try {
Thread.sleep(3000); // 以睡眠来模拟写入数据操作
System.out.println("线程" + Thread.currentThread().getName() + "写入数据完毕,等待其他线程写入完毕");
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}

}

}

小结

  • CountDownLatchCyclicBarrier 都能够实现线程之间的等待,只不过它们侧重点不同:
    • CountDownLatch 一般用于某个线程 A 等待若干个其他线程执行完任务之后,它才执行;
    • CyclicBarrier 一般用于一组线程互相等待至某个状态,然后这一组线程再同时执行;
    • 另外,CountDownLatch 是不可以重用的,而 CyclicBarrier 是可以重用的。
  • Semaphore 其实和锁有点类似,它一般用于控制对某组资源的访问权限。

参考资料

Java 并发之线程池

线程池简介

线程池就是管理一系列线程的资源池,其提供了一种限制和管理线程资源的方式。每个线程池还维护一些基本统计信息,例如已完成任务的数量。

如果并发请求数量很多,但每个线程执行的时间很短,就会出现频繁的创建和销毁线程。如此一来,会大大降低系统的效率,可能频繁创建和销毁线程的时间、资源开销要大于实际工作的所需。

使用 线程池的好处 有以下几点:

  • 降低资源消耗 - 通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
  • 提高响应速度 - 当任务到达时,任务可以不需要等到线程创建就能立即执行。
  • 提高线程的可管理性 - 线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

Executor 框架

Executor 框架是一个根据一组执行策略调用,调度,执行和控制的异步任务的框架,目的是提供一种将”任务提交”与”任务如何运行”分离开来的机制。

通过 Executor 来启动线程比使用 Threadstart 方法更好,除了更易管理,效率更好(用线程池实现,节约开销)外,还有关键的一点:有助于避免 this 逃逸问题。

this 逃逸是指在构造函数返回之前其他线程就持有该对象的引用,调用尚未构造完全的对象的方法可能引发令人疑惑的错误。

核心 API 概述

Executor 框架核心 API 如下:

  • Executor - 运行任务的接口。
  • ExecutorService - 扩展了 Executor 接口。扩展能力:
    • 支持有返回值的线程;
    • 支持管理线程的生命周期。
  • ScheduledExecutorService - 扩展了 ExecutorService 接口,支持定时调度任务。
  • AbstractExecutorService - ExecutorService 接口的默认实现。
  • ThreadPoolExecutor - Executor 框架最核心的类,它继承了 AbstractExecutorService 类。
  • ScheduledThreadPoolExecutor - ScheduledExecutorService 接口的实现,一个可定时调度任务的线程池。
  • Executors - 可以通过调用 Executors 的静态工厂方法来创建线程池并返回一个 ExecutorService 对象。

img

Executor

Executor 接口中只定义了一个 execute 方法,用于接收一个 Runnable 对象。

1
2
3
public interface Executor {
void execute(Runnable command);
}

ExecutorService

ExecutorService 接口继承了 Executor 接口,它还提供了 invokeAllinvokeAnyshutdownsubmit 等方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public interface ExecutorService extends Executor {

void shutdown();

List<Runnable> shutdownNow();

boolean isShutdown();

boolean isTerminated();

boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;

<T> Future<T> submit(Callable<T> task);

<T> Future<T> submit(Runnable task, T result);

Future<?> submit(Runnable task);

<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;

<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;

<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;

<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}

从其支持的方法定义,不难看出:相比于 Executor 接口,ExecutorService 接口主要的扩展是:

  • 支持有返回值的线程 - sumbitinvokeAllinvokeAny 方法中都支持传入Callable 对象。
  • 支持管理线程生命周期 - shutdownshutdownNowisShutdown 等方法。

ScheduledExecutorService

ScheduledExecutorService 接口扩展了 ExecutorService 接口。

它除了支持前面两个接口的所有能力以外,还支持定时调度线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public interface ScheduledExecutorService extends ExecutorService {

public ScheduledFuture<?> schedule(Runnable command,
long delay, TimeUnit unit);

public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay, TimeUnit unit);

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit);

public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit);

}

其扩展的接口提供以下能力:

  • schedule 方法可以在指定的延时后执行一个 Runnable 或者 Callable 任务。
  • scheduleAtFixedRate 方法和 scheduleWithFixedDelay 方法可以按照指定时间间隔,定期执行任务。

ThreadPoolExecutor

java.uitl.concurrent.ThreadPoolExecutor 类是 Executor 框架中最核心的类。

构造方法

ThreadPoolExecutor 有四个构造方法,前三个都是基于第四个实现。第四个构造方法定义如下:

1
2
3
4
5
6
7
8
public ThreadPoolExecutor(int corePoolSize,// 线程池的核心线程数量
int maximumPoolSize,// 线程池的最大线程数
long keepAliveTime,// 当线程数大于核心线程数时,多余的空闲线程存活的最长时间
TimeUnit unit,// 时间单位
BlockingQueue<Runnable> workQueue,// 任务队列,用来储存等待执行任务的队列
ThreadFactory threadFactory,// 线程工厂,用来创建线程,一般默认即可
RejectedExecutionHandler handler// 拒绝策略,当提交的任务过多而不能及时处理时,我们可以定制策略来处理任务
) {// 略}

参数说明:

  • corePoolSize - 表示线程池保有的最小线程数
  • maximumPoolSize - 表示线程池允许创建的最大线程数
    • 如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。
    • 值得注意的是:如果使用了无界的任务队列这个参数就没什么效果。
  • keepAliveTime & unit - 表示线程保持活动的时间。如果一个线程空闲了keepAliveTime & unit 这么久,而且线程池的线程数大于 corePoolSize ,那么这个空闲的线程就要被回收了。
  • workQueue - 等待执行的任务队列。用于保存等待执行的任务的阻塞队列。 可以选择以下几个阻塞队列。
    • ArrayBlockingQueue - 有界阻塞队列
    • LinkedBlockingQueue - 无界阻塞队列
    • SynchronousQueue - 不会保存提交的任务,而是将直接新建一个线程来执行新来的任务
    • DelayedWorkQueue - 延迟阻塞队列。
    • PriorityBlockingQueue - 具有优先级的无界阻塞队列
  • threadFactory - 线程工厂。线程工程用于自定义如何创建线程。
  • handler - 拒绝策略。它是 RejectedExecutionHandler 类型的变量。当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。线程池支持以下策略:
    • AbortPolicy - 丢弃任务并抛出异常。这也是默认策略,会抛出 RejectedExecutionException
    • DiscardPolicy - 丢弃任务但不抛出异常
    • DiscardOldestPolicy - 丢弃队列最老的任务,其实就是把最早进入工作队列的任务丢弃,然后把新任务加入到工作队列。
    • CallerRunsPolicy - 提交任务的线程自己去执行该任务。
    • 如果以上策略都不能满足需要,也可以通过实现 RejectedExecutionHandler 接口来定制处理策略。如记录日志或持久化不能处理的任务。

重要字段

ThreadPoolExecutor 有以下重要字段:

1
2
3
4
5
6
7
8
9
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;

参数说明:

  • ctl - 用于控制线程池的运行状态和线程池中的有效线程数量。它包含两部分的信息:
    • 线程池的运行状态 (runState)
    • 线程池内有效线程的数量 (workerCount)
    • 可以看到,ctl 使用了 Integer 类型来保存,高 3 位保存 runState,低 29 位保存 workerCountCOUNT_BITS 就是 29,CAPACITY 就是 1 左移 29 位减 1(29 个 1),这个常量表示 workerCount 的上限值,大约是 5 亿。
  • 运行状态 - 线程池一共有五种运行状态:
    • RUNNING - 运行状态。接受新任务,并且也能处理阻塞队列中的任务。
    • SHUTDOWN - 关闭状态。不接受新任务,但可以处理阻塞队列中的任务。
      • 在线程池处于 RUNNING 状态时,调用 shutdown 方法会使线程池进入到该状态。
      • finalize 方法在执行过程中也会调用 shutdown 方法进入该状态。
    • STOP - 停止状态。不接受新任务,也不处理队列中的任务。会中断正在处理任务的线程。在线程池处于 RUNNINGSHUTDOWN 状态时,调用 shutdownNow 方法会使线程池进入到该状态。
    • TIDYING - 整理状态。如果所有的任务都已终止了,workerCount (有效线程数) 为 0,线程池进入该状态后会调用 terminated 方法进入 TERMINATED 状态。
    • TERMINATED - 已终止状态。在 terminated 方法执行完后进入该状态。默认 terminated 方法中什么也没有做。进入 TERMINATED 的条件如下:
      • 线程池不是 RUNNING 状态;
      • 线程池状态不是 TIDYING 状态或 TERMINATED 状态;
      • 如果线程池状态是 SHUTDOWN 并且 workerQueue 为空;
      • workerCount 为 0;
      • 设置 TIDYING 状态成功。

其他重要方法

ThreadPoolExecutor 类中还有一些重要的方法:

  • submit - 类似于 execute,但是针对的是有返回值的线程。submit 方法是在 ExecutorService 中声明的方法,在 AbstractExecutorService 就已经有了具体的实现。ThreadPoolExecutor 直接复用 AbstractExecutorServicesubmit 方法。
  • shutdown - 不会立即终止线程池,而是要等所有任务缓存队列中的任务都执行完后才终止,但再也不会接受新的任务。
    • 将线程池切换到 SHUTDOWN 状态;
    • 并调用 interruptIdleWorkers 方法请求中断所有空闲的 worker;
    • 最后调用 tryTerminate 尝试结束线程池。
  • shutdownNow - 立即终止线程池,并尝试打断正在执行的任务,并且清空任务缓存队列,返回尚未执行的任务。与 shutdown 方法类似,不同的地方在于:
    • 设置状态为 STOP
    • 中断所有工作线程,无论是否是空闲的;
    • 取出阻塞队列中没有被执行的任务并返回。
  • isShutdown - 调用了 shutdownshutdownNow 方法后,isShutdown 方法就会返回 true。
  • isTerminaed - 当所有的任务都已关闭后,才表示线程池关闭成功,这时调用 isTerminaed 方法会返回 true。
  • setCorePoolSize - 设置核心线程数大小。
  • setMaximumPoolSize - 设置最大线程数大小。
  • getTaskCount - 线程池已经执行的和未执行的任务总数;
  • getCompletedTaskCount - 线程池已完成的任务数量,该值小于等于 taskCount
  • getLargestPoolSize - 线程池曾经创建过的最大线程数量。通过这个数据可以知道线程池是否满过,也就是达到了 maximumPoolSize
  • getPoolSize - 线程池当前的线程数量;
  • getActiveCount - 当前线程池中正在执行任务的线程数量。

使用示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class ThreadPoolExecutorDemo {

public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10, 500, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());

for (int i = 0; i < 100; i++) {
threadPoolExecutor.execute(new MyThread());
String info = String.format("线程池中线程数目:%s,队列中等待执行的任务数目:%s,已执行玩别的任务数目:%s",
threadPoolExecutor.getPoolSize(),
threadPoolExecutor.getQueue().size(),
threadPoolExecutor.getCompletedTaskCount());
System.out.println(info);
}
threadPoolExecutor.shutdown();
}

static class MyThread implements Runnable {

@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " 执行");
}

}

}

线程池原理

默认情况下,创建线程池之后,线程池中是没有线程的,需要提交任务之后才会创建线程。提交任务可以使用 execute 方法,它是 ThreadPoolExecutor 的核心方法,通过这个方法可以向线程池提交一个任务,交由线程池去执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// 用于控制线程池的运行状态和线程池中的有效线程数量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();

// 获取 ctl 中存储的线程池状态信息
int c = ctl.get();

// 线程池执行可以分为 3 个步骤
// 1. 若工作线程数小于核心线程数,则尝试启动一个新的线程来执行任务
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}

// 2. 如果任务可以成功地加入队列,还需要再次确认是否需要添加新的线程(因为可能自从上次检查以来已经有线程死亡)或者检查线程池是否已经关闭
// -> 如果是后者,则可能需要回滚入队操作;
// -> 如果是前者,则可能需要启动新的线程
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (!isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 如果任务无法加入队列,则尝试添加一个新的线程
// 如果添加新线程失败,说明线程池已经关闭或者达到了容量上限,此时将拒绝该任务
else if (!addWorker(command, false))
reject(command);
}

execute 方法工作流程如下:

  1. 如果 workerCount < corePoolSize,则创建并启动一个线程来执行新提交的任务;
  2. 如果 workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中;
  3. 如果 workerCount >= corePoolSize && workerCount < maximumPoolSize,且线程池内的阻塞队列已满,则创建并启动一个线程来执行新提交的任务;
  4. 如果workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满,则根据拒绝策略来处理该任务,默认的处理方式是直接抛异常。

execute 方法中,多次调用 addWorker 方法。addWorker 这个方法主要用来创建新的工作线程,如果返回 true 说明创建和启动工作线程成功,否则的话返回的就是 false。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
// 全局锁,并发操作必备
private final ReentrantLock mainLock = new ReentrantLock();
// 跟踪线程池的最大大小,只有在持有全局锁 mainLock 的前提下才能访问此集合
private int largestPoolSize;
// 工作线程集合,存放线程池中所有的(活跃的)工作线程,只有在持有全局锁 mainLock 的前提下才能访问此集合
private final HashSet<Worker> workers = new HashSet<>();
//获取线程池状态
private static int runStateOf(int c) { return c & ~CAPACITY; }
//判断线程池的状态是否为 Running
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}

/**
* 添加新的工作线程到线程池
* @param firstTask 要执行
* @param core 参数为 true 的话表示使用线程池的基本大小,为 false 使用线程池最大大小
* @return 添加成功就返回 true 否则返回 false
*/
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
//这两句用来获取线程池的状态
int c = ctl.get();
int rs = runStateOf(c);

// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;

for (;;) {
//获取线程池中工作的线程的数量
int wc = workerCountOf(c);
// core 参数为 false 的话表明队列也满了,线程池大小变为 maximumPoolSize
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//原子操作将 workcount 的数量加 1
if (compareAndIncrementWorkerCount(c))
break retry;
// 如果线程的状态改变了就再次执行上述操作
c = ctl.get();
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
// 标记工作线程是否启动成功
boolean workerStarted = false;
// 标记工作线程是否创建成功
boolean workerAdded = false;
Worker w = null;
try {

w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
// 加锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//获取线程池状态
int rs = runStateOf(ctl.get());
//rs < SHUTDOWN 如果线程池状态依然为 RUNNING, 并且线程的状态是存活的话,就会将工作线程添加到工作线程集合中
//(rs=SHUTDOWN && firstTask == null) 如果线程池状态小于 STOP,也就是 RUNNING 或者 SHUTDOWN 状态下,同时传入的任务实例 firstTask 为 null,则需要添加到工作线程集合和启动新的 Worker
// firstTask == null 证明只新建线程而不执行任务
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
//更新当前工作线程的最大容量
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
// 工作线程是否启动成功
workerAdded = true;
}
} finally {
// 释放锁
mainLock.unlock();
}
//// 如果成功添加工作线程,则调用 Worker 内部的线程实例 t 的 Thread#start() 方法启动真实的线程实例
if (workerAdded) {
t.start();
/// 标记线程启动成功
workerStarted = true;
}
}
} finally {
// 线程启动失败,需要从工作线程中移除对应的 Worker
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}

Executors

Executors 类中提供了几种内置的 ThreadPoolExecutor 实现:

  • FixedThreadPool:固定线程数量的线程池。该线程池中的线程数量始终不变。当有一个新的任务提交时,线程池中若有空闲线程,则立即执行。若没有,则新的任务会被暂存在一个任务队列中,待有线程空闲时,便处理在任务队列中的任务。
  • SingleThreadExecutor: 只有一个线程的线程池。若多余一个任务被提交到该线程池,任务会被保存在一个任务队列中,待线程空闲,按先入先出的顺序执行队列中的任务。
  • CachedThreadPool: 可根据实际情况调整线程数量的线程池。线程池的线程数量不确定,但若有空闲线程可以复用,则会优先使用可复用的线程。若所有线程均在工作,又有新的任务提交,则会创建新的线程处理任务。所有线程在当前任务执行完毕后,将返回线程池进行复用。
  • ScheduledThreadPool:给定的延迟后运行任务或者定期执行任务的线程池。

注意:

《阿里巴巴 Java 开发手册》中明确要求不要使用 Executors 中的内置化线程池。

【强制】线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。

说明:Executors 返回的线程池对象的弊端如下:

  1. FixedThreadPoolSingleThreadPool: 允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM。
  2. CachedThreadPool:允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM。
  3. ScheduledThreadPool: 允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM。

FixedThreadPool

FixedThreadPool 是一个可重用的、线程数固定的线程池Executors 类中的相关源码:

1
2
3
4
5
6
7
8
9
10
11
12
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}

FixedThreadPoolcorePoolSizemaximumPoolSize 都被设置为 nThreads,这个 nThreads 参数是我们使用的时候自己传递的。

即使 maximumPoolSize 的值比 corePoolSize 大,也至多只会创建 corePoolSize 个线程。这是因为FixedThreadPool 使用的是容量为 Integer.MAX_VALUELinkedBlockingQueue(无界队列),队列永远不会被放满。

FixedThreadPool 的问题:

FixedThreadPool 使用无界队列 LinkedBlockingQueue(队列的容量为 Integer.MAX_VALUE)作为线程池的工作队列会对线程池带来如下影响:

  1. 当线程池中的线程数达到 corePoolSize 后,新任务将在无界队列中等待,因此线程池中的线程数不会超过 corePoolSize
  2. 由于使用无界队列时 maximumPoolSize 将是一个无效参数,因为不可能存在任务队列满的情况。所以,通过创建 FixedThreadPool的源码可以看出创建的 FixedThreadPoolcorePoolSizemaximumPoolSize 被设置为同一个值。
  3. 由于 1 和 2,使用无界队列时 keepAliveTime 将是一个无效参数;
  4. 运行中的 FixedThreadPool(未执行 shutdown()shutdownNow())不会拒绝任务,在任务比较多的时候会导致 OOM(内存溢出)。

SingleThreadExecutor

SingleThreadExecutor 是只有一个线程的线程池。SingleThreadExecutor 只会创建唯一的工作线程来执行任务,保证所有任务按照指定顺序 (FIFO, LIFO, 优先级)执行。 如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它

Executors 类中的相关源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}

public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}

SingleThreadExecutor 的问题:

SingleThreadExecutorFixedThreadPool 一样,使用的都是容量为 Integer.MAX_VALUELinkedBlockingQueue(无界队列)作为线程池的工作队列。SingleThreadExecutor 使用无界队列作为线程池的工作队列会对线程池带来的影响与 FixedThreadPool 相同。说简单点,就是可能会导致 OOM。

CachedThreadPool

CachedThreadPool 是一个会根据需要创建新线程的线程池。

  • 如果线程池大小超过处理任务所需要的线程数,就会回收部分空闲的线程;
  • 如果长时间没有往线程池中提交任务,即如果工作线程空闲了指定的时间(默认为 1 分钟),则该工作线程将自动终止。终止后,如果你又提交了新的任务,则线程池重新创建一个工作线程。
  • 此线程池不会对线程池大小做限制,线程池大小完全依赖于操作系统(或者说 JVM)能够创建的最大线程大小。 因此,使用 CachedThreadPool 时,一定要注意控制任务的数量,否则,由于大量线程同时运行,很有会造成系统瘫痪。
1
2
3
4
5
6
7
8
9
10
11
12
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}

CachedThreadPoolcorePoolSize 被设置为空(0),maximumPoolSize被设置为 Integer.MAX.VALUE,即它是无界的,这也就意味着如果主线程提交任务的速度高于 maximumPool 中线程处理任务的速度时,CachedThreadPool 会不断创建新的线程。极端情况下,这样会导致耗尽 cpu 和内存资源。

CachedThreadPool 的执行流程:

  1. 首先执行 SynchronousQueue.offer(Runnable task) 提交任务到任务队列。如果当前 maximumPool 中有闲线程正在执行 SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS),那么主线程执行 offer 操作与空闲线程执行的 poll 操作配对成功,主线程把任务交给空闲线程执行,execute()方法执行完成,否则执行下面的步骤 2;
  2. 当初始 maximumPool 为空,或者 maximumPool 中没有空闲线程时,将没有线程执行 SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS)。这种情况下,步骤 1 将失败,此时 CachedThreadPool 会创建新线程执行任务,execute 方法执行完成;

CachedThreadPool 的问题:

CachedThreadPool 使用的是同步队列 SynchronousQueue, 允许创建的线程数量为 Integer.MAX_VALUE ,可能会创建大量线程,从而导致 OOM。

ScheduleThreadPool

ScheduledThreadPool 用来在给定的延迟后运行任务或者定期执行任务。这个在实际项目中基本不会被用到,也不推荐使用。

1
2
3
4
5
6
7
8
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}

public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}

ScheduledThreadPool 是通过 ScheduledThreadPoolExecutor 创建的,使用的DelayedWorkQueue(延迟阻塞队列)作为线程池的任务队列。

DelayedWorkQueue 的内部元素并不是按照放入的时间排序,而是会按照延迟的时间长短对任务进行排序,内部采用的是“堆”的数据结构,可以保证每次出队的任务都是当前队列中执行时间最靠前的。DelayedWorkQueue 添加元素满了之后会自动扩容原来容量的 1/2,即永远不会阻塞,最大扩容可达 Integer.MAX_VALUE,所以最多只能创建核心线程数的线程。

ScheduledThreadPoolExecutor 继承了 ThreadPoolExecutor,所以创建 ScheduledThreadExecutor 本质也是创建一个 ThreadPoolExecutor 线程池,只是传入的参数不相同。

ScheduledThreadPoolExecutor 和 Timer 对比

  • Timer 对系统时钟的变化敏感,ScheduledThreadPoolExecutor不是;
  • Timer 只有一个执行线程,因此长时间运行的任务可以延迟其他任务。 ScheduledThreadPoolExecutor 可以配置任意数量的线程。 此外,如果你想(通过提供 ThreadFactory),你可以完全控制创建的线程;
  • TimerTask 中抛出的运行时异常会杀死一个线程,从而导致 Timer 死机即计划任务将不再运行。ScheduledThreadExecutor 不仅捕获运行时异常,还允许您在需要时处理它们(通过重写 afterExecute 方法ThreadPoolExecutor)。抛出异常的任务将被取消,但其他任务将继续运行。

WorkStealingPool

WorkStealingPool 是 JDK8 才引入的。

其内部会构建 ForkJoinPool,利用 Work-Stealing 算法,并行地处理任务,不保证处理顺序。

线程池最佳实践

计算线程数量

一般多线程执行的任务类型可以分为 CPU 密集型和 I/O 密集型,根据不同的任务类型,我们计算线程数的方法也不一样。

CPU 密集型任务:这种任务消耗的主要是 CPU 资源,可以将线程数设置为 N(CPU 核心数)+1,比 CPU 核心数多出来的一个线程是为了防止线程偶发的缺页中断,或者其它原因导致的任务暂停而带来的影响。一旦任务暂停,CPU 就会处于空闲状态,而在这种情况下多出来的一个线程就可以充分利用 CPU 的空闲时间。

I/O 密集型任务:这种任务应用起来,系统会用大部分的时间来处理 I/O 交互,而线程在处理 I/O 的时间段内不会占用 CPU 来处理,这时就可以将 CPU 交出给其它线程使用。因此在 I/O 密集型任务的应用中,我们可以多配置一些线程,具体的计算方法是 2N。

建议使用有界阻塞队列

不建议使用 Executors 的最重要的原因是:Executors 提供的很多方法默认使用的都是无界的 LinkedBlockingQueue,高负载情境下,无界队列很容易导致 OOM,而 OOM 会导致所有请求都无法处理,这是致命问题。所以强烈建议使用有界队列

《阿里巴巴 Java 开发手册》中提到,禁止使用这些方法来创建线程池,而应该手动 new ThreadPoolExecutor 来创建线程池。制订这条规则是因为容易导致生产事故,最典型的就是 newFixedThreadPoolnewCachedThreadPool,可能因为资源耗尽导致 OOM 问题。

【示例】newFixedThreadPool OOM

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
ThreadPoolExecutor threadPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
printStats(threadPool);
for (int i = 0; i < 100000000; i++) {
threadPool.execute(() -> {
String payload = IntStream.rangeClosed(1, 1000000)
.mapToObj(__ -> "a")
.collect(Collectors.joining("")) + UUID.randomUUID().toString();
try {
TimeUnit.HOURS.sleep(1);
} catch (InterruptedException e) {
}
log.info(payload);
});
}

threadPool.shutdown();
threadPool.awaitTermination(1, TimeUnit.HOURS);

newFixedThreadPool 使用的工作队列是 LinkedBlockingQueue ,而默认构造方法的 LinkedBlockingQueue 是一个 Integer.MAX_VALUE 长度的队列,可以认为是无界的。如果任务较多并且执行较慢的话,队列可能会快速积压,撑爆内存导致 OOM。

【示例】newCachedThreadPool OOM

1
2
3
4
5
6
7
8
9
10
11
12
13
14
ThreadPoolExecutor threadPool = (ThreadPoolExecutor) Executors.newCachedThreadPool();
printStats(threadPool);
for (int i = 0; i < 100000000; i++) {
threadPool.execute(() -> {
String payload = UUID.randomUUID().toString();
try {
TimeUnit.HOURS.sleep(1);
} catch (InterruptedException e) {
}
log.info(payload);
});
}
threadPool.shutdown();
threadPool.awaitTermination(1, TimeUnit.HOURS);

newCachedThreadPool 的最大线程数是 Integer.MAX_VALUE,可以认为是没有上限的,而其工作队列 SynchronousQueue 是一个没有存储空间的阻塞队列。这意味着,只要有请求到来,就必须找到一条工作线程来处理,如果当前没有空闲的线程就再创建一条新的。

如果大量的任务进来后会创建大量的线程。我们知道线程是需要分配一定的内存空间作为线程栈的,比如 1MB,因此无限制创建线程必然会导致 OOM。

监测线程池运行状态

可以通过一些手段来检测线程池的运行状态比如 SpringBoot 中的 Actuator 组件。

除此之外,我们还可以利用 ThreadPoolExecutor 的相关 API 做一个简陋的监控。从下图可以看出, ThreadPoolExecutor提供了获取线程池当前的线程数和活跃线程数、已经执行完成的任务数、正在排队中的任务数等等。

下面是一个简单的 Demo。printThreadPoolStatus()会每隔一秒打印出线程池的线程数、活跃线程数、完成的任务数、以及队列中的任务数。

1
2
3
4
5
6
7
8
9
10
11
public static void printThreadPoolStatus(ThreadPoolExecutor threadPool) {
ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1, createThreadFactory("print-images/thread-pool-status", false));
scheduledExecutorService.scheduleAtFixedRate(() -> {
log.info("=========================");
log.info("ThreadPool Size: [{}]", threadPool.getPoolSize());
log.info("Active Threads: {}", threadPool.getActiveCount());
log.info("Number of Tasks : {}", threadPool.getCompletedTaskCount());
log.info("Number of Tasks in Queue: {}", threadPool.getQueue().size());
log.info("=========================");
}, 0, 1, TimeUnit.SECONDS);
}

线程池和 ThreadLocal

线程池和 ThreadLocal共用,可能会导致线程从ThreadLocal获取到的是旧值/脏数据。这是因为线程池会复用线程对象,与线程对象绑定的类的静态属性 ThreadLocal 变量也会被重用,这就导致一个线程可能获取到其他线程的ThreadLocal 值。

不要以为代码中没有显示使用线程池就不存在线程池了,像常用的 Web 服务器 Tomcat 处理任务为了提高并发量,就使用到了线程池,并且使用的是基于原生 Java 线程池改进完善得到的自定义线程池。

当然了,你可以将 Tomcat 设置为单线程处理任务。不过,这并不合适,会严重影响其处理任务的速度。

1
server.tomcat.max-threads=1

解决上述问题比较建议的办法是使用阿里巴巴开源的 TransmittableThreadLocal(TTL)。TransmittableThreadLocal类继承并加强了 JDK 内置的InheritableThreadLocal类,在使用线程池等会池化复用线程的执行组件情况下,提供ThreadLocal值的传递功能,解决异步执行时上下文传递的问题。

TransmittableThreadLocal 项目地址:https://github.com/alibaba/transmittable-thread-localopen in new window

重要任务应该自定义拒绝策略

使用有界队列,当任务过多时,线程池会触发执行拒绝策略,线程池默认的拒绝策略会 throw RejectedExecutionException 这是个运行时异常,对于运行时异常编译器并不强制 catch 它,所以开发人员很容易忽略。因此默认拒绝策略要慎重使用。如果线程池处理的任务非常重要,建议自定义自己的拒绝策略;并且在实际工作中,自定义的拒绝策略往往和降级策略配合使用。

动态线程池

美团技术团队在 《Java 线程池实现原理及其在美团业务中的实践》 这篇文章中介绍到对线程池参数实现可自定义配置的思路和方法。

美团技术团队的思路是主要对线程池的核心参数实现自定义可配置。这三个核心参数是:

  • corePoolSize - 核心线程数线程数定义了最小可以同时运行的线程数量。
  • maximumPoolSize - 当队列中存放的任务达到队列容量的时候,当前可以同时运行的线程数量变为最大线程数。
  • workQueue - 当新任务来的时候会先判断当前运行的线程数量是否达到核心线程数,如果达到的话,新任务就会被存放在队列中。

JDK 原生线程池 ThreadPoolExecutor 提供了如下几个 public 的 setter 方法,如下图所示:

图 19 JDK 线程池参数设置接口

JDK 允许线程池使用方通过 ThreadPoolExecutor 的实例来动态设置线程池的核心策略。

重点是基于这几个 public 方法,我们只需要维护 ThreadPoolExecutor 的实例,并且在需要修改的时候拿到实例修改其参数即可。基于以上的思路,美团实现了线程池参数的动态化、线程池参数在管理平台可配置可修改,其效果图如下图所示:

图 21 可动态修改线程池参数

如果我们的项目也想要实现这种效果的话,可以借助现成的开源项目:

  • Hippo4jopen - 异步线程池框架,支持线程池动态变更&监控&报警,无需修改代码轻松引入。支持多种使用模式,轻松引入,致力于提高系统运行保障能力。
  • Dynamic TPopen - 轻量级动态线程池,内置监控告警功能,集成三方中间件线程池管理,基于主流配置中心(已支持 Nacos、Apollo,Zookeeper、Consul、Etcd,可通过 SPI 自定义实现)。

参考资料

Java 并发之线程

线程简介

  • 进程(Process) - 进程是具有一定独立功能的程序关于某个数据集合上的一次运行活动。进程是操作系统进行资源分配的基本单位。进程可视为一个正在运行的程序
  • 线程(Thread) - 线程是操作系统进行调度的基本单位
  • 管程(Monitor) - 管程是指管理共享变量以及对共享变量的操作过程,让他们支持并发
    • Java 通过 synchronized 关键字及 wait()、notify()、notifyAll() 这三个方法来实现管程技术。
    • 管程和信号量是等价的,所谓等价指的是用管程能够实现信号量,也能用信号量实现管程
  • 协程(Coroutine) - 协程可以理解为一种轻量级的线程
    • 从操作系统的角度来看,线程是在内核态中调度的,而协程是在用户态调度的,所以相对于线程来说,协程切换的成本更低。
    • 协程虽然也有自己的栈,但是相比线程栈要小得多,典型的线程栈大小差不多有 1M,而协程栈的大小往往只有几 K 或者几十 K。所以,无论是从时间维度还是空间维度来看,协程都比线程轻量得多。
    • Go、Python、Lua、Kotlin 等语言都支持协程;Java OpenSDK 中的 Loom 项目目标就是支持协程。

进程和线程的差异:

  • 一个程序至少有一个进程,一个进程至少有一个线程。
  • 线程比进程划分更细,所以执行开销更小,并发性更高
  • 进程是一个实体,拥有独立的资源;而同一个进程中的多个线程共享进程的资源。

img

JVM 在单个进程中运行,JVM 中的线程共享属于该进程的堆。这就是为什么几个线程可以访问同一个对象。线程共享堆并拥有自己的堆栈空间。这是一个线程如何调用一个方法以及它的局部变量是如何保持线程安全的。但是堆不是线程安全的并且为了线程安全必须进行同步。

线程创建

一般来说,创建线程有很多种方式,例如:

  • 实现 Runnable 接口
  • 实现 Callable 接口
  • 继承 Thread
  • 通过线程池创建线程
  • 使用 CompletableFuture 创建线程

下面是几种创建线程的示例:

::: tabs#创建线程

@tab Thread

Thread

【示例】继承 Thread 类创建线程

  1. 定义 Thread 类的子类,并覆写该类的 run 方法。run 方法的方法体就代表了线程要完成的任务,因此把 run 方法称为执行体。
  2. 创建 Thread 子类的实例,即创建了线程对象。
  3. 调用线程对象的 start 方法来启动该线程。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class ThreadDemo {

public static void main(String[] args) {
// 实例化对象
MyThread tA = new MyThread("Thread 线程-A");
MyThread tB = new MyThread("Thread 线程-B");
// 调用线程主体
tA.start();
tB.start();
}

static class MyThread extends Thread {

private int ticket = 5;

MyThread(String name) {
super(name);
}

@Override
public void run() {
while (ticket > 0) {
System.out.println(Thread.currentThread().getName() + " 卖出了第 " + ticket + " 张票");
ticket--;
}
}

}

}

@tab Runnable

Runnable

实现 Runnable 接口优于继承 Thread,因为:

  • Java 不支持多重继承,所有的类都只允许继承一个父类,但可以实现多个接口。如果继承了 Thread 类就无法继承其它类,这不利于扩展。
  • 类可能只要求可执行就行,继承整个 Thread 类开销过大。

【示例】实现 Runnable 接口创建线程

  1. 定义 Runnable 接口的实现类,并覆写该接口的 run 方法。该 run 方法的方法体同样是该线程的线程执行体。
  2. 创建 Runnable 实现类的实例,并以此实例作为 Thread 的 target 来创建 Thread 对象,该 Thread 对象才是真正的线程对象。
  3. 调用线程对象的 start 方法来启动该线程。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class RunnableDemo {

public static void main(String[] args) {
// 实例化对象
Thread tA = new Thread(new MyThread(), "Runnable 线程-A");
Thread tB = new Thread(new MyThread(), "Runnable 线程-B");
// 调用线程主体
tA.start();
tB.start();
}

static class MyThread implements Runnable {

private int ticket = 5;

@Override
public void run() {
while (ticket > 0) {
System.out.println(Thread.currentThread().getName() + " 卖出了第 " + ticket + " 张票");
ticket--;
}
}

}

}

@tab Callable

Callable、Future、FutureTask

继承 Thread 类和实现 Runnable 接口这两种创建线程的方式都没有返回值。所以,线程执行完后,无法得到执行结果。但如果期望得到执行结果该怎么做?

为了解决这个问题,Java 1.5 后,提供了 Callable 接口和 Future 接口,通过它们,可以在线程执行结束后,返回执行结果。

Callable

Callable 接口只声明了一个 call 方法:

1
2
3
4
5
6
7
8
9
public interface Callable<V> {
/**
* Computes a result, or throws an exception if unable to do so.
*
* @return computed result
* @throws Exception if unable to compute a result
*/
V call() throws Exception;
}

那么怎么使用 Callable 呢?一般情况下是配合 ExecutorService 来使用的,在 ExecutorService 接口中声明了若干个 submit 方法的重载版本:

1
2
3
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);

第一个 submit 方法里面的参数类型就是 Callable

Future

Future 就是对于具体的 Callable 任务的执行结果进行取消、查询是否完成、获取结果。必要时可以通过 get 方法获取执行结果,该方法会阻塞直到任务返回结果。

1
2
3
4
5
6
7
8
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}

FutureTask

FutureTask 类实现了 RunnableFuture 接口,RunnableFuture 继承了 Runnable 接口和 Future 接口。

所以,FutureTask 既可以作为 Runnable 被线程执行,又可以作为 Future 得到 Callable 的返回值。

1
2
3
4
5
6
7
8
9
public class FutureTask<V> implements RunnableFuture<V> {
// ...
public FutureTask(Callable<V> callable) {}
public FutureTask(Runnable runnable, V result) {}
}

public interface RunnableFuture<V> extends Runnable, Future<V> {
void run();
}

事实上,FutureTaskFuture 接口的一个唯一实现类。

Callable + Future + FutureTask 示例

通过实现 Callable 接口创建线程的步骤:

  1. 创建 Callable 接口的实现类,并实现 call 方法。该 call 方法将作为线程执行体,并且有返回值。
  2. 创建 Callable 实现类的实例,使用 FutureTask 类来包装 Callable 对象,该 FutureTask 对象封装了该 Callable 对象的 call 方法的返回值。
  3. 使用 FutureTask 对象作为 Thread 对象的 target 创建并启动新线程。
  4. 调用 FutureTask 对象的 get 方法来获得线程执行结束后的返回值。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class CallableDemo {

public static void main(String[] args) {
Callable<Long> callable = new MyThread();
FutureTask<Long> future = new FutureTask<>(callable);
new Thread(future, "Callable 线程").start();
try {
System.out.println("任务耗时:" + (future.get() / 1000000) + "毫秒");
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}

static class MyThread implements Callable<Long> {

private int ticket = 10000;

@Override
public Long call() {
long begin = System.nanoTime();
while (ticket > 0) {
System.out.println(Thread.currentThread().getName() + " 卖出了第 " + ticket + " 张票");
ticket--;
}

long end = System.nanoTime();
return (end - begin);
}

}

}

:::

虽然,看似有多种多样的创建线程方式。但是,从本质上来说,Java 就只有一种方式可以创建线程,那就是通过 new Thread().start() 创建。不管是哪种方式,最终还是依赖于 new Thread().start()

👉 扩展阅读:大家都说 Java 有三种创建线程的方式!并发编程中的惊天骗局!

线程终止

如何正确停止线程

通常情况下,我们不会手动停止一个线程,而是允许线程运行到结束,然后让它自然停止。但是依然会有许多特殊的情况需要我们提前停止线程,比如:用户突然关闭程序,或程序运行出错重启等。

对于 Java 而言,最正确的停止线程的方式是:通过 Thread.interruptThread.isInterrupted 配合来控制线程终止。但 Thread.interrupt 仅仅起到通知被停止线程的作用。而对于被停止的线程而言,它拥有完全的自主权,它既可以选择立即停止,也可以选择一段时间后停止,也可以选择压根不停止。

事实上,Java 希望程序间能够相互通知、相互协作地管理线程,因为如果不了解对方正在做的工作,贸然强制停止线程就可能会造成一些安全的问题,为了避免造成问题就需要给对方一定的时间来整理收尾工作。比如:线程正在写入一个文件,这时收到终止信号,它就需要根据自身业务判断,是选择立即停止,还是将整个文件写入成功后停止,而如果选择立即停止就可能造成数据不完整,不管是中断命令发起者,还是接收者都不希望数据出现问题。

一旦调用某个线程的 Thread.interrupt 之后,这个线程的中断标记位就会被设置成 true。每个线程都有这样的标记位,当线程执行时,应该定期检查这个标记位,如果标记位被设置成 true,就说明有程序想终止该线程。回到源码,可以看到在 while 循环体判断语句中,首先通过 Thread.currentThread().isInterrupt() 判断线程是否被中断,随后检查是否还有工作要做。&& 逻辑表示只有当两个判断条件同时满足的情况下,才会去执行下面的工作。

需要留意一个特殊场景:**Thread.sleep 后,线程依然可以感知 Thread.interrupt**。

【示例】正确停止线程的方式——Thread.interrupt

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class ThreadStopDemo {

public static void main(String[] args) throws Exception {
Thread thread = new Thread(new MyTask(), "MyTask");
thread.start();
TimeUnit.MILLISECONDS.sleep(10);
thread.interrupt();
}

private static class MyTask implements Runnable {

private long count = 0L;

@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " 线程启动");
// 通过 Thread.interrupted 和 interrupt 配合来控制线程终止
while (!Thread.currentThread().isInterrupted() && count < 10000) {
System.out.println("count = " + count++);
}
System.out.println(Thread.currentThread().getName() + " 线程终止");
}

}

}
// 输出(count 未到 10000,线程就主动结束):
// MyTask 线程启动
// count = 0
// count = 1
// ...
// count = 840
// count = 841
// count = 842
// MyTask 线程终止

可以使用 Thread.stopThread.suspendThread.resume 停止线程吗?

Thread.stopThread.suspendThread.resume 方法已经被 Java 标记为 @Deprecated。为什么废弃呢?

  • Thread.stop 会直接把线程停止,这样就没有给线程足够的时间来处理想要在停止前保存数据的逻辑,任务戛然而止,会导致出现数据完整性等问题
  • 而对于Thread.suspendThread.resume 而言,它们的问题在于:如果线程调用 Thread.suspend,它并不会释放锁,就开始进入休眠,但此时有可能仍持有锁,这样就容易导致死锁问题。因为这把锁在线程被 Thread.resume 之前,是不会被释放的。假设线程 A 调用了 Thread.suspend 方法让线程 B 挂起,线程 B 进入休眠,而线程 B 又刚好持有一把锁,此时假设线程 A 想访问线程 B 持有的锁,但由于线程 B 并没有释放锁就进入休眠了,所以对于线程 A 而言,此时拿不到锁,也会陷入阻塞,那么线程 A 和线程 B 就都无法继续向下执行。

【示例】Thread.stop 终止线程,导致线程任务戛然而止

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public class ThreadStopErrorDemo {

public static void main(String[] args) {
MyTask thread = new MyTask();
thread.start();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 终止线程
thread.stop();
// 确保线程终止后,才执行下面的代码
while (thread.isAlive()) { }
// 输出两个计数器的最终状态
thread.print();
}

/**
* 持有两个计数器,run 方法中每次执行都会使计数器自增
*/
private static class MyTask extends Thread {

private int i = 0;

private int j = 0;

@Override
public void run() {
synchronized (this) {
++i;
try {
// 模拟耗时操作
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
++j;
}
}

public void print() {
System.out.println("i=" + i + " j=" + j);
}

}

}

使用 volatile 标记方式停止线程正确吗?

使用 volatile 标记方式停止线程并不总是正确的。虽然 volatile 变量可以确保可见性,即当一个线程修改了 volatile 变量的值,其他线程能够立即看到最新的值,但它并不能保证原子性,也就是说并不能保证多个线程对 volatile 变量的操作是互斥的。

当我们使用 volatile 变量来控制线程的停止,通常是通过设置一个 volatile 标志位来告诉线程停止执行。例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
public class MyTask extends Thread {
private volatile boolean canceled = false;

public void run() {
while (!canceled) {
// 执行任务
}
}

public void stopTask() {
canceled = true;
}
}

在上述例子中,canceled 是一个 volatile 变量,用来控制线程的停止。虽然这种方式在某些情况下可以工作,但它并不是一个可靠的停止线程的方式,因为在多线程环境中,其他线程修改 canceled 的值时,可能会出现竞态条件,导致线程无法正确停止

线程基本方法

线程(Thread)基本方法清单:

方法 描述
run 线程的执行实体。
start 线程的启动方法。
currentThread 返回对当前正在执行的线程对象的引用。
setName 设置线程名称。
getName 获取线程名称。
setPriority 设置线程优先级。Java 中的线程优先级的范围是 [1,10],一般来说,高优先级的线程在运行时会具有优先权。可以通过 thread.setPriority(Thread.MAX_PRIORITY) 的方式设置,默认优先级为 5。
getPriority 获取线程优先级。
setDaemon 设置线程为守护线程。
isDaemon 判断线程是否为守护线程。
isAlive 判断线程是否启动。
interrupt 中断另一个线程的运行状态。
interrupted 测试当前线程是否已被中断。通过此方法可以清除线程的中断状态。换句话说,如果要连续调用此方法两次,则第二次调用将返回 false(除非当前线程在第一次调用清除其中断状态之后且在第二次调用检查其状态之前再次中断)。
join 可以使一个线程强制运行,线程强制运行期间,其他线程无法运行,必须等待此线程完成之后才可以继续执行。
Thread.sleep 静态方法。将当前正在执行的线程休眠。
Thread.yield 静态方法。将当前正在执行的线程暂停,让其他线程执行。

线程休眠

使用 Thread.sleep 方法可以使得当前正在执行的线程进入休眠状态。

使用 Thread.sleep 需要向其传入一个整数值,这个值表示线程将要休眠的毫秒数。

Thread.sleep 方法可能会抛出 InterruptedException,因为异常不能跨线程传播回 main 中,因此必须在本地进行处理。线程中抛出的其它异常也同样需要在本地进行处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class ThreadSleepDemo {

public static void main(String[] args) {
new Thread(new MyThread("线程 A", 500)).start();
new Thread(new MyThread("线程 B", 1000)).start();
new Thread(new MyThread("线程 C", 1500)).start();
}

static class MyThread implements Runnable {

/** 线程名称 */
private String name;

/** 休眠时间 */
private int time;

private MyThread(String name, int time) {
this.name = name;
this.time = time;
}

@Override
public void run() {
try {
// 休眠指定的时间
Thread.sleep(this.time);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(this.name + "休眠" + this.time + "毫秒。");
}

}

}

线程礼让

Thread.yield 方法的调用声明了当前线程已经完成了生命周期中最重要的部分,可以切换给其它线程来执行 。该方法只是对线程调度器的一个建议,而且也只是建议具有相同优先级的其它线程可以运行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class ThreadYieldDemo {

public static void main(String[] args) {
MyThread t = new MyThread();
new Thread(t, "线程 A").start();
new Thread(t, "线程 B").start();
}

static class MyThread implements Runnable {

@Override
public void run() {
for (int i = 0; i < 5; i++) {
try {
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "运行,i = " + i);
if (i == 2) {
System.out.print("线程礼让:");
Thread.yield();
}
}
}
}
}

守护线程

什么是守护线程?

  • 守护线程(Daemon Thread)是在后台执行并且不会阻止 JVM 终止的线程当所有非守护线程结束时,程序也就终止,同时会杀死所有守护线程
  • 与守护线程(Daemon Thread)相反的,叫用户线程(User Thread),也就是非守护线程。

为什么需要守护线程?

  • 守护线程的优先级比较低,用于为系统中的其它对象和线程提供服务。典型的应用就是垃圾回收器。

如何使用守护线程?

  • 可以使用 isDaemon 方法判断线程是否为守护线程。
  • 可以使用 setDaemon 方法设置线程为守护线程。
    • 正在运行的用户线程无法设置为守护线程,所以 setDaemon 必须在 thread.start 方法之前设置,否则会抛出 llegalThreadStateException 异常;
    • 一个守护线程创建的子线程依然是守护线程。
    • 不要认为所有的应用都可以分配给守护线程来进行服务,比如读写操作或者计算逻辑。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class ThreadDaemonDemo {

public static void main(String[] args) {
Thread t = new Thread(new MyThread(), "线程");
t.setDaemon(true); // 此线程在后台运行
System.out.println("线程 t 是否是守护进程:" + t.isDaemon());
t.start(); // 启动线程
}

static class MyThread implements Runnable {

@Override
public void run() {
while (true) {
System.out.println(Thread.currentThread().getName() + "在运行。");
}
}
}
}

参考阅读:Java 中守护线程的总结

线程通信

当多个线程可以一起工作去解决某个问题时,如果某些部分必须在其它部分之前完成,那么就需要对线程进行协调。

wait/notify/notifyAll

  • wait - wait 会自动释放当前线程占有的对象锁,并请求操作系统挂起当前线程,让线程从 RUNNING 状态转入 WAITING 状态,等待 notify / notifyAll 来唤醒。如果没有释放锁,那么其它线程就无法进入对象的同步方法或者同步控制块中,那么就无法执行 notify 或者 notifyAll 来唤醒挂起的线程,造成死锁。
  • notify - 唤醒一个正在 WAITING 状态的线程,并让它拿到对象锁,具体唤醒哪一个线程由 JVM 控制 。
  • notifyAll - 唤醒所有正在 WAITING 状态的线程,接下来它们需要竞争对象锁。

注意:

  • waitnotifynotifyAll 都是 Object 类中的方法,而非 Thread
  • **waitnotifynotifyAll 只能用在 synchronized 方法或者 synchronized 代码块中使用,否则会在运行时抛出 IllegalMonitorStateException**。

生产者、消费者模式是 waitnotifynotifyAll 的一个经典使用案例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
public class ThreadWaitNotifyDemo02 {

private static final int QUEUE_SIZE = 10;
private static final PriorityQueue<Integer> queue = new PriorityQueue<>(QUEUE_SIZE);

public static void main(String[] args) {
new Producer("生产者 A").start();
new Producer("生产者 B").start();
new Consumer("消费者 A").start();
new Consumer("消费者 B").start();
}

static class Consumer extends Thread {

Consumer(String name) {
super(name);
}

@Override
public void run() {
while (true) {
synchronized (queue) {
while (queue.size() == 0) {
try {
System.out.println("队列空,等待数据");
queue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
queue.notifyAll();
}
}
queue.poll(); // 每次移走队首元素
queue.notifyAll();
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " 从队列取走一个元素,队列当前有:" + queue.size() + "个元素");
}
}
}
}

static class Producer extends Thread {

Producer(String name) {
super(name);
}

@Override
public void run() {
while (true) {
synchronized (queue) {
while (queue.size() == QUEUE_SIZE) {
try {
System.out.println("队列满,等待有空余空间");
queue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
queue.notifyAll();
}
}
queue.offer(1); // 每次插入一个元素
queue.notifyAll();
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " 向队列取中插入一个元素,队列当前有:" + queue.size() + "个元素");
}
}
}
}
}

join

在线程操作中,可以使用 join 方法让一个线程强制运行,线程强制运行期间,其他线程无法运行,必须等待此线程完成之后才可以继续执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class ThreadJoinDemo {

public static void main(String[] args) {
MyThread mt = new MyThread(); // 实例化 Runnable 子类对象
Thread t = new Thread(mt, "mythread"); // 实例化 Thread 对象
t.start(); // 启动线程
for (int i = 0; i < 50; i++) {
if (i > 10) {
try {
t.join(); // 线程强制运行
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("Main 线程运行 --> " + i);
}
}

static class MyThread implements Runnable {

@Override
public void run() {
for (int i = 0; i < 50; i++) {
System.out.println(Thread.currentThread().getName() + " 运行,i = " + i); // 取得当前线程的名字
}
}
}
}

管道

管道输入/输出流和普通的文件输入/输出流或者网络输入/输出流不同之处在于,它主要用于线程之间的数据传输,而传输的媒介为内存。
管道输入/输出流主要包括了如下 4 种具体实现:PipedOutputStreamPipedInputStreamPipedReaderPipedWriter,前两种面向字节,而后两种面向字符。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class Piped {

public static void main(String[] args) throws Exception {
PipedWriter out = new PipedWriter();
PipedReader in = new PipedReader();
// 将输出流和输入流进行连接,否则在使用时会抛出 IOException
out.connect(in);
Thread printThread = new Thread(new Print(in), "PrintThread");
printThread.start();
int receive = 0;
try {
while ((receive = System.in.read()) != -1) {
out.write(receive);
}
} finally {
out.close();
}
}

static class Print implements Runnable {

private PipedReader in;

Print(PipedReader in) {
this.in = in;
}

public void run() {
int receive = 0;
try {
while ((receive = in.read()) != -1) {
System.out.print((char) receive);
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}

线程生命周期

java.lang.Thread.State 中定义了 6 种不同的线程状态,在给定的一个时刻,线程只能处于其中的一个状态。

以下是各状态的说明,以及状态间的联系:

  • 开始(NEW) - 尚未调用 start 方法的线程处于此状态。此状态意味着:创建的线程尚未启动
  • 可运行(RUNNABLE) - 已经调用了 start 方法的线程处于此状态。此状态意味着,线程已经准备好了,一旦被线程调度器分配了 CPU 时间片,就可以运行线程。
    • 在操作系统层面,线程有 READY 和 RUNNING 状态;而在 JVM 层面,只能看到 RUNNABLE 状态,所以 Java 系统一般将这两个状态统称为 RUNNABLE(运行中) 状态 。
  • 阻塞(BLOCKED) - 此状态意味着:线程处于被阻塞状态。表示线程在等待 synchronized 的隐式锁(Monitor lock)。synchronized 修饰的方法、代码块同一时刻只允许一个线程执行,其他线程只能等待,即处于阻塞状态。当占用 synchronized 隐式锁的线程释放锁,并且等待的线程获得 synchronized 隐式锁时,就又会从 BLOCKED 转换到 RUNNABLE 状态。
  • 等待(WAITING) - 此状态意味着:线程无限期等待,直到被其他线程显式地唤醒。 阻塞和等待的区别在于,阻塞是被动的,它是在等待获取 synchronized 的隐式锁。而等待是主动的,通过调用 Object.wait 等方法进入。
    • 进入:Object.wait();退出:Object.notify / Object.notifyAll
    • 进入:Thread.join();退出:被调用的线程执行完毕
    • 进入:LockSupport.park();退出:LockSupport.unpark
  • 定时等待(TIMED_WAITING) - 等待指定时间的状态。一个线程处于定时等待状态,是由于执行了以下方法中的任意方法:
    • 进入:Thread.sleep(long);退出:时间结束
    • 进入:Object.wait(long);退出:时间结束 / Object.notify / Object.notifyAll
    • 进入:Thread.join(long);退出:时间结束 / 被调用的线程执行完毕
    • 进入:LockSupport.parkNanos(long);退出:LockSupport.unpark
    • 进入:LockSupport.parkUntil(long);退出:LockSupport.unpark
  • 终止 (TERMINATED) - 线程 run() 方法执行结束,或者因异常退出了 run() 方法,则该线程结束生命周期。死亡的线程不可再次复生。

👉 扩展阅读:

线程常见问题

线程启动

典型问题

(1)Thread.start()Thread.run() 有什么区别?

(2)可以直接调用 Thread.run() 方法么?

(3)一个线程两次调用 Thread.start() 方法会怎样

知识点

(1)Thread.start()Thread.run() 的区别:

  • run() 方法是线程的执行体。
  • start() 方法负责启动线程,然后 JVM 会让这个线程去执行 run() 方法。

(2)可以直接调用 Thread.run() 方法,但是它的行为和普通方法一样,不会启动新线程去执行。调用 start() 方法方可启动线程并使线程进入就绪状态,直接执行 run() 方法的话不会以多线程的方式执行。

(3)Java 的线程是不允许启动两次的,第二次调用必然会抛出 IllegalThreadStateException

线程等待

典型问题

(1)Thread.sleep()Thread.yield()Thread.join()Object.wait() 方法有什么区别?

(2)为什么 Thread.sleep()Thread.yield() 设计为静态方法?

知识点

(1)Thread.sleep()Thread.yield()Thread.join() 方法的区别:

  • Thread.sleep()
    • Thread.sleep() 方法需要指定等待的时间,它可以让当前正在执行的线程在指定的时间内暂停执行,进入 TIMED_WAITING 状态。
    • 该方法既可以让其他同优先级或者高优先级的线程得到执行的机会,也可以让低优先级的线程得到执行机会。
    • 但是,Thread.sleep() 方法不会释放“锁标志”,也就是说如果有 synchronized 同步块,其他线程仍然不能访问共享数据。
  • Thread.yield()
    • Thread.yield() 方法可以让当前正在执行的线程暂停,但它不会阻塞该线程,它只是将该线程从 RUNNING 状态转入 RUNNABLE 状态。
    • 当某个线程调用了 Thread.yield() 方法暂停之后,只有优先级大于等于当前线程的处于就绪状态的线程才会获得执行的机会。
  • Thread.join()
    • Thread.join() 方法会使当前线程转入 WAITINGTIMED_WAITING 状态,等待调用 Thread.join() 方法的线程结束后才能继续执行。
  • Object.wait()
    • Object.wait() 用于使当前线程等待,直到其他线程调用相同对象的 Object.notify()Object.notifyAll() 方法唤醒它。
    • 调用 Object.wait() 时,线程会释放对象锁,并进入等待状态。

(2)为什么 Thread.sleep()Thread.yield() 设计为静态方法?

Thread.sleep()Thread.yield() 针对的是 RUNNING 状态的线程,也就是说在非 RUNNING 状态的线程上执行这两个方法没有意义。这就是为什么这两个方法被设计为静态的。它们只针对正在 RUNNING 状态的线程工作,避免程序员错误的认为可以在其他非 RUNNING 状态线程上调用。

👉 扩展阅读:Java 线程中 yield 与 join 方法的区别
👉 扩展阅读:sleep(),wait(),yield() 和 join() 方法的区别

线程通信

线程间通信是线程间共享资源的一种方式。Object.wait(), Object.notify()Object.notifyAll() 是用于线程之间协作和通信的方法,它们通常与synchronized 关键字一起使用来实现线程的同步。

典型问题

(1)为什么线程通信的方法 Object.wait()Object.notify()Object.notifyAll() 被定义在 Object 类里?

(2)为什么 Object.wait()Object.notify()Object.notifyAll() 必须在 synchronized 方法/块中被调用?

(3) Object.wait()Thread.sleep 有什么区别?

知识点

(1)为什么线程通信的方法 Object.wait()Object.notify()Object.notifyAll() 被定义在 Object 类里?

Java 的每个对象中都有一个称之为 monitor 监视器的锁,由于每个对象都可以上锁,这就要求在对象头中有一个用来保存锁信息的位置。这个锁是对象级别的,而非线程级别的,wait/notify/notifyAll 也都是锁级别的操作,它们的锁属于对象,所以把它们定义在 Object 类中是最合适,因为 Object 类是所有对象的父类。

如果把 wait/notify/notifyAll 方法定义在 Thread 类中,会带来很大的局限性,比如一个线程可能持有多把锁,以便实现相互配合的复杂逻辑,假设此时 wait 方法定义在 Thread 类中,如何实现让一个线程持有多把锁呢?又如何明确线程等待的是哪把锁呢?既然我们是让当前线程去等待某个对象的锁,自然应该通过操作对象来实现,而不是操作线程。

  • Object.wait()
    • Object.wait() 方法用于使当前线程进入等待状态,直到其他线程调用相同对象的 notify()notifyAll() 方法唤醒它。
    • 在调用 wait() 方法时,线程会释放对象的锁,并进入等待状态。通常在使用 wait() 方法时需要放在一个循环中,以避免虚假唤醒(spurious wakeups)。
  • Object.notify()
    • Object.notify() 方法用于唤醒正在等待该对象的锁的一个线程。
    • 被唤醒的线程将会尝试重新获取对象的锁,一旦获取到锁,它将继续执行。
  • Object.notifyAll()
    • Object.notifyAll() 方法用于唤醒正在等待该对象的锁的所有线程。
    • 所有被唤醒的线程将会竞争对象的锁,一旦获取到锁,它们将继续执行。

(2)为什么 Object.wait()Object.notify()Object.notifyAll() 必须在 synchronized 方法/块中被调用?

当一个线程需要调用对象的 wait() 方法的时候,这个线程必须拥有该对象的锁,接着它就会释放这个对象锁并进入等待状态直到其他线程调用这个对象上的 notify() 方法。同样的,当一个线程需要调用对象的 notify() 方法时,它会释放这个对象的锁,以便其他在等待的线程就可以得到这个对象锁。

由于所有的这些方法都需要线程持有对象的锁,这样就只能通过 synchronized 来实现,所以他们只能在 synchronized 方法/块中被调用。

(3) Object.wait()Thread.sleep 有什么区别?

相同点:

  1. 它们都可以让线程阻塞。
  2. 它们都可以响应 interrupt 中断:在等待的过程中如果收到中断信号,都可以进行响应,并抛出 InterruptedException 异常。

不同点:

  1. wait 方法必须在 synchronized 保护的代码中使用,而 sleep 方法并没有这个要求。
  2. 在同步代码中执行 sleep 方法时,并不会释放 monitor 锁,但执行 wait 方法时会主动释放 monitor 锁。
  3. sleep 方法中会要求必须定义一个时间,时间到期后会主动恢复,而对于没有参数的 wait 方法而言,意味着永久等待,直到被中断或被唤醒才能恢复,它并不会主动恢复。
  4. wait/notify 是 Object 类的方法,而 sleep 是 Thread 类的方法。

👉 扩展阅读:Java 并发编程:线程间协作的两种方式:wait、notify、notifyAll 和 Condition

线程优先级

典型问题

(1)Java 的线程优先级如何控制?

(2)高优先级的 Java 线程一定先执行吗?

知识点

(1)Java 中的线程优先级的范围是 [1,10],一般来说,高优先级的线程在运行时会具有优先权。可以通过 thread.setPriority(Thread.MAX_PRIORITY) 的方式设置,默认优先级为 5

(2)即使设置了线程的优先级,也无法保证高优先级的线程一定先执行

这是因为 Java 线程优先级依赖于操作系统的支持,然而,不同的操作系统支持的线程优先级并不相同,不能很好的和 Java 中线程优先级一一对应。因此,Java 线程优先级控制并不可靠。

守护线程

典型问题

(1)什么是守护线程?

(2)如何创建守护线程?

知识点

(1)什么是守护线程?

守护线程(Daemon Thread)是在后台执行并且不会阻止 JVM 终止的线程。与守护线程(Daemon Thread)相反的,叫用户线程(User Thread),也就是非守护线程。

守护线程的优先级比较低,一般用于为系统中的其它对象和线程提供服务。典型的应用就是垃圾回收器。

(2)创建守护线程的方式:

  • 使用 thread.setDaemon(true) 可以设置 thread 线程为守护线程。
  • 正在运行的用户线程无法设置为守护线程,所以 thread.setDaemon(true) 必须在 thread.start() 之前设置,否则会抛出 llegalThreadStateException 异常;
  • 一个守护线程创建的子线程依然是守护线程。
  • 不要认为所有的应用都可以分配给守护线程来进行服务,比如读写操作或者计算逻辑。

👉 扩展阅读:Java 中守护线程的总结

线程数

典型问题

(1)线程数是不是越多越好?

(2)创建多少线程才合适?

知识点

使用多线程,初衷是为了提升程序性能。度量性能的核心指标是延迟吞吐量。所谓提升性能,从度量的角度,主要是降低延迟,提高吞吐量。在并发编程领域,提升性能本质上就是提升硬件的利用率,再具体点来说,就是提升 I/O 的利用率和 CPU 的利用率。

多线程并非越多越好,过多的线程可能会导致过多的上下文切换,反而降低系统性能。 通常需要根据服务器硬件资源和预期负载来合理设定线程数大小。

程序一般都是 CPU 计算和 I/O 操作交叉执行的,由于 I/O 设备的速度相对于 CPU 来说都很慢,所以大部分情况下,I/O 操作执行的时间相对于 CPU 计算来说都非常长,这种场景我们一般都称为 I/O 密集型计算;和 I/O 密集型计算相对的就是 CPU 密集型计算了,CPU 密集型计算大部分场景下都是纯 CPU 计算。I/O 密集型程序和 CPU 密集型程序,计算最佳线程数的方法是不同的。

对于 CPU 密集型的计算场景,理论上“线程的数量=CPU 核数”就是最合适的。不过在工程上,线程的数量一般会设置为“CPU 核数+1”,这样的话,当线程因为偶尔的内存页失效或其他原因导致阻塞时,这个额外的线程可以顶上,从而保证 CPU 的利用率。

对于 I/O 密集型计算场景,最佳的线程数是与程序中 CPU 计算和 I/O 操作的耗时比相关的,我们可以总结出这样一个公式:

最佳线程数=1 +(I/O 耗时 / CPU 耗时)

参考资料

Maven 插件之代码检查

maven-checkstyle-plugin

maven-checkstyle-plugin,用于检测代码中不符合规范的地方。

定义 checkstyle.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
<!DOCTYPE module PUBLIC
"-//Checkstyle//DTD Checkstyle Configuration 1.3//EN"
"https://checkstyle.org/dtds/configuration_1_3.dtd">

<!-- Generated by RHY @will_awoke -->

<module name="Checker">

<property name="charset" value="UTF-8"/>
<property name="severity" value="warning"/>

<!-- Checks for Size Violations. -->
<!-- 检查文件的长度(行) default max=2000 -->
<module name="FileLength">
<property name="max" value="2500"/>
</module>

<!-- Checks that property files contain the same keys. -->
<!-- 检查**.properties配置文件 是否有相同的key
<module name="Translation">
</module>
-->

<module name="TreeWalker">

<!-- Checks for imports -->
<!-- 必须导入类的完整路径,即不能使用*导入所需的类 -->
<module name="AvoidStarImport"/>

<!-- 检查是否从非法的包中导入了类 illegalPkgs: 定义非法的包名称-->
<module name="IllegalImport"/> <!-- defaults to sun.* packages -->

<!-- 检查是否导入了不必显示导入的类-->
<module name="RedundantImport"/>

<!-- 检查是否导入的包没有使用-->
<module name="UnusedImports"/>

<!-- Checks for whitespace
<module name="EmptyForIteratorPad"/>
<module name="MethodParamPad"/>
<module name="NoWhitespaceAfter"/>
<module name="NoWhitespaceBefore"/>
<module name="OperatorWrap"/>
<module name="ParenPad"/>
<module name="TypecastParenPad"/>
<module name="WhitespaceAfter"/>
<module name="WhitespaceAround"/>
-->

<!-- 检查类和接口的javadoc 默认不检查author 和version tags
authorFormat: 检查author标签的格式
versionFormat: 检查version标签的格式
scope: 可以检查的类的范围,例如:public只能检查public修饰的类,private可以检查所有的类
excludeScope: 不能检查的类的范围,例如:public,public的类将不被检查,但访问权限小于public的类仍然会检查,其他的权限以此类推
tokens: 该属性适用的类型,例如:CLASS_DEF,INTERFACE_DEF -->
<module name="JavadocType">
<property name="authorFormat" value="\S"/>
<property name="scope" value="protected"/>
<property name="tokens" value="CLASS_DEF,INTERFACE_DEF"/>
</module>

<!-- 检查方法的javadoc的注释
scope: 可以检查的方法的范围,例如:public只能检查public修饰的方法,private可以检查所有的方法
allowMissingParamTags: 是否忽略对参数注释的检查
allowMissingThrowsTags: 是否忽略对throws注释的检查
allowMissingReturntags: 是否忽略对return注释的检查 -->
<module name="JavadocMethod">
<property name="scope" value="private"/>
<property name="allowMissingParamTags" value="false"/>
<property name="allowMissingThrowsTags" value="false"/>
<property name="allowMissingReturnTag" value="false"/>
<property name="tokens" value="METHOD_DEF"/>
<property name="allowUndeclaredRTE" value="true"/>
<property name="allowThrowsTagsForSubclasses" value="true"/>
<!--允许get set 方法没有注释-->
<property name="allowMissingPropertyJavadoc" value="true"/>
</module>

<!-- 检查类变量的注释
scope: 检查变量的范围,例如:public只能检查public修饰的变量,private可以检查所有的变量 -->
<module name="JavadocVariable">
<property name="scope" value="private"/>
</module>

<!--option: 定义左大括号'{'显示位置,eol在同一行显示,nl在下一行显示
maxLineLength: 大括号'{'所在行行最多容纳的字符数
tokens: 该属性适用的类型,例:CLASS_DEF,INTERFACE_DEF,METHOD_DEF,CTOR_DEF -->
<module name="LeftCurly">
<property name="option" value="nl"/>
</module>

<!-- NeedBraces 检查是否应该使用括号的地方没有加括号
tokens: 定义检查的类型 -->
<module name="NeedBraces"/>

<!-- Checks the placement of right curly braces ('}') for else, try, and catch tokens. The policy to verify is specified using property option.
option: 右大括号是否单独一行显示
tokens: 定义检查的类型 -->
<module name="RightCurly">
<property name="option" value="alone"/>
</module>

<!-- 检查在重写了equals方法后是否重写了hashCode方法 -->
<module name="EqualsHashCode"/>

<!-- Checks for illegal instantiations where a factory method is preferred.
Rationale: Depending on the project, for some classes it might be preferable to create instances through factory methods rather than calling the constructor.
A simple example is the java.lang.Boolean class. In order to save memory and CPU cycles, it is preferable to use the predefined constants TRUE and FALSE. Constructor invocations should be replaced by calls to Boolean.valueOf().
Some extremely performance sensitive projects may require the use of factory methods for other classes as well, to enforce the usage of number caches or object pools. -->
<module name="IllegalInstantiation">
<property name="classes" value="java.lang.Boolean"/>
</module>

<!-- Checks for Naming Conventions. 命名规范 -->
<!-- local, final variables, including catch parameters -->
<module name="LocalFinalVariableName"/>

<!-- local, non-final variables, including catch parameters-->
<module name="LocalVariableName"/>

<!-- static, non-final fields -->
<module name="StaticVariableName">
<property name="format" value="(^[A-Z0-9_]{0,19}$)"/>
</module>

<!-- packages -->
<module name="PackageName">
<property name="format" value="^[a-z]+(\.[a-z][a-z0-9]*)*$"/>
</module>

<!-- classes and interfaces -->
<module name="TypeName">
<property name="format" value="(^[A-Z][a-zA-Z0-9]{0,19}$)"/>
</module>

<!-- methods -->
<module name="MethodName">
<property name="format" value="(^[a-z][a-zA-Z0-9]{0,19}$)"/>
</module>

<!-- non-static fields -->
<module name="MemberName">
<property name="format" value="(^[a-z][a-z0-9][a-zA-Z0-9]{0,19}$)"/>
</module>

<!-- parameters -->
<module name="ParameterName">
<property name="format" value="(^[a-z][a-zA-Z0-9_]{0,19}$)"/>
</module>

<!-- constants (static, final fields) -->
<module name="ConstantName">
<property name="format" value="(^[A-Z0-9_]{0,19}$)"/>
</module>

<!-- 代码缩进 -->
<module name="Indentation">
</module>

<!-- Checks for redundant exceptions declared in throws clause such as duplicates, unchecked exceptions or subclasses of another declared exception.
检查是否抛出了多余的异常
<module name="RedundantThrows">
<property name="logLoadErrors" value="true"/>
<property name="suppressLoadErrors" value="true"/>
</module>
-->

<!-- Checks for overly complicated boolean expressions. Currently finds code like if (b == true), b || true, !false, etc.
检查boolean值是否冗余的地方
Rationale: Complex boolean logic makes code hard to understand and maintain. -->
<module name="SimplifyBooleanExpression"/>

<!-- Checks for overly complicated boolean return statements. For example the following code
检查是否存在过度复杂的boolean返回值
if (valid())
return false;
else
return true;
could be written as
return !valid();
The Idea for this Check has been shamelessly stolen from the equivalent PMD rule. -->
<module name="SimplifyBooleanReturn"/>

<!-- Checks that a class which has only private constructors is declared as final.只有私有构造器的类必须声明为final-->
<module name="FinalClass"/>

<!-- Make sure that utility classes (classes that contain only static methods or fields in their API) do not have a public constructor.
确保Utils类(只提供static方法和属性的类)没有public构造器。
Rationale: Instantiating utility classes does not make sense. Hence the constructors should either be private or (if you want to allow subclassing) protected. A common mistake is forgetting to hide the default constructor.
If you make the constructor protected you may want to consider the following constructor implementation technique to disallow instantiating subclasses:
public class StringUtils // not final to allow subclassing
{
protected StringUtils() {
throw new UnsupportedOperationException(); // prevents calls from subclass
}
public static int count(char c, String s) {
// ...
}
}
<module name="HideUtilityClassConstructor"/>
-->

<!-- Checks visibility of class members. Only static final members may be public; other class members must be private unless property protectedAllowed or packageAllowed is set.
检查class成员属性可见性。只有static final 修饰的成员是可以public的。其他的成员属性必需是private的,除非属性protectedAllowed或者packageAllowed设置了true.
Public members are not flagged if the name matches the public member regular expression (contains "^serialVersionUID$" by default). Note: Checkstyle 2 used to include "^f[A-Z][a-zA-Z0-9]*$" in the default pattern to allow CMP for EJB 1.1 with the default settings. With EJB 2.0 it is not longer necessary to have public access for persistent fields, hence the default has been changed.
Rationale: Enforce encapsulation. 强制封装 -->
<module name="VisibilityModifier"/>

<!-- 每一行只能定义一个变量 -->
<module name="MultipleVariableDeclarations">
</module>

<!-- Checks the style of array type definitions. Some like Java-style: public static void main(String[] args) and some like C-style: public static void main(String args[])
检查再定义数组时,采用java风格还是c风格,例如:int[] num是java风格,int num[]是c风格。默认是java风格-->
<module name="ArrayTypeStyle">
</module>

<!-- Checks that there are no "magic numbers", where a magic number is a numeric literal that is not defined as a constant. By default, -1, 0, 1, and 2 are not considered to be magic numbers.
<module name="MagicNumber">
</module>
-->

<!-- A check for TODO: comments. Actually it is a generic regular expression matcher on Java comments. To check for other patterns in Java comments, set property format.
检查是否存在TODO(待处理) TODO是javaIDE自动生成的。一般代码写完后要去掉。
-->
<module name="TodoComment"/>

<!-- Checks that long constants are defined with an upper ell. That is ' L' and not 'l'. This is in accordance to the Java Language Specification, Section 3.10.1.
检查是否在long类型是否定义了大写的L.字母小写l和数字1(一)很相似。
looks a lot like 1. -->
<module name="UpperEll"/>

<!-- Checks that switch statement has "default" clause. 检查switch语句是否有‘default’从句
Rationale: It's usually a good idea to introduce a default case in every switch statement.
Even if the developer is sure that all currently possible cases are covered, this should be expressed in the default branch,
e.g. by using an assertion. This way the code is protected aginst later changes, e.g. introduction of new types in an enumeration type. -->
<module name="MissingSwitchDefault"/>

<!--检查switch中case后是否加入了跳出语句,例如:return、break、throw、continue -->
<module name="FallThrough"/>

<!-- Checks the number of parameters of a method or constructor. max default 7个. -->
<module name="ParameterNumber">
<property name="max" value="5"/>
</module>

<!-- 每行字符数 -->
<module name="LineLength">
<property name="max" value="200"/>
</module>

<!-- Checks for long methods and constructors. max default 150行. max=300 设置长度300 -->
<module name="MethodLength">
<property name="max" value="300"/>
</module>

<!-- ModifierOrder 检查修饰符的顺序,默认是 public,protected,private,abstract,static,final,transient,volatile,synchronized,native -->
<module name="ModifierOrder">
</module>

<!-- 检查是否有多余的修饰符,例如:接口中的方法不必使用public、abstract修饰 -->
<module name="RedundantModifier">
</module>

<!--- 字符串比较必须使用 equals() -->
<module name="StringLiteralEquality">
</module>

<!-- if-else嵌套语句个数 最多4层 -->
<module name="NestedIfDepth">
<property name="max" value="3"/>
</module>

<!-- try-catch 嵌套语句个数 最多2层 -->
<module name="NestedTryDepth">
<property name="max" value="2"/>
</module>

<!-- 返回个数 -->
<module name="ReturnCount">
<property name="max" value="5"/>
<property name="format" value="^$"/>
</module>

</module>
</module>

配置 pom.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42

<project>
...
<properties>
<checkstyle.config.location>config/maven_checks.xml</checkstyle.config.location>
</properties>
...
<reporting>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
<version>3.0</version>
<executions>
<execution>
<!-- 绑定pmd:pmd到validate生命周期,在validate时会自动进行代码规范检查 -->
<id>validate</id>
<phase>validate</phase>
<configuration>
<!-- 配置文件的路径,在style文件夹下 -->
<configLocation>style/checkstyle.xml</configLocation>
<encoding>UTF-8</encoding>
<consoleOutput>true</consoleOutput>
<failsOnError>true</failsOnError>
<includeTestSourceDirectory>false</includeTestSourceDirectory>
</configuration>
<goals>
<goal>check</goal>
</goals>
</execution>
</executions>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jxr-plugin</artifactId>
<version>2.3</version>
</plugin>
</plugins>
</reporting>
...
</project>

其中可以修改使用的检查规则文件路径,插件默认提供了四个规则文件可以直接使用,无需手动下载:

  • config/sun_checks.xml - Sun Microsystems Definition (default).
  • config/maven_checks.xml - Maven Development Definitions.
  • config/turbine_checks.xml - Turbine Development Definitions.
  • config/avalon_checks.xml - Avalon Development Definitions.

配置好后,可以执行 mvn clean checkstyle:check 检查代码。

maven-pmd-plugin

maven-pmd-plugin 是阿里编程规范检查插件。

配置 pom.xml

参考 https://github.com/alibaba/p3c/blob/master/p3c-pmd/pom.xml 配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
  <plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-pmd-plugin</artifactId>
<version>3.11.0</version>
<configuration>
<sourceEncoding>${project.build.sourceEncoding}</sourceEncoding>
<targetJdk>${maven.compiler.target}</targetJdk>
<printFailingErrors>true</printFailingErrors>
<rulesets>
<ruleset>rulesets/java/ali-comment.xml</ruleset>
<ruleset>rulesets/java/ali-concurrent.xml</ruleset>
<ruleset>rulesets/java/ali-constant.xml</ruleset>
<ruleset>rulesets/java/ali-exception.xml</ruleset>
<ruleset>rulesets/java/ali-flowcontrol.xml</ruleset>
<ruleset>rulesets/java/ali-naming.xml</ruleset>
<ruleset>rulesets/java/ali-oop.xml</ruleset>
<ruleset>rulesets/java/ali-orm.xml</ruleset>
<ruleset>rulesets/java/ali-other.xml</ruleset>
<ruleset>rulesets/java/ali-set.xml</ruleset>
</rulesets>
<printFailingErrors>true</printFailingErrors>
</configuration>
<executions>
<execution>
<phase>verify</phase>
<goals>
<goal>check</goal>
</goals>
</execution>
</executions>
<dependencies>
<dependency>
<groupId>com.alibaba.p3c</groupId>
<artifactId>p3c-pmd</artifactId>
<version>2.0.0</version>
</dependency>
</dependencies>
</plugin>
</plugins>

配置好后,可以执行 mvn clean pmd:check 检查代码。

参考资料

系统测试架构

软件测试描述一种用来促进鉴定软件的正确性、完整性、安全性和质量的过程。软件测试的经典定义是:在规定的条件下对程序进行操作,以发现程序错误,衡量软件质量,并对其是否能满足设计要求进行评估的过程。

现代软件开发项目中,分工明确,基本上都会有研发、测试、QA 等角色。不同角色由于关注的视角不同,测试目标和测试方法也不完全相同。本文主要从研发、测试的视角去考量软件测试技术。

注意:

  • 为了方便,只有测试人员需要关注的测试点用【测试】标注;
  • 而只有研发人员需要关注的测试点用【研发】标注;
  • 都需要关注的测试点则不作标注。

测试方法分类

从测试设计方法分类

  • 黑盒测试【测试】 - 把软件系统当作一个“黑箱”,无法了解或使用系统的内部结构及知识。从软件的行为,而不是内部结构出发来设计测试。
  • 白盒测试【研发】 - 设计者可以看到软件系统的内部结构,并且使用软件的内部知识来指导测试数据及方法的选择。
  • 灰盒测试 - 介于黑盒和白盒之间。

小结:

  • 黑河测试通常针对的是软件的行为或功能,一般是测试人员主要关注的。
  • 白盒测试通常则需要对软件有一定程度的了解,一般是开发人员所关注的。
  • 灰盒测试通常是为了测试软件在特定的场景下的表现,而非主场景。

从测试的目的分类

功能测试

  • 单元测试(Unit Test) - 在最低粒度的功能/参数上验证程序的准确性,比如测试一个函数的正确性。【研发】
  • 功能测试(Functional Test) - 验证模块的功能。【测试】
  • 集成测试(Integration Test) - 验证几个互相有依赖关系的模块的功能。【测试】
  • 场景测试(Scenario Test)- 验证几个模块是否能完成一个用户场景。【测试】
  • 系统测试(System Test) - 对于整个系统功能的测试。【测试】
  • Alpha 测试 - 软件测试人员在真实用户环境中对软件进行全面的测试。【测试】
  • Beta 测试 - 也叫公测,是真实的用户在真实的环境中进行的测试。

非功能测试

  • 压力测试(Stress test) - 验证软件在超过负载设计的情况下仍能返回正确的结果,没有崩溃
  • 负载测试(Load test) - 测试软件在负载情况下能否正常工作
  • 性能测试(Performance test) - 测试软件的效能,是否提供满意的服务质量。
    • 常用技术:JMeter、JMH。
  • 软件辅助功能测试(Accessibility test) - 测试软件是否向残疾用户提供足够的辅助功能
  • 本地化/全球化测试(Localization/Globalization
  • 兼容性测试(Compatibility Test)
  • 配置测试(Configuration Test) - 测试软件在各种配置下能否正常工作
  • 可用性测试(Usability Test) – 测试软件是否好用
  • 安全性测试(Security Test)

参考资料

Intellij IDEA 快速入门

快捷键

核心快捷键

IntelliJ IDEA 作为一个以快捷键为中心的 IDE,为大多数操作建议了键盘快捷键。在这个主题中,您可以找到最不可缺少的列表,使 IntelliJ IDEA 轻松实现第一步。

核心快捷键表:

操作 快捷键
根据名称查找操作 Ctrl+Shift+A
显示可用 意图操作 列表 Alt+Enter
切换视图 (Project,Structure, etc.). Alt+F1
切换工具窗口和在编辑器中打开的文件 Ctrl+Tab
显示 导航栏. Alt+Home
插入代码模板. Ctrl+J
在周围插入代码模板. Ctrl+Alt+J
Edit an item from the Project or another tree view. F4
注释 Ctrl+/ Ctrl+Shift+/
根据名称查找类或文件. Ctrl+N Ctrl+Shift+N
拷贝当前行或指定的行. Ctrl+D
增加或减少选中的表达式. Ctrl+W and Ctrl+Shift+W
在当前文件查找或替换. Ctrl+F Ctrl+R
在项目中或指定的目录中查找或替换 Ctrl+Shift+F Ctrl+Shift+R
全局搜索 双击 Shift
快速查看选中对象的引用. Ctrl+Shift+F7
展开或折叠编辑器中的代码块. Ctrl+NumPad Plus Ctrl+NumPad -
调用代码完成. Ctrl+Space
智能声明完成. Ctrl+Shift+Enter
智能补全代码 Ctrl+Shift+Space
显示可用的重构方法列表 Ctrl+Shift+Alt+T

快捷键分类

Tradition

快捷键 介绍
Ctrl + Z 撤销
Ctrl + Shift + Z 取消撤销
Ctrl + X 剪切
Ctrl + C 复制
Ctrl + S 保存
Tab 缩进
Shift + Tab 取消缩进
Shift + Home/End 选中光标到当前行头位置/行尾位置
Ctrl + Home/End 跳到文件头/文件尾

Editing

快捷键 介绍
Ctrl + Space 基础代码补全,默认在 Windows 系统上被输入法占用,需要进行修改,建议修改为 Ctrl + 逗号(必备)
Ctrl + Alt + Space 类名自动完成
Ctrl + Shift + Enter 自动结束代码,行末自动添加分号(必备)
Ctrl + P 方法参数提示显示
Ctrl + Q 光标所在的变量/类名/方法名等上面(也可以在提示补充的时候按),显示文档内容
Shift + F1 如果有外部文档可以连接外部文档
Ctrl + F1 在光标所在的错误代码处显示错误信息(必备)
Alt + Insert 代码自动生成,如生成对象的 set/get 方法,构造函数,toString() 等(必备)
Ctrl + O 选择可重写的方法
Ctrl + I 选择可继承的方法
Ctrl + Alt + T 对选中的代码弹出环绕选项弹出层(必备)
Ctrl + / 注释光标所在行代码,会根据当前不同文件类型使用不同的注释符号(必备)
Ctrl + Shift + / 代码块注释(必备)
Ctrl + W 递进式选择代码块。可选中光标所在的单词或段落,连续按会在原有选中的基础上再扩展选中范围(必备)
Ctrl + Shift + W 递进式取消选择代码块。可选中光标所在的单词或段落,连续按会在原有选中的基础上再扩展取消选中范围(必备)
Alt + Q 弹出一个提示,显示当前类的声明/上下文信息
Alt + Enter IntelliJ IDEA 根据光标所在问题,提供快速修复选择,光标放在的位置不同提示的结果也不同(必备)
Ctrl + Alt + L 格式化代码,可以对当前文件和整个包目录使用(必备)
Ctrl + Alt + O 优化导入的类,可以对当前文件和整个包目录使用(必备)
Ctrl + Alt + I 光标所在行 或 选中部分进行自动代码缩进,有点类似格式化
Ctrl + Shift + C 复制当前文件磁盘路径到剪贴板(必备)
Ctrl + Shift + V 弹出缓存的最近拷贝的内容管理器弹出层
Ctrl + Alt + Shift + C 复制参考信息
Ctrl + Alt + Shift + V 无格式黏贴(必备)
Ctrl + D 复制光标所在行 或 复制选择内容,并把复制内容插入光标位置下面(必备)
Ctrl + Y 删除光标所在行 或 删除选中的行(必备)
Ctrl + Shift + J 自动将下一行合并到当前行末尾(必备)
Shift + Enter 开始新一行。光标所在行下空出一行,光标定位到新行位置(必备)
Ctrl + Shift + U 对选中的代码进行大/小写轮流转换(必备)
Ctrl + Shift + ]/[ 选中从光标所在位置到它的底部/顶部的中括号位置(必备)
Ctrl + Delete 删除光标后面的单词或是中文句(必备)
Ctrl + BackSpace 删除光标前面的单词或是中文句(必备)
Ctrl + +/- 展开/折叠代码块
Ctrl + Shift + +/- 展开/折叠所有代码(必备)
Ctrl + F4 关闭当前编辑文件
Ctrl + Shift + Up/Down 光标放在方法名上,将方法移动到上一个/下一个方法前面,调整方法排序(必备)
Alt + Shift + Up/Down 移动光标所在行向上移动/向下移动(必备)
Ctrl + Shift + 左键单击 把光标放在某个类变量上,按此快捷键可以直接定位到该类中(必备)
Alt + Shift + 左键双击 选择被双击的单词/中文句,按住不放,可以同时选择其他单词/中文句(必备)
Ctrl + Shift + T 对当前类生成单元测试类,如果已经存在的单元测试类则可以进行选择(必备)

Search/Replace

快捷键 介绍
Double Shift 弹出 Search Everywhere 弹出层
F3 在查找模式下,定位到下一个匹配处
Shift + F3 在查找模式下,查找匹配上一个
Ctrl + F 在当前文件进行文本查找(必备)
Ctrl + R 在当前文件进行文本替换(必备)
Ctrl + Shift + F 根据输入内容查找整个项目 或 指定目录内文件(必备)
Ctrl + Shift + R 根据输入内容替换对应内容,范围为整个项目 或 指定目录内文件(必备)
快捷键 介绍
Alt + F7 查找光标所在的方法/变量/类被调用的地方
Ctrl + Alt + F7 显示使用的地方。寻找被该类或是变量被调用的地方,用弹出框的方式找出来
Ctrl + Shift + F7 高亮显示所有该选中文本,按 Esc 高亮消失(必备)

Compile and Run

快捷键 介绍
Ctrl + F9 执行 Make Project 操作
Ctrl + Shift + F9 编译选中的文件/包/Module
Shift + F9 Debug
Shift + F10 Run
Alt + Shift + F9 弹出 Debug 的可选择菜单
Alt + Shift + F10 弹出 Run 的可选择菜单

Debugging

快捷键 介绍
F7 在 Debug 模式下,进入下一步,如果当前行断点是一个方法,则进入当前方法体内,如果该方法体还有方法,则不会进入该内嵌的方法中
F8 在 Debug 模式下,进入下一步,如果当前行断点是一个方法,则不进入当前方法体内
Shift + F7 在 Debug 模式下,智能步入。断点所在行上有多个方法调用,会弹出进入哪个方法
Shift + F8 在 Debug 模式下,跳出,表现出来的效果跟 F9 一样
Alt + F8 在 Debug 模式下,选中对象,弹出可输入计算表达式调试框,查看该输入内容的调试结果
Alt + F9 在 Debug 模式下,执行到光标处
F9 在 Debug 模式下,恢复程序运行,但是如果该断点下面代码还有断点则停在下一个断点上
Ctrl + F8 在 Debug 模式下,设置光标当前行为断点,如果当前已经是断点则去掉断点
Ctrl + Shift + F8 在 Debug 模式下,指定断点进入条件
快捷键 介绍
Ctrl + N 跳转到类(必备)
Ctrl + Shift + N 跳转到文件(必备)
Ctrl + Alt + Shift + N 跳转到符号(必备)
Alt + Left/Right 切换当前已打开的窗口中的子视图,比如 Debug 窗口中有 Output、Debugger 等子视图,用此快捷键就可以在子视图中切换(必备)
F12 回到前一个工具窗口(必备)
ESC 从工具窗口进入代码文件窗口(必备)
Shift + ESC 隐藏当前 或 最后一个激活的工具窗口
Ctrl + G 跳转到当前文件的指定行处
Ctrl + E 显示最近打开的文件记录列表(必备)
Ctrl + Shift + E 显示最近编辑的文件记录列表(必备)
Ctrl + Alt + Left/Right 跳转到上一个/下一个操作的地方(必备)
Ctrl + Shift + Backspace 退回到上次修改的地方(必备)
Alt + F1 显示当前文件选择目标弹出层,弹出层中有很多目标可以进行选择(必备)
Ctrl + B/Ctrl + 左键单击 跳转到声明处
Ctrl + Alt + B 在某个调用的方法名上使用会跳到具体的实现处,可以跳过接口
Ctrl + Shift + B 跳转到类型声明处(必备)
Ctrl + Shift + I 快速查看光标所在的方法 或 类的定义
Ctrl + U 前往当前光标所在的方法的父类的方法/接口定义(必备)
Alt + Up/Down 跳转到当前文件的前一个/后一个方法(必备)
Ctrl + ]/[ 跳转到当前所在代码的花括号结束位置/开始位置
Ctrl + F12 弹出当前文件结构层,可以在弹出的层上直接输入,进行筛选
Ctrl + H 显示当前类的层次结构
Ctrl + Shift + H 显示方法层次结构
Ctrl + Alt + H 调用层次
F2/Shift + F2 跳转到下一个/上一个高亮错误 或 警告位置(必备)
F4 编辑源(必备)
Alt + Home 定位/显示到当前文件的 Navigation Bar
F11 添加书签(必备)
Ctrl + F11 选中文件/文件夹,使用助记符设定/取消书签(必备)
Shift + F11 弹出书签显示层(必备)
Alt + 1,2,3…9 显示对应数值的选项卡,其中 1 是 Project 用得最多(必备)
Ctrl + 1,2,3…9 定位到对应数值的书签位置(必备)

Refactoring

快捷键 介绍
Shift + F6 对文件/文件夹 重命名(必备)
Ctrl + Alt + Shift + T 打开重构菜单(必备)

VCS/Local History

快捷键 介绍
Ctrl + K 版本控制提交项目,需要此项目有加入到版本控制才可用
Ctrl + T 版本控制更新项目,需要此项目有加入到版本控制才可用
`Alt + `
Alt + Shift + C 查看最近操作项目的变化情况列表
Alt + Shift + N 选择/添加 task(必备)

Live Templates

快捷键 介绍
Ctrl + J 插入自定义动态代码模板(必备)
Ctrl + Alt + J 弹出模板选择窗口,将选定的代码加入动态模板中

General

快捷键 介绍
Ctrl + Tab 编辑窗口切换,如果在切换的过程又加按上 delete,则是关闭对应选中的窗口
Ctrl + Alt + Y 同步、刷新
Ctrl + Alt + S 打开 IntelliJ IDEA 系统设置(必备)
Ctrl + Alt + Shift + S 打开当前项目设置(必备)
Ctrl + Shift + A 查找动作/设置(必备)
Ctrl + Shift + F12 编辑器最大化(必备)
Alt + Shift + F 显示添加到收藏夹弹出层/添加到收藏夹
Alt + Shift + I 查看项目当前文件

Intellij IDEA 官方快捷键表

img

插件

推荐几个比较好用的插件

个性化

颜色主题

intellij-colors-solarized 个人觉得这种色彩搭配十分优雅

下载地址

FAQ

(1)运行时报错

Error running XXX. Command line is too long. Shorten the command line via JAR manifest or via a classpath file and rerun

解决方案:

找到 .idea/libraies/workspace.xml 中的 <component name="PropertiesComponent">

添加一行配置:

1
<property name="dynamic.classpath" value="true" />

参考资料

Mysql 运维

如果你的公司有 DBA,那么我恭喜你,你可以无视 Mysql 运维。如果你的公司没有 DBA,那你就好好学两手 Mysql 基本运维操作,行走江湖,防身必备。

安装部署

Windows 安装

(1)下载 Mysql 5.7 免安装版

下载地址:https://dev.mysql.com/downloads/mysql/5.7.html#downloads

(2)解压并创建 my.ini 在根目录

my.ini 文件示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
[mysqld]
#设置3306端口
port = 3306
# 设置mysql的安装目录 这块换成自己解压的路径
basedir=D:\\Tools\\DB\\mysql\\mysql-5.7.31
# 允许最大连接数
max_connections=200
# 服务端使用的字符集默认为8比特编码的latin1字符集
character-set-server=utf8
# 创建新表时将使用的默认存储引擎
default-storage-engine=INNODB

[client]
# 设置mysql客户端默认字符集
default-character-set=utf8

(3)执行安装命令

在控制台 CMD 中依次执行以下安装命令

1
2
3
cd D:\\Tools\\DB\\mysql\\mysql-5.7.31
mysqld --initialize
mysqld -install

说明:

  • mysqld --initialize 会自动初始化创建 data 文件夹并初始化 mysql。
  • mysqld -install 会安装 mysql 服务。

(4)启动服务

在控制台执行 net start mysql 启动服务。

CentOS 安装

本文仅介绍 rpm 安装方式

安装 mysql yum 源

官方下载地址:https://dev.mysql.com/downloads/repo/yum/

(1)下载 yum 源

1
wget https://dev.mysql.com/get/mysql80-community-release-el7-1.noarch.rpm

(2)安装 yum repo 文件并更新 yum 缓存

1
rpm -ivh mysql80-community-release-el7-1.noarch.rpm

执行结果:

会在 /etc/yum.repos.d/ 目录下生成两个 repo 文件

1
2
3
$ ls | grep mysql
mysql-community.repo
mysql-community-source.repo

更新 yum:

1
2
yum clean all
yum makecache

(3)查看 rpm 安装状态

1
2
3
4
5
6
$ yum search mysql | grep server
mysql-community-common.i686 : MySQL database common files for server and client
mysql-community-common.x86_64 : MySQL database common files for server and
mysql-community-test.x86_64 : Test suite for the MySQL database server
: administering MySQL servers
mysql-community-server.x86_64 : A very fast and reliable SQL database server

通过 yum 安装 mysql 有几个重要目录:

1
2
3
4
5
6
7
8
9
10
## 配置文件
/etc/my.cnf
## 数据库目录
/var/lib/mysql/
## 配置文件
/usr/share/mysql(mysql.server命令及配置文件)
## 相关命令
/usr/bin(mysqladmin mysqldump等命令)
## 启动脚本
/usr/lib/systemd/system/mysqld.service (注册为 systemd 服务)

(4)安装 mysql 服务器

1
yum install mysql-community-server

mysql 服务管理

通过 yum 方式安装 mysql 后,本地会有一个名为 mysqld 的 systemd 服务。

其服务管理十分简便:

1
2
3
4
5
6
7
8
9
10
11
12
## 查看状态
systemctl status mysqld
## 启用服务
systemctl enable mysqld
## 禁用服务
systemctl disable mysqld
## 启动服务
systemctl start mysqld
## 重启服务
systemctl restart mysqld
## 停止服务
systemctl stop mysqld

初始化数据库密码

查看一下初始密码

1
2
$ grep "password" /var/log/mysqld.log
2018-09-30T03:13:41.727736Z 5 [Note] [MY-010454] [Server] A temporary password is generated for root@localhost: %:lt+srWu4k1

执行命令:

1
mysql -uroot -p<临时密码>

输入临时密码,进入 mysql,如果要修改密码,执行以下指令:

1
ALTER user 'root'@'localhost' IDENTIFIED BY '你的密码';

注:密码强度默认为中等,大小写字母、数字、特殊符号,只有修改成功后才能修改配置再设置更简单的密码

配置远程访问

1
2
3
4
CREATE USER 'root'@'%' IDENTIFIED BY '你的密码';
GRANT ALL ON *.* TO 'root'@'%';
ALTER USER 'root'@'%' IDENTIFIED WITH mysql_native_password BY '你的密码';
FLUSH PRIVILEGES;

跳过登录认证

1
vim /etc/my.cnf

在 [mysqld] 下面加上 skip-grant-tables

作用是登录时跳过登录认证,换句话说就是 root 什么密码都可以登录进去。

执行 systemctl restart mysqld,重启 mysql

基本运维

客户端连接

语法:mysql -h<主机> -P<端口> -u<用户名> -p<密码>

如果没有显式指定密码,会要求输入密码才能访问。

【示例】连接本地 Mysql

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
$ mysql -h 127.0.0.1 -P 3306 -u root -p
Enter password:
Welcome to the MySQL monitor. Commands end with ; or \g.
Your MySQL connection id is 13501
Server version: 8.0.19 MySQL Community Server - GPL

Copyright (c) 2000, 2020, Oracle and/or its affiliates. All rights reserved.

Oracle is a registered trademark of Oracle Corporation and/or its
affiliates. Other names may be trademarks of their respective
owners.

Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.

mysql>

查看连接

连接完成后,如果你没有后续的动作,这个连接就处于空闲状态,你可以在 show processlist 命令中看到它。客户端如果太长时间没动静,连接器就会自动将它断开。这个时间是由参数 wait_timeout 控制的,默认值是 8 小时。

img

创建用户

1
CREATE USER 'username'@'host' IDENTIFIED BY 'password';

说明:

  • username:你将创建的用户名
  • host:指定该用户在哪个主机上可以登陆,如果是本地用户可用 localhost,如果想让该用户可以从任意远程主机登陆,可以使用通配符%
  • password:该用户的登陆密码,密码可以为空,如果为空则该用户可以不需要密码登陆服务器

示例:

1
2
3
4
5
CREATE USER 'dog'@'localhost' IDENTIFIED BY '123456';
CREATE USER 'pig'@'192.168.1.101_' IDENDIFIED BY '123456';
CREATE USER 'pig'@'%' IDENTIFIED BY '123456';
CREATE USER 'pig'@'%' IDENTIFIED BY '';
CREATE USER 'pig'@'%';

注意:在 Mysql 8 中,默认密码验证不再是 password。所以在创建用户时,create user 'username'@'%' identified by 'password'; 客户端是无法连接服务的。

所以,需要加上 IDENTIFIED WITH mysql_native_password,例如:CREATE USER 'slave'@'%' IDENTIFIED WITH mysql_native_password BY '123456';

查看用户

1
2
3
-- 查看所有用户
SELECT DISTINCT CONCAT('User: ''', user, '''@''', host, ''';') AS query
FROM mysql.user;

授权

命令:

1
GRANT privileges ON databasename.tablename TO 'username'@'host'

说明:

  • privileges:用户的操作权限,如SELECTINSERTUPDATE等,如果要授予所的权限则使用ALL
  • databasename:数据库名
  • tablename:表名,如果要授予该用户对所有数据库和表的相应操作权限则可用*表示,如*.*

示例:

1
2
3
GRANT SELECT, INSERT ON test.user TO 'pig'@'%';
GRANT ALL ON *.* TO 'pig'@'%';
GRANT ALL ON maindataplus.* TO 'pig'@'%';

注意:

用以上命令授权的用户不能给其它用户授权,如果想让该用户可以授权,用以下命令:

1
2
3
4
-- 为指定用户配置指定权限
GRANT privileges ON databasename.tablename TO 'username'@'host' WITH GRANT OPTION;
-- 为 root 用户分配所有权限
GRANT ALL ON *.* TO 'root'@'%' IDENTIFIED BY '密码' WITH GRANT OPTION;

撤销授权

命令:

1
REVOKE privilege ON databasename.tablename FROM 'username'@'host';

说明:

privilege, databasename, tablename:同授权部分

例子:

1
REVOKE SELECT ON *.* FROM 'pig'@'%';

注意:

假如你在给用户'pig'@'%'授权的时候是这样的(或类似的):GRANT SELECT ON test.user TO 'pig'@'%',则在使用REVOKE SELECT ON *.* FROM 'pig'@'%';命令并不能撤销该用户对 test 数据库中 user 表的SELECT 操作。相反,如果授权使用的是GRANT SELECT ON *.* TO 'pig'@'%';REVOKE SELECT ON test.user FROM 'pig'@'%';命令也不能撤销该用户对 test 数据库中 user 表的Select权限。

具体信息可以用命令SHOW GRANTS FOR 'pig'@'%'; 查看。

查看授权

1
2
-- 查看用户权限
SHOW GRANTS FOR 'root'@'%';

更改用户密码

1
SET PASSWORD FOR 'username'@'host' = PASSWORD('newpassword');

如果是当前登陆用户用:

1
SET PASSWORD = PASSWORD("newpassword");

示例:

1
SET PASSWORD FOR 'pig'@'%' = PASSWORD("123456");

备份与恢复

Mysql 备份数据使用 mysqldump 命令。

mysqldump 将数据库中的数据备份成一个文本文件,表的结构和表中的数据将存储在生成的文本文件中。

备份:

备份一个数据库

语法:

1
mysqldump -h <host> -P<port> -u<username> -p<database> [<table1> <table2> ...] > backup.sql
  • host - Mysql Server 的 host
  • port - Mysql Server 的端口
  • username - 数据库用户
  • dbname - 数据库名称
  • table1 和 table2 参数表示需要备份的表的名称,为空则整个数据库备份;
  • BackupName.sql 参数表设计备份文件的名称,文件名前面可以加上一个绝对路径。通常将数据库被分成一个后缀名为 sql 的文件

备份多个数据库

1
mysqldump -u <username> -p --databases <database1> <database2> ... > backup.sql

备份所有数据库

1
mysqldump -u <username> -p --all-databases > backup.sql

恢复一个数据库

Mysql 恢复数据使用 mysql 命令。

语法:

1
mysql -h <host> -P<port> -u<username> -p<database> < backup.sql

恢复所有数据库

1
mysql -u<username> -p --all-databases < backup.sql

卸载

(1)查看已安装的 mysql

1
2
3
4
5
6
7
$ rpm -qa | grep -i mysql
perl-DBD-MySQL-4.023-6.el7.x86_64
mysql80-community-release-el7-1.noarch
mysql-community-common-8.0.12-1.el7.x86_64
mysql-community-client-8.0.12-1.el7.x86_64
mysql-community-libs-compat-8.0.12-1.el7.x86_64
mysql-community-libs-8.0.12-1.el7.x86_64

(2)卸载 mysql

1
yum remove mysql-community-server.x86_64

主从节点部署

假设需要配置一个主从 Mysql 服务器环境

  • master 节点:192.168.8.10
  • slave 节点:192.168.8.11

主节点上的操作

(1)修改配置并重启

执行 vi /etc/my.cnf ,添加如下配置:

1
2
3
[mysqld]
server-id=1
log_bin=/var/lib/mysql/binlog
  • server-id - 服务器 ID 号。在主从架构中,每台机器的 ID 必须唯一。
  • log_bin - 同步的日志路径及文件名,一定注意这个目录要是 mysql 有权限写入的;

修改后,重启 mysql 使配置生效:

1
systemctl restart mysql

(2)创建用于同步的用户

进入 mysql 命令控制台:

1
2
$ mysql -u root -p
Password:

执行以下 SQL:

1
2
3
4
5
6
7
8
9
10
11
12
-- a. 创建 slave 用户
CREATE USER 'slave'@'%' IDENTIFIED WITH mysql_native_password BY '密码';
-- 为 slave 赋予 REPLICATION SLAVE 权限
GRANT REPLICATION SLAVE ON *.* TO 'slave'@'%';

-- b. 或者,创建 slave 用户,并指定该用户能在任意主机上登录
-- 如果有多个从节点,又想让所有从节点都使用统一的用户名、密码认证,可以考虑这种方式
CREATE USER 'slave'@'%' IDENTIFIED WITH mysql_native_password BY '密码';
GRANT REPLICATION SLAVE ON *.* TO 'slave'@'%';

-- 刷新授权表信息
FLUSH PRIVILEGES;

注意:在 Mysql 8 中,默认密码验证不再是 password。所以在创建用户时,create user 'username'@'%' identified by 'password'; 客户端是无法连接服务的。所以,需要加上 IDENTIFIED WITH mysql_native_password BY 'password'

补充用户管理 SQL:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
-- 查看所有用户
SELECT DISTINCT CONCAT('User: ''', user, '''@''', host, ''';') AS query
FROM mysql.user;

-- 查看用户权限
SHOW GRANTS FOR 'root'@'%';

-- 创建用户
-- a. 创建 slave 用户,并指定该用户只能在主机 192.168.8.11 上登录
CREATE USER 'slave'@'192.168.8.11' IDENTIFIED WITH mysql_native_password BY '密码';
-- 为 slave 赋予 REPLICATION SLAVE 权限
GRANT REPLICATION SLAVE ON *.* TO 'slave'@'192.168.8.11';

-- 删除用户
DROP USER 'slave'@'192.168.8.11';

(3)加读锁

为了主库与从库的数据保持一致,我们先为 mysql 加入读锁,使其变为只读。

1
mysql> FLUSH TABLES WITH READ LOCK;

(4)查看主节点状态

1
2
3
4
5
6
7
mysql> show master status;
+------------------+----------+--------------+---------------------------------------------+-------------------+
| File | Position | Binlog_Do_DB | Binlog_Ignore_DB | Executed_Gtid_Set |
+------------------+----------+--------------+---------------------------------------------+-------------------+
| mysql-bin.000001 | 4202 | | mysql,information_schema,performance_schema | |
+------------------+----------+--------------+---------------------------------------------+-------------------+
1 row in set (0.00 sec)

注意:需要记录下 FilePosition,后面会用到。

(5)导出 sql

1
mysqldump -u root -p --all-databases --master-data > dbdump.sql

(6)解除读锁

1
mysql> UNLOCK TABLES;

(7)将 sql 远程传送到从节点上

1
scp dbdump.sql root@192.168.8.11:/home

从节点上的操作

(1)修改配置并重启

执行 vi /etc/my.cnf ,添加如下配置:

1
2
3
[mysqld]
server-id=2
log_bin=/var/lib/mysql/binlog
  • server-id - 服务器 ID 号。在主从架构中,每台机器的 ID 必须唯一。
  • log_bin - 同步的日志路径及文件名,一定注意这个目录要是 mysql 有权限写入的;

修改后,重启 mysql 使配置生效:

1
systemctl restart mysql

(2)导入 sql

1
mysql -u root -p < /home/dbdump.sql

(3)在从节点上建立与主节点的连接

进入 mysql 命令控制台:

1
2
$ mysql -u root -p
Password:

执行以下 SQL:

1
2
3
4
5
6
7
8
9
10
-- 停止从节点服务
STOP SLAVE;

-- 注意:MASTER_USER 和
CHANGE MASTER TO
MASTER_HOST='192.168.8.10',
MASTER_USER='slave',
MASTER_PASSWORD='密码',
MASTER_LOG_FILE='binlog.000001',
MASTER_LOG_POS=4202;
  • MASTER_LOG_FILEMASTER_LOG_POS 参数要分别与 show master status 指令获得的 FilePosition 属性值对应。
  • MASTER_HOST 是主节点的 HOST。
  • MASTER_USERMASTER_PASSWORD 是在主节点上注册的用户及密码。

(4)启动 slave 进程

1
mysql> start slave;

(5)查看主从同步状态

1
mysql> show slave status\G;

说明:如果以下两项参数均为 YES,说明配置正确。

  • Slave_IO_Running
  • Slave_SQL_Running

(6)将从节点设为只读

1
2
3
4
5
6
7
8
9
10
11
mysql> set global read_only=1;
mysql> set global super_read_only=1;
mysql> show global variables like "%read_only%";
+-----------------------+-------+
| Variable_name | Value |
+-----------------------+-------+
| innodb_read_only | OFF |
| read_only | ON |
| super_read_only | ON |
| transaction_read_only | OFF |
+-----------------------+-------+

注:设置 slave 服务器为只读,并不影响主从同步。

慢查询

查看慢查询是否开启

1
show variables like '%slow_query_log';

可以通过 set global slow_query_log 命令设置慢查询是否开启:ON 表示开启;OFF 表示关闭。

1
set global slow_query_log='ON';

查看慢查询时间阈值

1
show variables like '%long_query_time%';

设置慢查询阈值

1
set global long_query_time = 3;

隔离级别

查看隔离级别:

1
2
3
4
5
6
7
8
9
10
11
mysql> show variables like 'transaction_isolation';

+-----------------------+----------------+

| Variable_name | Value |

+-----------------------+----------------+

| transaction_isolation | READ-COMMITTED |

+-----------------------+----------------+

服务器配置

大部分情况下,默认的基本配置已经足够应付大多数场景,不要轻易修改 Mysql 服务器配置,除非你明确知道修改项是有益的。

尽量不要使用 Mysql 的缓存功能,因为其要求每次请求参数完全相同,才能命中缓存。这种方式实际上并不高效,还会增加额外开销,实际业务场景中一般使用 Redis 等 key-value 存储来解决缓存问题,性能远高于 Mysql 的查询缓存。

配置文件路径

配置 Mysql 首先要确定配置文件在哪儿。

不同 Linux 操作系统上,Mysql 配置文件路径可能不同。通常的路径为 /etc/my.cnf 或 /etc/mysql/my.cnf 。

如果不知道配置文件路径,可以尝试以下操作:

1
2
3
4
5
# which mysqld
/usr/sbin/mysqld
# /usr/sbin/mysqld --verbose --help | grep -A 1 'Default options'
Default options are read from the following files in the given order:
/etc/my.cnf /etc/mysql/my.cnf /usr/etc/my.cnf ~/.my.cnf

配置项语法

Mysql 配置项设置都使用小写,单词之间用下划线或横线隔开(二者是等价的)。

建议使用固定的风格,这样检索配置项时较为方便。

1
2
3
# 这两种格式等价
/usr/sbin/mysqld --auto-increment-offset=5
/usr/sbin/mysqld --auto_increment_offset=5

基本配置模板

一个基本的 Mysql 配置模板大概如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
[mysqld]
# GENERAL
# -------------------------------------------------------------------------------
datadir = /var/lib/mysql
socket = /var/lib/mysql/mysql.sock
pid_file = /var/lib/mysql/mysql.pid
user = mysql
port = 3306
default_storage_engine = InnoDB
default_time_zone = '+8:00'
character_set_server = utf8mb4
collation_server = utf8mb4_0900_ai_ci

# LOG
# -------------------------------------------------------------------------------
log_error = /var/log/mysql/mysql-error.log
slow_query_log = 1
slow_query_log_file = /var/log/mysql/mysql-slow.log

# InnoDB
# -------------------------------------------------------------------------------
innodb_buffer_pool_size = <value>
innodb_log_file_size = <value>
innodb_file_per_table = 1
innodb_flush_method = O_DIRECT

# MyIsam
# -------------------------------------------------------------------------------
key_buffer_size = <value>

# OTHER
# -------------------------------------------------------------------------------
tmp_table_size = 32M
max_heap_table_size = 32M
max_connections = <value>
open_files_limit = 65535

[client]
socket = /var/lib/mysql/mysql.sock
port = 3306

配置项说明

下面是一个较为详尽的 Mysql 配置文件,各配置项有注释说明:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
[mysqld]
# GENERAL
# -------------------------------------------------------------------------------
datadir = /var/lib/mysql
# socket 文件
socket = /var/lib/mysql/mysql.sock
# PID 文件
pid_file = /var/lib/mysql/mysql.pid
# 启动 mysql 服务进程的用户
user = mysql
# 服务端口号,默认 3306
port = 3306
default_storage_engine = InnoDB
# 默认时区
default_time_zone = '+8:00'
character_set_server = utf8mb4
collation_server = utf8mb4_0900_ai_ci

# Mysql 服务 ID,单点服务时没必要设置
server-id = 1

# 事务隔离级别,默认为可重复读(REPEATABLE-READ)。(此级别下可能参数很多间隙锁,影响性能,但是修改又影响主从复制及灾难恢复,建议还是修改代码逻辑吧)
# 隔离级别可选项目:READ-UNCOMMITTED READ-COMMITTED REPEATABLE-READ SERIALIZABLE
transaction_isolation = REPEATABLE-READ

# 目录配置
# -------------------------------------------------------------------------------

# mysql 安装根目录
basedir = /usr/local/mysql-5.7.21

# mysql 数据文件所在目录
datadir = /var/lib/mysql

# 临时目录 比如 load data infile 会用到,一般都是使用/tmp
tmpdir = /tmp

# 数据库引擎配置
# -------------------------------------------------------------------------------

# mysql 5.1 之后,默认引擎是 InnoDB
default_storage_engine = InnoDB

# 内存临时表默认引擎,默认 InnoDB
default_tmp_storage_engine = InnoDB

# mysql 5.7 新增特性,磁盘临时表默认引擎,默认 InnoDB
internal_tmp_disk_storage_engine = InnoDB

# 字符集配置
# -------------------------------------------------------------------------------

# 数据库默认字符集,主流字符集支持一些特殊表情符号(特殊表情符占用 4 个字节)
character_set_server = utf8mb4

# 数据库字符集对应一些排序等规则,注意要和 character_set_server 对应
collation-server = utf8mb4_0900_ai_ci

# 设置 client 连接 mysql 时的字符集,防止乱码
# init_connect='SET NAMES utf8'

# 是否对 sql 语句大小写敏感,默认值为 0,1 表示不敏感
lower_case_table_names = 1

# 数据库连接配置
# -------------------------------------------------------------------------------

# 最大连接数,可设最大值 16384,一般考虑根据同时在线人数设置一个比较综合的数字,鉴于该数值增大并不太消耗系统资源,建议直接设 10000
# 如果在访问时经常出现 Too Many Connections 的错误提示,则需要增大该参数值
max_connections = 10000

# 默认值 100,最大错误连接数,如果有超出该参数值个数的中断错误连接,则该主机将被禁止连接。如需对该主机进行解禁,执行:FLUSH HOST
# 考虑高并发场景下的容错,建议加大。
max_connect_errors = 10000

# MySQL 打开的文件描述符限制,默认最小 1024;
# 当 open_files_limit 没有被配置的时候,比较 max_connections\*5 和 ulimit -n 的值,哪个大用哪个,
# 当 open_file_limit 被配置的时候,比较 open_files_limit 和 max_connections\*5 的值,哪个大用哪个。
# 注意:仍然可能出现报错信息 Can't create a new thread;此时观察系统 cat /proc/mysql 进程号/limits,观察进程 ulimit 限制情况
# 过小的话,考虑修改系统配置表,/etc/security/limits.conf 和 /etc/security/limits.d/90-nproc.conf
open_files_limit = 65535

# 超时配置
# -------------------------------------------------------------------------------

# MySQL 默认的 wait_timeout 值为 8 个小时,interactive_timeout 参数需要同时配置才能生效
# MySQL 连接闲置超过一定时间后(单位:秒,此处为 1800 秒)将会被强行关闭
interactive_timeout = 1800
wait_timeout = 1800

# 在 MySQL 暂时停止响应新请求之前的短时间内多少个请求可以被存在堆栈中
# 官方建议 back_log = 50 + (max_connections / 5),封顶数为 900
back_log = 900

# 数据库数据交换配置
# -------------------------------------------------------------------------------
# 该参数限制服务器端,接受的数据包大小,如果有 BLOB 子段,建议增大此值,避免写入或者更新出错。有 BLOB 子段,建议改为 1024M
max_allowed_packet = 128M

# 内存、cache 与 buffer 设置

# 内存临时表的最大值,默认 16M,此处设置成 64M
tmp_table_size = 64M

# 用户创建的内存表的大小,默认 16M,往往和 tmp_table_size 一起设置,限制用户临时表大小。
# 超限的话,MySQL 就会自动地把它转化为基于磁盘的 MyISAM 表,存储在指定的 tmpdir 目录下,增大 IO 压力,建议内存大,增大该数值。
max_heap_table_size = 64M

# 表示这个 mysql 版本是否支持查询缓存。ps:SHOW STATUS LIKE 'qcache%',与缓存相关的状态变量。
# have_query_cache

# 这个系统变量控制着查询缓存功能的开启和关闭,0 表示关闭,1 表示打开,2 表示只要 select 中明确指定 SQL_CACHE 才缓存。
# 看业务场景决定是否使用缓存,不使用,下面就不用配置了。
# Mysql8 不支持
query_cache_type = 0

# 默认值 1M,优点是查询缓存可以极大的提高服务器速度,如果你有大量的相同的查询并且很少修改表。
# 缺点:在你表经常变化的情况下或者如果你的查询原文每次都不同,查询缓存也许引起性能下降而不是性能提升。
# Mysql8 不支持
query_cache_size = 64M

# 只有小于此设定值的结果才会被缓冲,保护查询缓冲,防止一个极大的结果集将其他所有的查询结果都覆盖。
query_cache_limit = 2M

# 每个被缓存的结果集要占用的最小内存,默认值 4kb,一般不怎么调整。
# 如果 Qcache_free_blocks 值过大,可能是 query_cache_min_res_unit 值过大,应该调小些
# query_cache_min_res_unit 的估计值:(query_cache_size - Qcache_free_memory) / Qcache_queries_in_cache
query_cache_min_res_unit = 4kb

# 在一个事务中 binlog 为了记录 SQL 状态所持有的 cache 大小
# 如果你经常使用大的、多声明的事务,你可以增加此值来获取更大的性能。
# 所有从事务来的状态都将被缓冲在 binlog 缓冲中然后在提交后一次性写入到 binlog 中
# 如果事务比此值大,会使用磁盘上的临时文件来替代。
# 此缓冲在每个连接的事务第一次更新状态时被创建
binlog_cache_size = 1M

# 日志配置
# -------------------------------------------------------------------------------

# 日志文件相关设置,一般只开启三种日志,错误日志,慢查询日志,二进制日志。普通查询日志不开启。
# 普通查询日志,默认值 off,不开启
general_log = 0

# 普通查询日志存放地址
general_log_file = /usr/local/mysql-5.7.21/log/mysql-general.log

# 全局动态变量,默认 3,范围:1 ~ 3
# 表示错误日志记录的信息,1:只记录 error 信息;2:记录 error 和 warnings 信息;3:记录 error、warnings 和普通的 notes 信息。
log_error_verbosity = 2

# 错误日志文件地址
log_error = /usr/local/mysql-5.7.21/log/mysql-error.log

# 开启慢查询
slow_query_log = 1

# 开启慢查询时间,此处为 1 秒,达到此值才记录数据
long_query_time = 3

# 检索行数达到此数值,才记录慢查询日志中
min_examined_row_limit = 100

# mysql 5.6.5 新增,用来表示每分钟允许记录到 slow log 的且未使用索引的 SQL 语句次数,默认值为 0,不限制。
log_throttle_queries_not_using_indexes = 0

# 慢查询日志文件地址
slow_query_log_file = /var/log/mysql/mysql-slow.log

# 开启记录没有使用索引查询语句
log-queries-not-using-indexes = 1

# 开启二进制日志
log_bin = /usr/local/mysql-5.7.21/log/mysql-bin.log

# mysql 清除过期日志的时间,默认值 0,不自动清理,而是使用滚动循环的方式。
expire_logs_days = 0

# 如果二进制日志写入的内容超出给定值,日志就会发生滚动。你不能将该变量设置为大于 1GB 或小于 4096 字节。 默认值是 1GB。
max_binlog_size = 1000M

# binlog 的格式也有三种:STATEMENT,ROW,MIXED。mysql 5.7.7 后,默认值从 MIXED 改为 ROW
# 关于 binlog 日志格式问题,请查阅网络资料
binlog_format = row

# 表示每 N 次写入 binlog 后,持久化到磁盘,默认值 N=1
# 建议设置成 1,这样可以保证 MySQL 异常重启之后 binlog 不丢失。
# sync_binlog = 1

# InnoDB 引擎配置
# -------------------------------------------------------------------------------

# 说明:该参数可以提升扩展性和刷脏页性能。
# 默认值 1,建议值:4-8;并且必须小于 innodb_buffer_pool_instances
innodb_page_cleaners = 4

# 说明:一般 8k 和 16k 中选择,8k 的话,cpu 消耗小些,selcet 效率高一点,一般不用改
# 默认值:16k;建议值:不改,
innodb_page_size = 16384

# 说明:InnoDB 使用一个缓冲池来保存索引和原始数据,不像 MyISAM。这里你设置越大,你在存取表里面数据时所需要的磁盘 I/O 越少。
# 在一个独立使用的数据库服务器上,你可以设置这个变量到服务器物理内存大小的 60%-80%
# 注意别设置的过大,会导致 system 的 swap 空间被占用,导致操作系统变慢,从而减低 sql 查询的效率
# 默认值:128M,建议值:物理内存的 60%-80%
innodb_buffer_pool_size = 512M

# 说明:只有当设置 innodb_buffer_pool_size 值大于 1G 时才有意义,小于 1G,instances 默认为 1,大于 1G,instances 默认为 8
# 但是网络上有评价,最佳性能,每个实例至少 1G 大小。
# 默认值:1 或 8,建议值:innodb_buffer_pool_size/innodb_buffer_pool_instances >= 1G
innodb_buffer_pool_instances = 1

# 说明:mysql 5.7 新特性,defines the chunk size for online InnoDB buffer pool resizing operations。
# 实际缓冲区大小必须为 innodb_buffer_pool_chunk_size*innodb_buffer_pool_instances*倍数,取略大于 innodb_buffer_pool_size
# 默认值 128M,建议值:默认值就好,乱改反而容易出问题,它会影响实际 buffer pool 大小。
innodb_buffer_pool_chunk_size = 128M

# 在启动时把热数据加载到内存。默认值为 on,不修改
innodb_buffer_pool_load_at_startup = 1

# 在关闭时把热数据 dump 到本地磁盘。默认值为 on,不修改
innodb_buffer_pool_dump_at_shutdown = 1

# 说明:影响 Innodb 缓冲区的刷新算法,建议从小到大配置,直到 zero free pages;innodb_lru_scan_depth \* innodb_buffer_pool_instances defines the amount of work performed by the page cleaner thread each second。
# 默认值 1024,建议值: 未知
innodb_lru_scan_depth = 1024

# 说明:事务等待获取资源等待的最长时间,单位为秒,看具体业务情况,一般默认值就好
# 默认值:50,建议值:看业务。
innodb_lock_wait_timeout = 60

# 说明:设置了 Mysql 后台任务(例如页刷新和 merge dadta from buffer pool)每秒 io 操作的上限。
# 默认值:200,建议值:方法一,单盘 sata 设 100,sas10,raid10 设 200,ssd 设 2000,fushion-io 设 50000;方法二,通过测试工具获得磁盘 io 性能后,设置 IOPS 数值/2。
innodb_io_capacity = 2000

# 说明:该参数是所有缓冲区线程 io 操作的总上限。
# 默认值:innodb_io_capacity 的两倍。建议值:例如用 iometer 测试后的 iops 数值就好
innodb_io_capacity_max = 4000

# 说明:控制着 innodb 数据文件及 redo log 的打开、刷写模式,三种模式:fdatasync(默认),O_DSYNC,O_DIRECT
# fdatasync:数据文件,buffer pool->os buffer->磁盘;日志文件,buffer pool->os buffer->磁盘;
# O_DSYNC: 数据文件,buffer pool->os buffer->磁盘;日志文件,buffer pool->磁盘;
# O_DIRECT: 数据文件,buffer pool->磁盘; 日志文件,buffer pool->os buffer->磁盘;
# 默认值为空,建议值:使用 SAN 或者 raid,建议用 O_DIRECT,不懂测试的话,默认生产上使用 O_DIRECT
innodb_flush_method = O_DIRECT

# 说明:mysql5.7 之后默认开启,意思是,每张表一个独立表空间。
# 默认值 1,开启
innodb_file_per_table = 1

# 说明:The path where InnoDB creates undo tablespaces。通常等于 undo log 文件的存放目录。
# 默认值 ./;自行设置
innodb_undo_directory = /usr/local/mysql-5.7.21/log

# 说明:The number of undo tablespaces used by InnoDB 等于 undo log 文件数量。5.7.21 后开始弃用
# 默认值为 0,建议默认值就好,不用调整了。
innodb_undo_tablespaces = 0

# 说明:定义 undo 使用的回滚段数量。5.7.19 后弃用
# 默认值 128,建议不动,以后弃用了。
innodb_undo_logs = 128

# 说明:5.7.5 后开始使用,在线收缩 undo log 使用的空间。
# 默认值:关闭,建议值:开启
innodb_undo_log_truncate = 1

# 说明:结合 innodb_undo_log_truncate,实现 undo 空间收缩功能
# 默认值:1G,建议值,不改。
innodb_max_undo_log_size = 1G

# 说明:重作日志文件的存放目录
innodb_log_group_home_dir = /usr/local/mysql-5.7.21/log

# 说明:日志文件的大小
# 默认值:48M,建议值:根据你系统的磁盘空间和日志增长情况调整大小
innodb_log_file_size = 128M

# 说明:日志组中的文件数量,mysql 以循环方式写入日志
# 默认值 2,建议值:根据你系统的磁盘空间和日志增长情况调整大小
innodb_log_files_in_group = 3

# 此参数确定些日志文件所用的内存大小,以 M 为单位。缓冲区更大能提高性能,但意外的故障将会丢失数据。MySQL 开发人员建议设置为 1-8M 之间
innodb_log_buffer_size = 16M

# 说明:可以控制 log 从系统 buffer 刷入磁盘文件的刷新频率,增大可减轻系统负荷
# 默认值是 1;建议值不改。系统性能一般够用。
innodb_flush_log_at_timeout = 1

# 说明:参数可设为 0,1,2;
# 参数 0:表示每秒将 log buffer 内容刷新到系统 buffer 中,再调用系统 flush 操作写入磁盘文件。
# 参数 1:表示每次事务提交,redo log 都直接持久化到磁盘。
# 参数 2:表示每次事务提交,隔 1 秒后再将 redo log 持久化到磁盘。
# 建议设置成 1,这样可以保证 MySQL 异常重启之后数据不丢失。
innodb_flush_log_at_trx_commit = 1

# 说明:限制 Innodb 能打开的表的数据,如果库里的表特别多的情况,请增加这个。
# 值默认是 2000,建议值:参考数据库表总数再进行调整,一般够用不用调整。
innodb_open_files = 8192

# innodb 处理 io 读写的后台并发线程数量,根据 cpu 核来确认,取值范围:1-64
# 默认值:4,建议值:与逻辑 cpu 数量的一半保持一致。
innodb_read_io_threads = 4
innodb_write_io_threads = 4

# 默认设置为 0,表示不限制并发数,这里推荐设置为 0,更好去发挥 CPU 多核处理能力,提高并发量
innodb_thread_concurrency = 0

# 默认值为 4,建议不变。InnoDB 中的清除操作是一类定期回收无用数据的操作。mysql 5.5 之后,支持多线程清除操作。
innodb_purge_threads = 4

# 说明:mysql 缓冲区分为 new blocks 和 old blocks;此参数表示 old blocks 占比;
# 默认值:37,建议值,一般不动
innodb_old_blocks_pct = 37

# 说明:新数据被载入缓冲池,进入 old pages 链区,当 1 秒后再次访问,则提升进入 new pages 链区。
# 默认值:1000
innodb_old_blocks_time=1000

# 说明:开启异步 io,可以提高并发性,默认开启。
# 默认值为 1,建议不动
innodb_use_native_aio = 1

# 说明:默认为空,使用 data 目录,一般不改。
innodb_data_home_dir=/usr/local/mysql-5.7.21/data

# 说明:Defines the name,size,and attributes of InnoDB system tablespace data files。
# 默认值,不指定,默认为 ibdata1:12M:autoextend
innodb_data_file_path = ibdata1:12M:autoextend

# 说明:设置了 InnoDB 存储引擎用来存放数据字典信息以及一些内部数据结构的内存空间大小,除非你的数据对象及其多,否则一般默认不改。
# innodb_additional_mem_pool_size = 16M
# 说明:The crash recovery mode。只有紧急情况需要恢复数据的时候,才改为大于 1-6 之间数值,含义查下官网。
# 默认值为 0;
#innodb_force_recovery = 0

# MyISAM 引擎配置
# -------------------------------------------------------------------------------

# 指定索引缓冲区的大小,为 MYISAM 数据表开启供线程共享的索引缓存,对 INNODB 引擎无效。相当影响 MyISAM 的性能。
# 不要将其设置大于你可用内存的 30%,因为一部分内存同样被 OS 用来缓冲行数据
# 甚至在你并不使用 MyISAM 表的情况下,你也需要仍旧设置起 8-64M 内存由于它同样会被内部临时磁盘表使用。
# 默认值 8M,建议值:对于内存在 4GB 左右的服务器该参数可设置为 256M 或 384M。注意:该参数值设置的过大反而会是服务器整体效率降低!
key_buffer_size = 64M

# 为每个扫描 MyISAM 的线程分配参数设置的内存大小缓冲区。
# 默认值 128kb,建议值:16G 内存建议 1M,4G:128kb 或者 256kb 吧
# 注意,该缓冲区是每个连接独占的,所以总缓冲区大小为 128kb*连接数;极端情况 128kb*maxconnectiosns,会超级大,所以要考虑日常平均连接数。
# 一般不需要太关心该数值,稍微增大就可以了,
read_buffer_size = 262144

# 支持任何存储引擎
# MySQL 的随机读缓冲区大小,适当增大,可以提高性能。
# 默认值 256kb;建议值:得参考连接数,16G 内存,有人推荐 8M
# 注意,该缓冲区是每个连接独占的,所以总缓冲区大小为 128kb*连接数;极端情况 128kb*maxconnectiosns,会超级大,所以要考虑日常平均连接数。
read_rnd_buffer_size = 1M

# order by 或 group by 时用到
# 支持所有引擎,innodb 和 myisam 有自己的 innodb_sort_buffer_size 和 myisam_sort_buffer_size 设置
# 默认值 256kb;建议值:得参考连接数,16G 内存,有人推荐 8M。
# 注意,该缓冲区是每个连接独占的,所以总缓冲区大小为 1M*连接数;极端情况 1M*maxconnectiosns,会超级大。所以要考虑日常平均连接数。
sort_buffer_size = 1M

# 此缓冲被使用来优化全联合(full JOINs 不带索引的联合)
# 类似的联合在极大多数情况下有非常糟糕的性能表现,但是将此值设大能够减轻性能影响。
# 通过 “Select_full_join” 状态变量查看全联合的数量
# 注意,该缓冲区是每个连接独占的,所以总缓冲区大小为 1M*连接数;极端情况 1M*maxconnectiosns,会超级大。所以要考虑日常平均连接数。
# 默认值 256kb;建议值:16G 内存,设置 8M。
join_buffer_size = 1M

# 缓存 linux 文件描述符信息,加快数据文件打开速度
# 它影响 myisam 表的打开关闭,但是不影响 innodb 表的打开关闭。
# 默认值 2000,建议值:根据状态变量 Opened_tables 去设定
table_open_cache = 2000

# 缓存表定义的相关信息,加快读取表信息速度
# 默认值 1400,最大值 2000,建议值:基本不改。
table_definition_cache = 1400

# 该参数是 myssql 5.6 后引入的,目的是提高并发。
# 默认值 1,建议值:cpu 核数,并且<=16
table_open_cache_instances = 2

# 当客户端断开之后,服务器处理此客户的线程将会缓存起来以响应下一个客户而不是销毁。可重用,减小了系统开销。
# 默认值为 9,建议值:两种取值方式,方式一,根据物理内存,1G —> 8;2G —> 16; 3G —> 32; >3G —> 64;
# 方式二,根据 show status like 'threads%',查看 Threads_connected 值。
thread_cache_size = 16

# 默认值 256k,建议值:16/32G 内存,512kb,其他一般不改变,如果报错:Thread stack overrun,就增大看看,
# 注意,每个线程分配内存空间,所以总内存空间。。。你懂得。
thread_stack = 512k


[client]
socket = /var/lib/mysql/mysql.sock
port = 3306
  • GENERAL

    • datadir - mysql 数据文件所在目录
    • socket - scoket 文件
    • pid_file - PID 文件
    • user - 启动 mysql 服务进程的用户
    • port - 服务端口号,默认 3306
    • default_storage_engine - mysql 5.1 之后,默认引擎是 InnoDB
    • default_time_zone - 默认时区。中国大部分地区在东八区,即 +8:00
    • character_set_server - 数据库默认字符集
    • collation_server - 数据库字符集对应一些排序等规则,注意要和 character_set_server 对应
  • LOG

    • log_error - 错误日志文件地址
    • slow_query_log - 错误日志文件地址
  • InnoDB

    • innodb_buffer_pool_size - InnoDB 使用一个缓冲池来保存索引和原始数据,不像 MyISAM。这里你设置越大,你在存取表里面数据时所需要的磁盘 I/O 越少。
      • 在一个独立使用的数据库服务器上,你可以设置这个变量到服务器物理内存大小的 60%-80%
      • 注意别设置的过大,会导致 system 的 swap 空间被占用,导致操作系统变慢,从而减低 sql 查询的效率
      • 默认值:128M,建议值:物理内存的 60%-80%
    • innodb_log_file_size - 日志文件的大小。默认值:48M,建议值:根据你系统的磁盘空间和日志增长情况调整大小
    • innodb_file_per_table - 说明:mysql5.7 之后默认开启,意思是,每张表一个独立表空间。默认值 1,开启。
    • innodb_flush_method - 说明:控制着 innodb 数据文件及 redo log 的打开、刷写模式,三种模式:fdatasync(默认),O_DSYNC,O_DIRECT。默认值为空,建议值:使用 SAN 或者 raid,建议用 O_DIRECT,不懂测试的话,默认生产上使用 O_DIRECT
      • fdatasync:数据文件,buffer pool->os buffer->磁盘;日志文件,buffer pool->os buffer->磁盘;
      • O_DSYNC: 数据文件,buffer pool->os buffer->磁盘;日志文件,buffer pool->磁盘;
      • O_DIRECT: 数据文件,buffer pool->磁盘; 日志文件,buffer pool->os buffer->磁盘;
  • MyIsam

    • key_buffer_size - 指定索引缓冲区的大小,为 MYISAM 数据表开启供线程共享的索引缓存,对 INNODB 引擎无效。相当影响 MyISAM 的性能。
      • 不要将其设置大于你可用内存的 30%,因为一部分内存同样被 OS 用来缓冲行数据
      • 甚至在你并不使用 MyISAM 表的情况下,你也需要仍旧设置起 8-64M 内存由于它同样会被内部临时磁盘表使用。
      • 默认值 8M,建议值:对于内存在 4GB 左右的服务器该参数可设置为 256M 或 384M。
      • 注意:该参数值设置的过大反而会是服务器整体效率降低!
  • OTHER

    • tmp_table_size - 内存临时表的最大值,默认 16M,此处设置成 128M
    • max_heap_table_size - 用户创建的内存表的大小,默认 16M,往往和 tmp_table_size 一起设置,限制用户临时表大小。超限的话,MySQL 就会自动地把它转化为基于磁盘的 MyISAM 表,存储在指定的 tmpdir 目录下,增大 IO 压力,建议内存大,增大该数值。
    • query_cache_type - 这个系统变量控制着查询缓存功能的开启和关闭,0 表示关闭,1 表示打开,2 表示只要 select 中明确指定 SQL_CACHE 才缓存。
    • query_cache_size - 默认值 1M,优点是查询缓存可以极大的提高服务器速度,如果你有大量的相同的查询并且很少修改表。缺点:在你表经常变化的情况下或者如果你的查询原文每次都不同,查询缓存也许引起性能下降而不是性能提升。
    • max_connections - 最大连接数,可设最大值 16384,一般考虑根据同时在线人数设置一个比较综合的数字,鉴于该数值增大并不太消耗系统资源,建议直接设 10000。如果在访问时经常出现 Too Many Connections 的错误提示,则需要增大该参数值
    • thread_cache - 当客户端断开之后,服务器处理此客户的线程将会缓存起来以响应下一个客户而不是销毁。可重用,减小了系统开销。默认值为 9,建议值:两种取值方式,
      • 方式一,根据物理内存,1G —> 8;2G —> 16; 3G —> 32; >3G —> 64;
      • 方式二,根据 show status like ‘threads%’,查看 Threads_connected 值。
    • open_files_limit - MySQL 打开的文件描述符限制,默认最小 1024;
      • 当 open_files_limit 没有被配置的时候,比较 max_connections*5 和 ulimit -n 的值,哪个大用哪个,
      • 当 open_file_limit 被配置的时候,比较 open_files_limit 和 max_connections*5 的值,哪个大用哪个
      • 注意:仍然可能出现报错信息 Can’t create a new thread;此时观察系统 cat /proc/mysql 进程号/limits,观察进程 ulimit 限制情况
      • 过小的话,考虑修改系统配置表,/etc/security/limits.conf/etc/security/limits.d/90-nproc.conf

常见问题

Too many connections

现象

尝试连接 Mysql 时,遇到 Too many connections 错误。

原因

数据库连接线程数超过最大值,访问被拒绝。

解决方案

如果实际连接线程数过大,可以考虑增加服务器节点来分流;如果实际线程数并不算过大,那么可以配置 max_connections 来增加允许的最大连接数。需要注意的是,连接数不宜过大,一般来说,单库每秒有 2000 个并发连接时,就可以考虑扩容了,健康的状态应该维持在每秒 1000 个并发连接左右。

(1)查看最大连接数

1
2
3
4
5
6
7
mysql> show variables like '%max_connections%';
+------------------------+-------+
| Variable_name | Value |
+------------------------+-------+
| max_connections | 151 |
| mysqlx_max_connections | 100 |
+------------------------+-------+

(2)查看服务器响应的最大连接数

1
2
3
4
5
6
7
mysql> show global status like 'Max_used_connections';
+----------------------+-------+
| Variable_name | Value |
+----------------------+-------+
| Max_used_connections | 142 |
+----------------------+-------+
1 row in set (0.00 sec)

(3)临时设置最大连接数

1
set GLOBAL max_connections=256;

注意:当服务器重启时,最大连接数会被重置。

(4)永久设置最大连接数

修改 /etc/my.cnf 配置文件,在 [mysqld] 添加以下配置:

1
max_connections=256

重启 mysql 以生效

(5)修改 Linux 最大文件数限制

设置了最大连接数,如果还是没有生效,考虑检查一下 Linux 最大文件数

Mysql 最大连接数会受到最大文件数限制,vim /etc/security/limits.conf,添加 mysql 用户配置

1
2
mysql hard nofile 65535
mysql soft nofile 65535

(6)检查 LimitNOFILE

如果是使用 rpm 方式安装 mysql,检查 mysqld.service 文件中的 LimitNOFILE 是否配置的太小。

时区(time_zone)偏差

现象

数据库中存储的 Timestamp 字段值比真实值少了 13 个小时。

原因

  • 当 JDBC 与 MySQL 开始建立连接时,会获取服务器参数。
  • 当 MySQL 的 time_zone 值为 SYSTEM 时,会取 system_time_zone 值作为协调时区,若得到的是 CST 那么 Java 会误以为这是 CST -0500 ,因此会给出错误的时区信息(国内一般是CST +0800,即东八区)。

查看时区方法:

通过 show variables like '%time_zone%'; 命令查看 Mysql 时区配置:

1
2
3
4
5
6
7
mysql> show variables like '%time_zone%';
+------------------+--------+
| Variable_name | Value |
+------------------+--------+
| system_time_zone | CST |
| time_zone | SYSTEM |
+------------------+--------+

解决方案

方案一

1
2
3
4
5
mysql> set global time_zone = '+08:00';
Query OK, 0 rows affected (0.00 sec)

mysql> set time_zone = '+08:00';
Query OK, 0 rows affected (0.00 sec)

方案二

修改 my.cnf 文件,在 [mysqld] 节下增加 default-time-zone='+08:00' ,然后重启。

数据表损坏如何修复

使用 myisamchk 来修复,具体步骤:

  1. 修复前将 mysql 服务停止。
  2. 打开命令行方式,然后进入到 mysql 的 bin 目录。
  3. 执行 myisamchk –recover 数据库所在路 /*.MYI

使用 repair table 或者 OPTIMIZE table 命令来修复,REPAIR TABLE table_name 修复表 OPTIMIZE TABLE table_name 优化表 REPAIR TABLE 用于修复被破坏的表。 OPTIMIZE TABLE 用于回收闲置的数据库空间,当表上的数据行被删除时,所占据的磁盘空间并没有立即被回收,使用了 OPTIMIZE TABLE 命令后这些空间将被回收,并且对磁盘上的数据行进行重排(注意:是磁盘上,而非数据库)

数据结构

问题现象:ERROR 1071: Specified key was too long; max key length is 767 bytes

问题原因:Mysql 默认情况下单个列的索引不能超过 767 位(不同版本可能存在差异) 。

解决方法:优化索引结构,索引字段不宜过长。

脚本

这里推荐我写的几个一键运维脚本,非常方便,欢迎使用:

参考资料

Iptables 应用

iptables 是一个配置 Linux 内核 防火墙 的命令行工具,是 netfilter 项目的一部分。 可以直接配置,也可以通过许多前端和图形界面配置。

iptables 也经常代指该内核级防火墙。iptables 用于 ipv4ip6tables 用于 ipv6

nftables 已经包含在 Linux kernel 3.13 中,以后会取代 iptables 成为主要的 Linux 防火墙工具。

环境:CentOS7

简介

iptables 可以检测、修改、转发、重定向和丢弃 IPv4 数据包

过滤 IPv4 数据包的代码已经内置于内核中,并且按照不同的目的被组织成 的集合。 由一组预先定义的 组成,包含遍历顺序规则。每一条规则包含一个谓词的潜在匹配和相应的动作(称为 目标),如果谓词为真,该动作会被执行。也就是说条件匹配。

安装 iptables

(1)禁用 firewalld

CentOS 7 上默认安装了 firewalld 作为防火墙,使用 iptables 建议关闭并禁用 firewalld。

1
2
systemctl stop firewalld
systemctl disable firewalld

(2)安装 iptables

1
yum install -y iptables-services

(3)服务管理

  • 查看服务状态:systemctl status iptables
  • 启用服务:systemctl enable iptables
  • 禁用服务:systemctl disable iptables
  • 启动服务:systemctl start iptables
  • 重启服务:systemctl restart iptables
  • 关闭服务: systemctl stop iptables

命令

基本语法:

1
iptables(选项)(参数)

基本选项说明:

参数 作用
-P 设置默认策略:iptables -P INPUT (DROP
-F 清空规则链
-L 查看规则链
-A 在规则链的末尾加入新规则
-I num 在规则链的头部加入新规则
-D num 删除某一条规则
-s 匹配来源地址 IP/MASK,加叹号”!”表示除这个 IP 外。
-d 匹配目标地址
-i 网卡名称 匹配从这块网卡流入的数据
-o 网卡名称 匹配从这块网卡流出的数据
-p 匹配协议,如 tcp,udp,icmp
–dport num 匹配目标端口号
–sport num 匹配来源端口号

顺序:

1
iptables -t 表名 <-A/I/D/R> 规则链名 [规则号] <-i/o 网卡名> -p 协议名 <-s 源IP/源子网> --sport 源端口 <-d 目标IP/目标子网> --dport 目标端口 -j 动作

iptables 示例

清空当前的所有规则和计数

1
2
3
iptables -F  # 清空所有的防火墙规则
iptables -X # 删除用户自定义的空链
iptables -Z # 清空计数

配置允许 ssh 端口连接

1
2
iptables -A INPUT -s 192.168.1.0/24 -p tcp --dport 22 -j ACCEPT
# 22为你的ssh端口, -s 192.168.1.0/24表示允许这个网段的机器来连接,其它网段的ip地址是登陆不了你的机器的。 -j ACCEPT表示接受这样的请求

允许本地回环地址可以正常使用

1
2
3
iptables -A INPUT -i lo -j ACCEPT
#本地圆环地址就是那个127.0.0.1,是本机上使用的,它进与出都设置为允许
iptables -A OUTPUT -o lo -j ACCEPT

设置默认的规则

1
2
3
iptables -P INPUT DROP # 配置默认的不让进
iptables -P FORWARD DROP # 默认的不允许转发
iptables -P OUTPUT ACCEPT # 默认的可以出去

配置白名单

1
2
3
iptables -A INPUT -p all -s 192.168.1.0/24 -j ACCEPT  # 允许机房内网机器可以访问
iptables -A INPUT -p all -s 192.168.140.0/24 -j ACCEPT # 允许机房内网机器可以访问
iptables -A INPUT -p tcp -s 183.121.3.7 --dport 3380 -j ACCEPT # 允许183.121.3.7访问本机的3380端口

开启相应的服务端口

1
2
3
iptables -A INPUT -p tcp --dport 80 -j ACCEPT # 开启80端口,因为web对外都是这个端口
iptables -A INPUT -p icmp --icmp-type 8 -j ACCEPT # 允许被ping
iptables -A INPUT -m state --state ESTABLISHED,RELATED -j ACCEPT # 已经建立的连接得让它进来

保存规则到配置文件中

1
2
3
cp /etc/sysconfig/iptables /etc/sysconfig/iptables.bak # 任何改动之前先备份,请保持这一优秀的习惯
iptables-save > /etc/sysconfig/iptables
cat /etc/sysconfig/iptables

列出已设置的规则

iptables -L [-t 表名][链名]

  • 四个表名 rawnatfiltermangle
  • 五个规则链名 INPUTOUTPUTFORWARDPREROUTINGPOSTROUTING
  • filter 表包含INPUTOUTPUTFORWARD三个规则链
1
2
3
4
5
6
iptables -L -t nat                  # 列出 nat 上面的所有规则
# ^ -t 参数指定,必须是 raw, nat,filter,mangle 中的一个
iptables -L -t nat --line-numbers # 规则带编号
iptables -L INPUT

iptables -L -nv # 查看,这个列表看起来更详细

清除已有规则

1
2
3
4
iptables -F INPUT  # 清空指定链 INPUT 上面的所有规则
iptables -X INPUT # 删除指定的链,这个链必须没有被其它任何规则引用,而且这条上必须没有任何规则。
# 如果没有指定链名,则会删除该表中所有非内置的链。
iptables -Z INPUT # 把指定链,或者表中的所有链上的所有计数器清零。

删除已添加的规则

1
2
# 添加一条规则
iptables -A INPUT -s 192.168.1.5 -j DROP

将所有 iptables 以序号标记显示,执行:

1
iptables -L -n --line-numbers

比如要删除 INPUT 里序号为 8 的规则,执行:

1
iptables -D INPUT 8

开放指定的端口

1
2
3
4
5
6
7
8
9
iptables -A INPUT -s 127.0.0.1 -d 127.0.0.1 -j ACCEPT               #允许本地回环接口(即运行本机访问本机)
iptables -A INPUT -m state --state ESTABLISHED,RELATED -j ACCEPT #允许已建立的或相关连的通行
iptables -A OUTPUT -j ACCEPT #允许所有本机向外的访问
iptables -A INPUT -p tcp --dport 22 -j ACCEPT #允许访问22端口
iptables -A INPUT -p tcp --dport 80 -j ACCEPT #允许访问80端口
iptables -A INPUT -p tcp --dport 21 -j ACCEPT #允许ftp服务的21端口
iptables -A INPUT -p tcp --dport 20 -j ACCEPT #允许FTP服务的20端口
iptables -A INPUT -j reject #禁止其他未允许的规则访问
iptables -A FORWARD -j REJECT #禁止其他未允许的规则访问

屏蔽 IP

1
2
3
4
5
iptables -A INPUT -p tcp -m tcp -s 192.168.0.8 -j DROP  # 屏蔽恶意主机(比如,192.168.0.8
iptables -I INPUT -s 123.45.6.7 -j DROP #屏蔽单个IP的命令
iptables -I INPUT -s 123.0.0.0/8 -j DROP #封整个段即从123.0.0.1到123.255.255.254的命令
iptables -I INPUT -s 124.45.0.0/16 -j DROP #封IP段即从123.45.0.1到123.45.255.254的命令
iptables -I INPUT -s 123.45.6.0/24 -j DROP #封IP段即从123.45.6.1到123.45.6.254的命令是

指定数据包出去的网络接口

只对 OUTPUT,FORWARD,POSTROUTING 三个链起作用。

1
iptables -A FORWARD -o eth0

查看已添加的规则

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
iptables -L -n -v
Chain INPUT (policy DROP 48106 packets, 2690K bytes)
pkts bytes target prot opt in out source destination
5075 589K ACCEPT all -- lo * 0.0.0.0/0 0.0.0.0/0
191K 90M ACCEPT tcp -- * * 0.0.0.0/0 0.0.0.0/0 tcp dpt:22
1499K 133M ACCEPT tcp -- * * 0.0.0.0/0 0.0.0.0/0 tcp dpt:80
4364K 6351M ACCEPT all -- * * 0.0.0.0/0 0.0.0.0/0 state RELATED,ESTABLISHED
6256 327K ACCEPT icmp -- * * 0.0.0.0/0 0.0.0.0/0

Chain FORWARD (policy ACCEPT 0 packets, 0 bytes)
pkts bytes target prot opt in out source destination

Chain OUTPUT (policy ACCEPT 3382K packets, 1819M bytes)
pkts bytes target prot opt in out source destination
5075 589K ACCEPT all -- * lo 0.0.0.0/0 0.0.0.0/0

启动网络转发规则

公网210.14.67.7让内网192.168.188.0/24上网

1
iptables -t nat -A POSTROUTING -s 192.168.188.0/24 -j SNAT --to-source 210.14.67.127

端口映射

本机的 2222 端口映射到内网 虚拟机的 22 端口

1
iptables -t nat -A PREROUTING -d 210.14.67.127 -p tcp --dport 2222  -j DNAT --to-dest 192.168.188.115:22

字符串匹配

比如,我们要过滤所有 TCP 连接中的字符串test,一旦出现它我们就终止这个连接,我们可以这么做:

1
2
3
4
5
6
7
8
9
10
11
12
iptables -A INPUT -p tcp -m string --algo kmp --string "test" -j REJECT --reject-with tcp-reset
iptables -L

# Chain INPUT (policy ACCEPT)
# target prot opt source destination
# REJECT tcp -- anywhere anywhere STRING match "test" ALGO name kmp TO 65535 reject-with tcp-reset
#
# Chain FORWARD (policy ACCEPT)
# target prot opt source destination
#
# Chain OUTPUT (policy ACCEPT)
# target prot opt source destination

阻止 Windows 蠕虫的攻击

1
iptables -I INPUT -j DROP -p tcp -s 0.0.0.0/0 -m string --algo kmp --string "cmd.exe"

防止 SYN 洪水攻击

1
iptables -A INPUT -p tcp --syn -m limit --limit 5/second -j ACCEPT

参考资料