Dunwu Blog

大道至简,知易行难

Java 并发面试一

并发简介

【简单】并发和并行有什么区别?

  • 什么是并发?
  • 什么是并行?
  • 并发和并行有什么区别?

并发和并行是最容易让新手费解的概念,那么如何理解二者呢?其最关键的差异在于:是否是同时发生:

  • 并发是指具备处理多个任务的能力,但不一定要同时
  • 并行是指具备同时处理多个任务的能力

下面是我见过最生动的说明,摘自 并发与并行的区别是什么?——知乎的高票答案

  • 你吃饭吃到一半,电话来了,你一直到吃完了以后才去接,这就说明你不支持并发也不支持并行。
  • 你吃饭吃到一半,电话来了,你停了下来接了电话,接完后继续吃饭,这说明你支持并发。
  • 你吃饭吃到一半,电话来了,你一边打电话一边吃饭,这说明你支持并行。

【简单】同步和异步有什么区别?

  • 什么是同步?
  • 什么是异步?
  • 同步和异步有什么区别?
  • 同步:顺序执行,必须等待当前任务完成才能继续,会阻塞后续操作。
  • 异步:不等待当前任务完成,直接执行后续操作,任务完成后通过回调/通知返回结果。

比喻:

  • 同步就像是打电话:不挂电话,通话不会结束。
  • 异步就像是发短信:发完短信后,就可以做其他事;当收到回复短信时,手机会通过铃声或振动来提醒。

【简单】阻塞和非阻塞有什么区别?

  • 什么是阻塞?
  • 阻塞和非阻塞有什么区别?

阻塞和非阻塞关注的是程序在等待调用结果(消息,返回值)时的状态:

  • 阻塞:是指调用结果返回之前,当前线程会被挂起。调用线程只有在得到结果之后才会返回。
  • 非阻塞:是指在不能立刻得到结果之前,该调用不会阻塞当前线程。

比喻:

  • 阻塞:排队等奶茶,不拿到不走;
  • 非阻塞:点完奶茶去逛街,店员短信通知后再取。

【中等】进程、线程、协程、管程有什么区别?

进程、线程、协程、管程对比:

概念 定义 特点 适用场景
进程 可视为一个正在运行的程序 独立内存空间
切换开销大
进程间通信(IPC)较复杂
需要高隔离性的任务(如浏览器多标签)
线程 CPU 调度的基本单位(属于进程) 共享进程内存
切换开销较小
需同步(锁)避免竞态
高并发任务(如Web服务器处理请求)
协程 用户态轻量级线程(协作式调度) 无内核切换开销
由程序员控制切换(yield
单线程内并发
I/O密集型高并发(如爬虫、异步编程)
管程 管理共享资源的同步机制(如锁、条件变量) 封装线程同步逻辑
避免手动操作锁(如Java synchronized
多线程共享资源(如线程安全的数据结构)

小结

  • 进程:隔离性强但开销大。
  • 线程:CPU 调度的基本单位,共享内存但需同步。
  • 协程:用户态线程,高效但需主动让出控制权。
  • 管程:同步工具,简化多线程资源共享。

进程和线程的差异:

  • 一个程序至少有一个进程,一个进程至少有一个线程。
  • 线程比进程划分更细,所以执行开销更小,并发性更高
  • 进程是一个实体,拥有独立的资源;而同一个进程中的多个线程共享进程的资源。

img

JVM 在单个进程中运行,JVM 中的线程共享属于该进程的堆。这就是为什么几个线程可以访问同一个对象。线程共享堆并拥有自己的堆栈空间。这是一个线程如何调用一个方法以及它的局部变量是如何保持线程安全的。但是堆不是线程安全的并且为了线程安全必须进行同步。

【中等】Java 线程和操作系统的线程有什么区别?

以下是 Java 线程与操作系统线程的区别对比表:

对比维度 Java 线程 操作系统线程
抽象层级 JVM 层面的用户态抽象(现代 JVM 1:1 映射到 OS 线程) 内核直接管理的原生线程(内核态)
调度机制 依赖 OS 调度,但可通过协程(如虚拟线程)优化 完全由内核抢占式调度
创建/切换开销 高(需系统调用),但线程池可优化 高(上下文切换涉及用户态-内核态切换)
并发模型 支持 1:1(默认)和 M:N(虚拟线程) 仅 1:1,并发数受内核限制
平台依赖性 跨平台(JVM 统一行为,底层实现因 OS 而异) 直接依赖 OS 和硬件特性(如线程优先级实现不同)
同步机制 高级抽象(如synchronized,映射为 OS 原语) 底层原语(如pthread_mutex
栈内存占用 默认 1MB(可调),虚拟线程仅 KB 级 Linux 默认 8MB(不可跨线程共享)
典型应用场景 通用并发编程,高并发推荐虚拟线程 直接系统编程,需精细控制线程行为的场景

补充说明

  1. 现代 JVM:HotSpot 等主流 JVM 默认将 Java 线程与 OS 线程1:1 绑定,但虚拟线程(Project Loom)实现M:N 映射,显著提升并发能力。
  2. 性能关键点
    • Java 线程的阻塞操作(如 I/O)会阻塞 OS 线程,而虚拟线程通过挂起避免资源浪费。
    • OS 线程数量过多会导致内存和调度开销激增,Java 线程池或虚拟线程可缓解。

【中等】单核 CPU 支持 Java 多线程吗?

单核 CPU 可以支持 Java 多线程,但多个线程无法真正并行执行,而是通过时间片轮转(分时调度)在单个 CPU 核心上交替运行,实现并发(Concurrency)而非并行(Parallelism)

这里顺带提一下 Java 使用的线程调度方式。

操作系统主要通过两种线程调度方式来管理多线程的执行:

  • 抢占式调度(Preemptive Scheduling):操作系统决定何时暂停当前正在运行的线程,并切换到另一个线程执行。这种切换通常是由系统时钟中断(时间片轮转)或其他高优先级事件(如 I/O 操作完成)触发的。这种方式存在上下文切换开销,但公平性和 CPU 资源利用率较好,不易阻塞。
  • 协同式调度(Cooperative Scheduling):线程执行完毕后,主动通知系统切换到另一个线程。这种方式可以减少上下文切换带来的性能开销,但公平性较差,容易阻塞。

Java 使用的线程调度是抢占式的。也就是说,JVM 本身不负责线程的调度,而是将线程的调度委托给操作系统。操作系统通常会基于线程优先级和时间片来调度线程的执行,高优先级的线程通常获得 CPU 时间片的机会更多。

【简单】并发一定比串行更快吗?

并发不一定比串行更快!关键看场景:

并发更快的情况

  • 📶 I/O 密集型:网络/磁盘操作时,CPU 可切换做其他事
  • 多核 CPU:真正并行执行计算任务

串行更快的情况

  • 🔢 单核 CPU 计算:线程切换反而增加开销
  • 🔒 高竞争场景:锁争用导致线程空等
  • 🎯 简单任务:并发管理开销超过收益

黄金法则

  • I/O 多用并发,计算多用多核
  • 避免无脑加线程,合理控制并发度

【简单】什么是并发安全?有哪些线程不安全的情况?

::: info 什么是并发安全?
:::

并发最重要的问题是并发安全问题。所谓并发安全,是指保证程序的正确性,使得并发处理结果符合预期。

并发安全需要保证几个基本特性:

  • 可见性 - 是一个线程修改了某个共享变量,其状态能够立即被其他线程知晓,通常被解释为将线程本地状态反映到主内存上,volatile 就是负责保证可见性的。
  • 原子性 - 简单说就是相关操作不会中途被其他线程干扰,一般通过同步机制(加锁:sychronizedLock)实现。
  • 有序性 - 是保证线程内串行语义,避免指令重排等。

::: info 有哪些线程不安全的情况?
:::

  • 竞态条件:多线程同时修改共享变量(如 count++
  • 非原子操作:多步骤操作被中断(如 if(x==null) x=new Object()
  • 可见性问题:线程 A 的修改对线程 B 不可见
  • 死锁:多个线程互相持有对方需要的锁
  • 资源泄漏:线程未释放资源(如连接、文件)

::: info 线程不安全有哪些解决办法?
:::

  • 同步:synchronizedLock
  • 原子类:AtomicInteger
  • 不可变对象:final
  • 并发容器:ConcurrentHashMap

核心:减少共享数据,合理加锁

【中等】为什么会有并发安全问题?

(1)缓存导致的可见性问题

一个线程对共享变量的修改,另外一个线程能够立刻看到,称为 可见性

在单核时代,所有的线程都是在一颗 CPU 上执行,CPU 缓存与内存的数据一致性容易解决。

多核时代,每颗 CPU 都有自己的缓存,这时 CPU 缓存与内存的数据一致性就没那么容易解决了,当多个线程在不同的 CPU 上执行时,这些线程操作的是不同的 CPU 缓存。

(2)线程切换带来的原子性问题

Java 的并发也是基于任务切换。Java 中,即使是一条语句,也可能需要执行多条 CPU 指令。一个或者多个操作在 CPU 执行的过程中不被中断的特性称为原子性

CPU 能保证的原子操作是 CPU 指令级别的,而不是高级语言的操作符。违背直觉的是,高级语言里一条语句往往需要多条 CPU 指令完成,例如上面代码中的count += 1,至少需要三条 CPU 指令。

  • 指令 1:首先,需要把变量 count 从内存加载到 CPU 的寄存器;
  • 指令 2:之后,在寄存器中执行+1 操作;
  • 指令 3:最后,将结果写入内存(缓存机制导致可能写入的是 CPU 缓存而不是内存)。

因此,执行 count += 1 不是原子操作。

(3)编译优化带来的有序性问题

有序性指的是程序按照代码的先后顺序执行。编译器为了优化性能,有时候会改变程序中语句的先后顺序,例如程序中:a=6; b=7; 编译器优化后可能变成 b=7; a=6;,在这个例子中,编译器调整了语句的顺序,但是不影响程序的最终结果。不过有时候编译器及解释器的优化可能导致意想不到的 Bug。

【中等】哪些场景需要额外注意线程安全问题?

  • 访问共享变量或资源 - 典型的场景有访问共享对象的属性,访问 static 静态变量,访问共享的缓存,等等。因为这些信息不仅会被一个线程访问到,还有可能被多个线程同时访问,那么就有可能在并发读写的情况下发生线程安全问题。
  • 依赖时序的操作 - 如果我们操作的正确性是依赖时序的,而在多线程的情况下又不能保障执行的顺序和我们预想的一致,这个时候就会发生线程安全问题。
  • 不同数据之间存在绑定关系 - 有时候,不同数据之间是成组出现的,存在着相互对应或绑定的关系,最典型的就是 IP 和端口号。有时候我们更换了 IP,往往需要同时更换端口号,如果没有把这两个操作绑定在一起,就有可能出现单独更换了 IP 或端口号的情况,而此时信息如果已经对外发布,信息获取方就有可能获取一个错误的 IP 与端口绑定情况,这时就发生了线程安全问题。
  • 对方没有声明自己是线程安全的 - 在我们使用其他类时,如果对方没有声明自己是线程安全的,那么这种情况下对其他类进行多线程的并发操作,就有可能会发生线程安全问题。举个例子,比如说我们定义了 ArrayList,它本身并不是线程安全的,如果此时多个线程同时对 ArrayList 进行并发读/写,那么就有可能会产生线程安全问题,造成数据出错,而这个责任并不在 ArrayList,因为它本身并不是并发安全的。

【困难】什么是死锁?如何发现死锁?如何避免死锁?

::: info 什么是死锁?
:::

死锁一组互相竞争资源的线程因互相等待,导致“永久”阻塞的现象

产生死锁的四个必要条件:

  • 互斥:该资源任意一个时刻只由一个线程占用。
  • 占有并等待:一个线程因请求资源而阻塞时,对已获得的资源保持不放。
  • 不可抢占:线程已获得的资源在未使用完之前不能被其他线程强行剥夺,只有自己使用完毕后才释放资源。
  • 循环等待:若干线程之间形成一种头尾相接的循环等待资源关系。

::: info 如何发现死锁?
:::

(1)使用 jstack 工具

  • 运行程序后,执行命令:

    1
    jstack <PID>  # PID 是 Java 进程 ID
  • 如果存在死锁,输出会显示 Found one Java-level deadlock,并列出死锁的线程和资源。

(2)使用 ThreadMXBean 检测(代码方式)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;

public class DeadlockDetector {
public static void main(String[] args) {
ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
long[] deadlockedThreads = threadMXBean.findDeadlockedThreads(); // 检测死锁线程
if (deadlockedThreads != null) {
System.out.println("发现死锁!涉及线程:");
for (long threadId : deadlockedThreads) {
System.out.println(threadId);
}
} else {
System.out.println("无死锁。");
}
}
}

输出示例:

1
2
3
发现死锁!涉及线程:
12345
67890

(3)使用 VisualVM 或 JConsole(可视化工具)

连接 Java 进程后,查看线程选项卡,死锁会被明确标记。

::: info 如何避免死锁?
:::

如何预防死锁? 破坏死锁的产生的必要条件即可:

  • 互斥:难以避免
  • 占有并等待:一次性申请所有资源
  • 不可抢占:超时释放锁
  • 循环等待:按序申请资源

如何避免死锁?

避免死锁就是在资源分配时,借助于算法(比如银行家算法)对资源分配进行计算评估,使其进入安全状态。

安全状态 指的是系统能够按照某种线程推进顺序(P1、P2、P3……Pn)来为每个线程分配所需资源,直到满足每个线程对资源的最大需求,使每个线程都可顺利完成。称 <P1、P2、P3.....Pn> 序列为安全序列。

【中等】什么是活锁?如何避免活锁?

::: info 什么是活锁?
:::

活锁是一个递归的情况,两个或更多的线程会不断重复一个特定的代码逻辑。预期的逻辑通常为其他线程提供机会继续支持’this’线程。

想象这样一个例子:两个人在狭窄的走廊里相遇,二者都很礼貌,试图移到旁边让对方先通过。但是他们最终在没有取得任何进展的情况下左右摇摆,因为他们都在同一时间向相同的方向移动。

如图所示:两个线程想要通过一个 Worker 对象访问共享公共资源的情况,但是当他们看到另一个 Worker(在另一个线程上调用)也是“活动的”时,它们会尝试将该资源交给其他工作者并等待为它完成。如果最初我们让两名工作人员都活跃起来,他们将会面临活锁问题。

::: info 如何避免活锁?
:::

解决“活锁”的方案很简单,谦让时,尝试等待一个随机的时间就可以了。由于等待的时间是随机的,所以同时相撞后再次相撞的概率就很低了。“等待一个随机时间”的方案虽然很简单,却非常有效,Raft 这样知名的分布式一致性算法中也用到了它。

【中等】什么是饥饿问题?如何避免饥饿?

::: info 什么是饥饿问题?
:::

定义:某些线程由于长期无法获取所需资源(如 CPU 时间、锁、I/O 等),导致任务无法执行或执行缓慢

与死锁/活锁的区别

  • 死锁:所有相关线程都被阻塞,无法继续。
  • 活锁:线程在运行,但无法取得进展。
  • 饥饿:部分线程能正常运行,但某些线程长期得不到资源。

饥饿的常见原因

原因 示例
线程优先级不合理 高优先级线程总是抢占 CPU,低优先级线程长期得不到执行。
锁竞争不公平 某些线程总是抢不到锁(如synchronized是非公平锁)。
资源分配不均 线程池任务调度不合理,某些任务被长时间搁置。
I/O 或网络阻塞 某些线程因 I/O 操作被阻塞,而其他线程持续占用 CPU。

::: info 如何避免饥饿?
:::

(1)使用公平锁(Fair Lock)

  • ReentrantLock 支持公平策略,避免某些线程长期抢不到锁。

    1
    ReentrantLock fairLock = new ReentrantLock(true); // true 表示公平锁
  • synchronized 是非公平的,无法直接设置公平性。

(2)合理设置线程优先级

  • 避免滥用高优先级,尽量让所有线程有机会执行。

  • Java 线程优先级(1~10,默认 5):

    1
    thread.setPriority(Thread.NORM_PRIORITY); // 5

(3)避免长时间占用资源

  • 减少锁的持有时间,尽量只在必要时加锁。

  • 使用 tryLock() 设置超时,防止无限等待:

    1
    2
    3
    4
    if (lock.tryLock(100, TimeUnit.MILLISECONDS)) {
    try { /* 临界区 */ }
    finally { lock.unlock(); }
    }

(4)优化线程池任务调度

  • 使用 newFixedThreadPoolnewCachedThreadPool 时,结合 BlockingQueue 避免任务堆积。
  • 可改用 ForkJoinPool 进行任务拆分,提高公平性。

(5)监控与调整

  • 使用 VisualVM、JConsole 等工具观察线程状态,发现长期阻塞的线程。
  • 结合日志分析,优化资源分配策略。

【简单】简单介绍一下 Java 并发编程?

并发编程可以抽象成三个核心问题:分工、同步、互斥。

  • 分工 - 是指如何高效地拆解任务并分配给线程。
  • 同步 - 是指线程之间如何协作。
  • 互斥 - 是指保证同一时刻只允许一个线程访问共享资源。

Java 的 java.util.concurrent 包(简称 J.U.C)中提供了大量并发工具类,是 Java 并发能力的主要体现(注意,不是全部,有部分并发能力的支持在其他包中)。从功能上,大致可以分为:

  • 原子类 - 如:AtomicIntegerAtomicIntegerArrayAtomicReferenceAtomicStampedReference 等。
  • - 如:ReentrantLockReentrantReadWriteLock 等。
  • 并发容器 - 如:ConcurrentHashMapCopyOnWriteArrayListCopyOnWriteArraySet 等。
  • 阻塞队列 - 如:ArrayBlockingQueueLinkedBlockingQueue 等。
  • 非阻塞队列 - 如: ConcurrentLinkedQueueLinkedTransferQueue 等。
  • 线程池 - 如:ThreadPoolExecutorExecutors 等。

J.U.C 包中的工具类是基于 synchronizedvolatileCASThreadLocal 这样的并发核心机制打造的。所以,要想深入理解 J.U.C 工具类的特性、为什么具有这样那样的特性,就必须先理解这些核心机制。

Java 线程

【中等】Java 线程生命周期有哪些状态?状态之间如何切换?

java.lang.Thread.State 中定义了 6 种不同的线程状态,在给定的一个时刻,线程只能处于其中的一个状态。

以下是各状态的说明,以及状态间的联系:

  • 开始(NEW) - 尚未调用 start 方法的线程处于此状态。此状态意味着:创建的线程尚未启动
  • 可运行(RUNNABLE) - 已经调用了 start 方法的线程处于此状态。此状态意味着,线程已经准备好了,一旦被线程调度器分配了 CPU 时间片,就可以运行线程。
    • 在操作系统层面,线程有 READY 和 RUNNING 状态;而在 JVM 层面,只能看到 RUNNABLE 状态,所以 Java 系统一般将这两个状态统称为 RUNNABLE(运行中) 状态 。
  • 阻塞(BLOCKED) - 此状态意味着:线程处于被阻塞状态。表示线程在等待 synchronized 的隐式锁(Monitor lock)。synchronized 修饰的方法、代码块同一时刻只允许一个线程执行,其他线程只能等待,即处于阻塞状态。当占用 synchronized 隐式锁的线程释放锁,并且等待的线程获得 synchronized 隐式锁时,就又会从 BLOCKED 转换到 RUNNABLE 状态。
  • 等待(WAITING) - 此状态意味着:线程无限期等待,直到被其他线程显式地唤醒。 阻塞和等待的区别在于,阻塞是被动的,它是在等待获取 synchronized 的隐式锁。而等待是主动的,通过调用 Object.wait 等方法进入。
    • 进入:Object.wait();退出:Object.notify / Object.notifyAll
    • 进入:Thread.join();退出:被调用的线程执行完毕
    • 进入:LockSupport.park();退出:LockSupport.unpark
  • 定时等待(TIMED_WAITING) - 等待指定时间的状态。一个线程处于定时等待状态,是由于执行了以下方法中的任意方法:
    • 进入:Thread.sleep(long);退出:时间结束
    • 进入:Object.wait(long);退出:时间结束 / Object.notify / Object.notifyAll
    • 进入:Thread.join(long);退出:时间结束 / 被调用的线程执行完毕
    • 进入:LockSupport.parkNanos(long);退出:LockSupport.unpark
    • 进入:LockSupport.parkUntil(long);退出:LockSupport.unpark
  • 终止 (TERMINATED) - 线程 run() 方法执行结束,或者因异常退出了 run() 方法,则该线程结束生命周期。死亡的线程不可再次复生。

👉 扩展阅读:

【中等】Java 中,创建线程有几种方式?

一般来说,创建线程有很多种方式,例如:

  • 实现 Runnable 接口(推荐)
  • 继承 Thread 类(不推荐,因为不灵活,Java 不支持多继承)
  • 实现 Callable 接口 + FutureTask,支持返回值
  • 通过线程池(生产环境推荐)
  • 使用 CompletableFuture

虽然,看似有多种多样的创建线程方式。但是,**从本质上来说,Java 就只有一种方式可以创建线程,那就是通过 new Thread().start() 创建。不管是哪种方式,最终还是依赖于 new Thread().start()**。

👉 扩展阅读:大家都说 Java 有三种创建线程的方式!并发编程中的惊天骗局!

【简单】可以直接调用 Thread.run() 方法么?

可以直接调用 Thread.run() 方法,但是它的行为和普通方法一样,不会启动新线程去执行。调用 start() 方法方可启动线程并使线程进入就绪状态,直接执行 run() 方法的话不会以多线程的方式执行。

  • run() 方法是线程的执行体
  • start() 方法负责启动线程,然后 JVM 会让这个线程去执行 run() 方法

【简单】一个线程两次调用 Thread.start() 方法会怎样?

Java 的线程是不允许启动两次的,**第二次调用 Thread.start() 会抛出 IllegalThreadStateException**。

【简单】Thread.sleep()Thread.yield()Thread.join()Object.wait() 有什么区别?

方法 所属类 作用 是否释放锁 使用场景
Thread.sleep(long ms) Thread 让当前线程暂停执行指定时间(不释放 CPU 资源) ❌ 不释放锁 模拟耗时操作、定时任务
Thread.yield() Thread 提示调度器让出 CPU,但可能立即重新竞争(不保证让出) ❌ 不释放锁 优化线程调度,减少竞争(极少使用)
Thread.join() Thread 等待目标线程执行完毕(阻塞当前线程) ❌ 不释放锁 线程顺序执行,如主线程等待子线程结束
Object.wait() Object 释放锁并进入等待,直到 notify()/notifyAll() 唤醒 ✅ 释放锁 线程间通信(需在 synchronized 块中使用)

锁的释放

  • wait() 会释放锁,其他方法不会。
  • sleep()yield() 仅影响线程调度,不涉及锁。

唤醒机制

  • wait() 需依赖 notify()/notifyAll() 或超时唤醒。
  • sleep()join() 超时后自动恢复。
  • yield() 立刻重新参与竞争。

用途

  • sleep():固定时间暂停(如定时任务)。
  • yield():礼貌让出 CPU(实际开发很少用)。
  • join():线程依赖(如主线程等待子线程)。
  • wait():线程间协作(生产者-消费者模型)。

👉 扩展阅读:Java 并发编程:线程间协作的两种方式:wait、notify、notifyAll 和 Condition

【中等】为什么 Thread.sleep()Thread.yield() 设计为静态方法?

Thread.sleep()Thread.yield() 针对的是 Running 状态的线程,也就是说在非 Running 状态的线程上执行这两个方法没有意义。这就是为什么这两个方法被设计为静态的。它们只针对正在 Running 状态的线程工作,避免程序员错误的认为可以在其他非 Running 状态线程上调用。

👉 扩展阅读:Java 线程中 yield 与 join 方法的区别
👉 扩展阅读:sleep(),wait(),yield() 和 join() 方法的区别

【中等】为什么 Object.wait()Object.notify()Object.notifyAll() 被定义在 Object 类里?

因为锁是对象的,wait()/notify() 是锁的行为,所以必须定义在 Object

  • 锁基于对象:Java 的锁(synchronized)是 对象级别 的,每个对象关联一个监视器(Monitor),wait()/notify() 是监视器的核心操作,必须属于 Object

  • 任何对象都可作为锁:不仅 Thread 能作为锁,所有对象 都能作为锁,因此这些方法需定义在 Object 以保证通用性。

  • 等待队列绑定对象:调用 wait() 的线程会进入 该对象的等待队列notify() 唤醒的也是同一对象队列中的线程,与对象强绑定。

  • Thread 类职责分离Thread 类管理线程生命周期(如 sleep()join()),而 wait()/notify()线程间协作机制,属于锁(对象)的行为。

  • 设计一致性与历史原因:遵循 Monitor 模式(操作系统同步原语),保持 Thread 简洁,避免功能混淆(如 wait()sleep() 的误用)。

【中等】为什么 Object.wait()Object.notify()Object.notifyAll() 必须在 synchronized 方法/块中被调用?

当一个线程需要调用对象的 wait() 方法的时候,这个线程必须拥有该对象的锁,接着它就会释放这个对象锁并进入等待状态直到其他线程调用这个对象上的 notify() 方法。同样的,当一个线程需要调用对象的 notify() 方法时,它会释放这个对象的锁,以便其他在等待的线程就可以得到这个对象锁。

由于所有的这些方法都需要线程持有对象的锁,这样就只能通过 synchronized 来实现,所以他们只能在 synchronized 方法/块中被调用。

【中等】如何正确停止 Java 线程?

对于 Java 而言,最正确的停止线程的方式是:通过 Thread.interruptThread.isInterrupted 配合来控制线程终止

  • Thread.interrupt():设置线程的中断标志位(不会直接停止线程)。
  • Thread.isInterrupted():检查中断状态。

【示例】正确停止线程的方式——Thread.interrupt

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class ThreadStopDemo {

public static void main(String[] args) throws Exception {
Thread thread = new Thread(new MyTask(), "MyTask");
thread.start();
TimeUnit.MILLISECONDS.sleep(10);
thread.interrupt();
}

private static class MyTask implements Runnable {

private long count = 0L;

@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " 线程启动");
// 通过 Thread.interrupted 和 interrupt 配合来控制线程终止
while (!Thread.currentThread().isInterrupted() && count < 10000) {
System.out.println("count = " + count++);
}
System.out.println(Thread.currentThread().getName() + " 线程终止");
}

}

}
// 输出(count 未到 10000,线程就主动结束):
// MyTask 线程启动
// count = 0
// count = 1
// ...
// count = 840
// count = 841
// count = 842
// MyTask 线程终止

【中等】可以使用 Thread.stopThread.suspendThread.resume 停止线程吗?为什么?

Thread.stopThread.suspendThread.resume 方法已经被 Java 标记为 @Deprecated。为什么废弃呢?

  • Thread.stop 会直接把线程停止,这样就没有给线程足够的时间来处理想要在停止前保存数据的逻辑,任务戛然而止,会导致出现数据完整性等问题
  • 而对于Thread.suspendThread.resume 而言,它们的问题在于:如果线程调用 Thread.suspend,它并不会释放锁,就开始进入休眠,但此时有可能仍持有锁,这样就容易导致死锁问题。因为这把锁在线程被 Thread.resume 之前,是不会被释放的。假设线程 A 调用了 Thread.suspend 方法让线程 B 挂起,线程 B 进入休眠,而线程 B 又刚好持有一把锁,此时假设线程 A 想访问线程 B 持有的锁,但由于线程 B 并没有释放锁就进入休眠了,所以对于线程 A 而言,此时拿不到锁,也会陷入阻塞,那么线程 A 和线程 B 就都无法继续向下执行。

【示例】Thread.stop 终止线程,导致线程任务戛然而止

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public class ThreadStopErrorDemo {

public static void main(String[] args) {
MyTask thread = new MyTask();
thread.start();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 终止线程
thread.stop();
// 确保线程终止后,才执行下面的代码
while (thread.isAlive()) { }
// 输出两个计数器的最终状态
thread.print();
}

/**
* 持有两个计数器,run 方法中每次执行都会使计数器自增
*/
private static class MyTask extends Thread {

private int i = 0;

private int j = 0;

@Override
public void run() {
synchronized (this) {
++i;
try {
// 模拟耗时操作
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
++j;
}
}

public void print() {
System.out.println("i=" + i + " j=" + j);
}

}

}

【中等】使用 volatile 标记方式停止线程正确吗?

使用 volatile 标记方式仅适用于简单场景(无阻塞、无锁竞争)。推荐 Thread.interruptThread.isInterrupted 方式停止线程:更通用,可处理阻塞操作,是 Java 线程停止的标准方式。

volatile 标记停止线程适用场景(正确使用)

  • 非阻塞循环
    • 线程在 while (!stopped) 循环中运行,且 无阻塞操作(如 sleep()wait()、I/O)。
    • volatile 保证标志位 (stopped) 的修改对所有线程 立即可见
  • 短周期任务:适用于 纯计算型任务高频检查标志位 的场景。

volatile 标记停止线程不适用场景(可能失效)

  • 线程被阻塞(如 sleep()wait()、I/O):阻塞期间无法检测 volatile 标志位,必须等阻塞结束才能退出。
  • 依赖外部资源(如锁竞争、网络请求):即使 stopped=true,线程可能因锁或 I/O 阻塞无法立即退出。

当我们使用 volatile 变量来控制线程的停止,通常是通过设置一个 volatile 标志位来告诉线程停止执行。例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
public class MyTask extends Thread {
private volatile boolean canceled = false;

public void run() {
while (!canceled) {
// 执行任务
}
}

public void stopTask() {
canceled = true;
}
}

在上述例子中,canceled 是一个 volatile 变量,用来控制线程的停止。虽然这种方式在某些情况下可以工作,但它并不是一个可靠的停止线程的方式,因为在多线程环境中,其他线程修改 canceled 的值时,可能会出现竞态条件,导致线程无法正确停止

【中等】Java 线程之间如何进行通信?

在 Java 中,线程间通信(Inter-Thread Communication, ITC)是指多个线程之间协调工作、共享数据或传递消息的机制。常见的线程通信方式包括以下几种:

通信方式 核心机制 适用场景 特点
共享变量 volatile/synchronized 简单状态标记 需处理竞态条件
wait()/notify() 对象监视器 生产者-消费者 需手动同步
BlockingQueue 内置锁和条件队列 生产者-消费者 无需手动同步
CountDownLatch 计数器 主线程等待子线程 一次性
CyclicBarrier 屏障 多线程同步 可重复使用
Semaphore 许可证 限流/资源池 控制并发数
管道流 字节流 线程间数据传输 效率较低

推荐选择

  • 需要高效数据交换 → BlockingQueue
  • 线程协作 → wait()/notify()CountDownLatch
  • 资源控制 → Semaphore
  • 避免重复造轮子,优先使用 JUC(java.util.concurrent)工具类!

【简单】高优先级的 Java 线程一定先执行吗?

Java 中的线程优先级的范围是 [1,10],一般来说,高优先级的线程在运行时会具有优先权。可以通过 thread.setPriority(Thread.MAX_PRIORITY) 的方式设置,默认优先级为 5

即使设置了线程的优先级,也无法保证高优先级的线程一定先执行。这是因为 Java 线程优先级依赖于操作系统的支持,然而,不同的操作系统支持的线程优先级并不相同,不能很好的和 Java 中线程优先级一一对应。因此,Java 线程优先级控制并不可靠。

Java 内存模型

【中等】什么是 Java 内存模型?

Java Memory Model (JMM) 是 Java 规范定义的一套多线程内存访问规则,用于解决并发编程中的可见性、原子性、有序性问题。目的是让 Java 程序在不同硬件和操作系统上都能正确执行并发操作。

CPU、内存、I/O 设备存在很大的速度差异 - CPU 远快于内存,内存远快于 I/O 设备。

为了合理利用 CPU 的高性能,平衡这三者的速度差异,计算机体系机构、操作系统、编译程序都做出了贡献,主要体现为:

  • CPU 增加了缓存,以均衡与 CPU 内存的速度差异;
  • 编译程序优化指令执行次序,使得缓存能够得到更加合理地利用。
  • 操作系统增加了进程、线程,以分时复用 CPU,进而均衡 CPU 与 I/O 的速度差异;

缓存一致性

缓存导致的可见性问题,编译优化带来的有序性问题,线程切换带来的原子性问题。

为了解决缓存一致性问题,需要各个处理器访问缓存时都遵循一些协议,在读写时要根据协议来进行操作

指令重排序

为了使缓存得到更加合理地使用,计算机在执行程序代码的时候,会对指令进行重排序。常见的指令重排序有下面 2 种情况:

  • 编译器优化重排:编译器在不改变单线程语义的前提下调整语句顺序。
  • 指令并行重排:处理器利用指令级并行技术(ILP)调整指令执行顺序(无数据依赖时)。

Java 源代码会经历 编译器优化重排 —> 指令并行重排 —> 内存系统重排 的过程,最终才变成操作系统可执行的指令序列。指令重排序可以保证串行语义一致,但是没有义务保证多线程间的语义也一致 ,所以在多线程下,指令重排序可能会导致一些问题。

解决方案:

  • 编译器:禁止特定类型的编译器重排序。
  • 处理器:通过插入内存屏障(Memory Barrier/Fence)禁止特定处理器重排序。

👉 扩展阅读:全面理解 Java 内存模型

【困难】什么是 Happens-Before 规则?有什么用?

JMM 为程序中所有的操作定义了一个偏序关系,称之为 先行发生原则(Happens-Before)Happens-Before 是 JMM 的核心规则,用于约束指令重排序和保证多线程可见性。

Happens-Before 非常重要,它是判断数据是否存在竞争、线程是否安全的主要依据,依靠这个原则,我们可以通过几条规则一揽子地解决并发环境下两个操作间是否可能存在冲突的所有问题。

  1. 程序顺序规则:单线程内代码顺序执行(但不影响多线程重排序)。
  2. volatile 规则:**volatile 写** Happens-Before 后续的 volatilevolatile 保证可见性 + 禁止指令重排序
  3. 锁规则解锁 Happens-Before 后续的加锁(如 synchronizedReentrantLock)。
  4. 线程启动规则:**Thread.start()** Happens-Before 线程内的所有操作
  5. 线程终止规则线程中的所有操作 Happens-Before Thread.join() 完成
  6. 线程中断规则:**Thread.interrupt()** Happens-Before 被中断线程检测到中断(isInterrupted()InterruptedException
  7. 对象终结规则对象的构造函数执行结束 Happens-Before finalize() 方法被调用
  8. 传递性:若 A → B 且 B → C,则 A → C。

1978 年,Lamport 在论文 Time, Clocks, and the Ordering of Events in a Distributed System译文解读 )中第一次提出了 Happens-Before,阐述了偏序关系(partial ordering)、逻辑时钟(Logical Clocks)概念,提出解决分布式系统中区分事件发生的时序问题的方法。Happens-Before 的语义是一种因果关系:如果 A 事件是导致 B 事件的起因,那么 A 事件一定是先于(Happens-Before)B 事件发生的。

【困难】什么是 Java 内存屏障?有什么用?

内存屏障(Memory Barrier/Fence)是 JMM 的底层机制,通过 限制重排序强制缓存同步,实现多线程程序的 可见性有序性

  • 禁止特定类型的指令重排序(编译器和处理器优化可能导致乱序执行)。
  • 强制刷新 CPU 缓存,确保多线程间的 内存可见性

JVM 依赖底层 CPU 的内存屏障指令(如 x86 的 mfence/lfence/sfence),抽象为以下四种:

  • LoadLoad:确保 Load1 的读取操作在 Load2 及后续读取之前完成。 示例:volatile 读后的普通读。
  • StoreStore:确保 Store1 的写入操作在 Store2 及后续写入之前对其他线程可见。示例:volatile 写前的普通写。
  • LoadStore:确保 Load1 的读取操作在 Store2 及后续写入之前完成。
  • StoreLoad:确保 Store1 的写入对所有线程可见后,才执行 Load2 的读取。 开销最大(如 volatile 写后的 volatile 读会插入此屏障)。

内存屏障的应用场景

  • volatile 变量
    • 写操作:插入 StoreStore + StoreLoad 屏障。
    • 读操作:插入 LoadLoad + LoadStore 屏障。
  • synchronized
    • 进入临界区(加锁)和退出(解锁)时插入屏障,保证可见性和有序性。
  • final 字段
    • 构造函数中的 final 字段写入后插入屏障,确保正确初始化对其他线程可见。

内存屏障的作用

  • 禁止重排序:防止编译器和 CPU 优化破坏多线程逻辑(如单例模式的 DCL 问题)。
  • 保证可见性:强制将工作内存的修改刷回主内存,并失效其他线程的缓存。
  • 保证有序性:确保临界区代码按预期顺序执行(如 happens-before 规则的实现基础)。

底层实现

  • x86 CPUStoreLoad 对应 mfence 指令,其他屏障通常无实际指令(因 x86 强内存模型已满足大部分需求)。
  • ARM/PowerPC:弱内存模型需显式插入更多屏障指令。
  • JVM 的封装:通过 Unsafe 类提供 loadFence()/storeFence()/fullFence() 方法(如 VarHandle 内部使用)。

示例:volatile 的屏障插入

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
volatile int flag = 0;
int value = 0;

void write() {
value = 42; // 普通写
// StoreStore 屏障(确保 value=42 先刷入主内存)
flag = 1; // volatile 写
// StoreLoad 屏障(保证写操作对所有线程可见)
}

void read() {
if (flag == 1) { // volatile 读
// LoadLoad + LoadStore 屏障
System.out.println(value); // 保证读到 value=42
}
}

【中等】volatile 有什么作用?

volatile 是轻量级的线程同步工具。**volatile 可以保证可见性和有序性,但不保证原子性**。适用于状态标志、DCL 单例等场景。

注意事项

  • 不要滥用:仅适用于简单状态同步,复杂操作仍需锁或原子类。
  • 不适用于复合操作:如 check-then-act(需 synchronized 或 CAS)。

::: info 保证可见性

:::

  • 强制线程每次读取 volatile 变量时,直接从主内存获取最新值(跳过工作内存缓存)。
  • 强制线程每次写入 volatile 变量时,立即同步到主内存,使其他线程立即可见。

::: info 禁止指令重排序

:::

  • 通过插入 内存屏障(Memory Barrier) 禁止编译器和 CPU 对 volatile 变量的读写操作进行重排序。
  • 双重检查锁(DCL)单例模式 中必须用 volatile 修饰实例变量,防止对象未初始化完成就被使用。

::: info 不保证原子性

:::

volatile **不能替代 synchronized**,例如 volatile int i++; 仍存在竞态条件(需用 AtomicInteger)。

适用场景:单线程写、多线程读 的变量(如开关标志)。

::: info volatile 底层实现原理

:::

  • 写操作:插入 StoreStore + StoreLoad 屏障,确保写入前所有操作完成,且结果全局可见。
  • 读操作:插入 LoadLoad + LoadStore 屏障,确保读取后所有操作依赖最新值。

::: info volatile 应用场景

:::

状态标志位

1
2
3
4
volatile boolean running = true;

void stop() { running = false; } // 线程 A
void run() { while (running) { ... } } // 线程 B

双重检查锁(DCL)

1
2
3
4
5
6
7
8
9
10
11
12
13
class Singleton {
private static volatile Singleton instance;
static Singleton getInstance() {
if (instance == null) {
synchronized (Singleton.class) {
if (instance == null) {
instance = new Singleton(); // 禁止重排序
}
}
}
return instance;
}
}

发布不可变对象

1
volatile Map<String, String> config = readConfig(); // 保证引用可见性

【中等】volatile 能完全保证并发安全吗?

线程安全需要具备:可见性、原子性、顺序性。**volatile 不保证原子性,所以决定了它不能彻底地保证线程安全**。

我们通过下面的代码即可证明:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class VolatileAtomicityDemo {
public volatile static int inc = 0;

public void increase() {
inc++;
}

public static void main(String[] args) throws InterruptedException {
ExecutorService threadPool = Executors.newFixedThreadPool(5);
VolatileAtomicityDemo volatileAtomicityDemo = new VolatileAtomicityDemo();
for (int i = 0; i < 5; i++) {
threadPool.execute(() -> {
for (int j = 0; j < 500; j++) {
volatileAtomicityDemo.increase();
}
});
}
// 等待 1.5 秒,保证上面程序执行完成
Thread.sleep(1500);
System.out.println(inc);
threadPool.shutdown();
}
}

正常情况下,运行上面的代码理应输出 2500。但你真正运行了上面的代码之后,你会发现每次输出结果都小于 2500

为什么会出现这种情况呢?不是说好了,volatile 可以保证变量的可见性嘛!

也就是说,如果 volatile 能保证 inc++ 操作的原子性的话。每个线程中对 inc 变量自增完之后,其他线程可以立即看到修改后的值。5 个线程分别进行了 500 次操作,那么最终 inc 的值应该是 5*500=2500。

很多人会误认为自增操作 inc++ 是原子性的,实际上,inc++ 其实是一个复合操作,包括三步:

  1. 读取 inc 的值。
  2. 对 inc 加 1。
  3. 将 inc 的值写回内存。

volatile 是无法保证这三个操作是具有原子性的,有可能导致下面这种情况出现:

  1. 线程 1 对 inc 进行读取操作之后,还未对其进行修改。线程 2 又读取了 inc 的值并对其进行修改(+1),再将 inc 的值写回内存。
  2. 线程 2 操作完毕后,线程 1 对 inc 的值进行修改(+1),再将 inc 的值写回内存。

这也就导致两个线程分别对 inc 进行了一次自增操作后,inc 实际上只增加了 1。

其实,如果想要保证上面的代码运行正确也非常简单,利用 synchronizedLock 或者 AtomicInteger 都可以。

使用 synchronized 改进:

1
2
3
public synchronized void increase() {
inc++;
}

使用 AtomicInteger 改进:

1
2
3
4
5
public AtomicInteger inc = new AtomicInteger();

public void increase() {
inc.getAndIncrement();
}

使用 ReentrantLock 改进:

1
2
3
4
5
6
7
8
9
Lock lock = new ReentrantLock();
public void increase() {
lock.lock();
try {
inc++;
} finally {
lock.unlock();
}
}

【中等】volatilesynchronized 有什么区别?volatile 能替代 synchronized 吗?

volatile 无法替代 synchronized ,因为 volatile 无法保证操作的原子性

volatile 和 synchronized 的特性区别

特性 volatile synchronized
原子性 ❌ 不保证(如 i++ ✅ 保证
可见性 ✅ 强制主内存读写 ✅ 通过锁机制保证
有序性 ✅ 禁止重排序 ✅ 串行化执行
性能 ⚡ 轻量级(无锁) 🔒 较重(上下文切换)

volatile 和 synchronized 的实现区别

  • volatile
    • 通过 内存屏障 禁止指令重排序
    • 强制 CPU 缓存失效 保证可见性
    • 底层使用 LoadLoad/StoreStore 等屏障指令
  • synchronized
    • 通过 Monitor 监视器锁(对象头 Mark Word)
    • 包含 偏向锁→轻量级锁→重量级锁 的升级过程
    • 保证 代码块/方法 的排他性访问

【中等】synchronized 有什么作用?

synchronized 是 Java 最基础的线程同步机制,通过 原子性、可见性、有序性 保障线程安全,适用于需要 强一致性 的场景,但需合理控制锁粒度以避免性能问题。

synchronized 有 3 种应用方式:

  • 同步实例方法 - 对于普通同步方法,锁是当前实例对象
  • 同步静态方法 - 对于静态同步方法,锁是当前类的 Class 对象
  • 同步代码块 - 对于同步方法块,锁是 synchonized 括号里配置的对象

【中等】synchronized 的实现原理是什么?

synchronized 的底层实现涉及 Java 对象头、Monitor(监视器)、锁升级机制 等。

synchronized 修饰代码块时,在代码块前后植入 monitorenter 和 monitorexit 字节码指令,相当于加锁和解锁

synchronized 修饰方法时,会在方法的访问标志上设置一个 ACC_SYNCHRONIZED 标记。线程每次访问方法,会进行检查,若设置了 ACC_SYNCHRONIZED 标记,执行线程将先持有 Monitor 对象,然后再执行方法。在该方法运行期间,其它线程将无法获取到该 Mointor 对象,当方法执行完成后,再释放该 Monitor 对象。

(1)对象头与 Mark Word

每个 Java 对象在内存中由 对象头(Header)、实例数据(Instance Data)、对齐填充(Padding) 组成。
synchronized 的锁信息存储在 对象头Mark Word 中,主要包括:

  • 锁状态(无锁、偏向锁、轻量级锁、重量级锁)
  • 持有锁的线程 ID
  • GC 分代年龄
  • 哈希码(HashCode)

Mark Word 记录了对象和锁有关的信息。Mark Word 在 64 位 JVM 中的长度是 64bit,我们可以一起看下 64 位 JVM 的存储结构是怎么样的。如下图所示:

img

(2)Monitor(监视器)

每个 Java 对象都关联一个 Monitor(监视器),用于实现同步机制。Monitor 的主要结构:

  • **_owner**:持有锁的线程
  • **_EntryList**:等待获取锁的线程队列(阻塞状态)
  • **_WaitSet**:调用 wait() 后进入等待状态的线程队列

【困难】JDK6 对synchronized 进行了哪些优化?

JDK 6 以后,synchronized 做了大量的优化,其性能已经与 LockReadWriteLock 基本上持平

::: info 锁升级

:::

JDK 1.6 后,synchronized 采用 锁升级 机制优化性能,避免直接使用重量级锁带来的性能损耗。锁的状态变化如下:

锁状态 适用场景 实现方式
无锁 初始状态 Mark Word 无锁标记
偏向锁 单线程访问 Mark Word 记录线程 ID
轻量级锁 少量线程竞争 CAS 自旋
重量级锁 高并发竞争 操作系统 Mutex 锁

偏向锁

  • 适用场景:只有一个线程访问同步块。
  • 实现方式
    • 在 Mark Word 中记录 线程 ID,后续该线程进入时无需 CAS 操作。
    • 如果其他线程尝试获取锁,偏向锁会 撤销(Revoke)并升级为轻量级锁。

轻量级锁

  • 适用场景:少量线程竞争,且线程交替执行。
  • 实现方式
    • 线程通过 CAS(Compare-And-Swap) 尝试获取锁。
    • 如果失败,会进行 自旋(Spin)(循环尝试),避免直接进入阻塞状态。
    • 如果自旋失败,升级为 重量级锁

重量级锁

  • 适用场景:高并发竞争。
  • 实现方式
    • 依赖 操作系统 Mutex 锁(互斥量)。
    • 未获取锁的线程会被 挂起(Blocked),进入 _EntryList 等待唤醒。

Mark Word 记录了对象和锁有关的信息。Mark Word 在 64 位 JVM 中的长度是 64bit,我们可以一起看下 64 位 JVM 的存储结构是怎么样的。如下图所示:

img

锁升级功能主要依赖于 Mark Word 中的锁标志位和释放偏向锁标志位,synchronized 同步锁就是从偏向锁开始的,随着竞争越来越激烈,偏向锁升级到轻量级锁,最终升级到重量级锁。

::: info 锁消除

:::

锁消除是指在即时编译(JIT)时,JVM 会对代码进行逃逸分析。如果发现一段代码中使用的锁对象不会逃逸到方法外部,也就是其他线程无法访问到该锁对象,那么 JVM 会认为该锁是无意义的,从而将锁的代码消除,避免不必要的锁竞争,提高程序的性能。

锁消除实现原理

(1)逃逸分析:JVM 会分析对象的作用域。如果一个对象在方法内部创建,并且不会被外部方法引用,那么这个对象就不会逃逸出该方法。

(2)锁消除:由于 StringBufferappend 方法是 synchronized 方法,但 sb 对象不会逃逸,JVM 经过逃逸分析后,会将 append 方法中的锁代码消除,从而避免了锁的开销。

【示例】锁消除

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class LockEliminationExample {
public static String concatString(String s1, String s2, String s3) {
// 创建一个 StringBuffer 对象,它不会逃逸出该方法
StringBuffer sb = new StringBuffer();
sb.append(s1);
sb.append(s2);
sb.append(s3);
return sb.toString();
}

public static void main(String[] args) {
String result = concatString("Hello", " ", "World");
System.out.println(result);
}
}

在这个示例中,StringBuffer 对象 sb 只在 concatString 方法内部使用,不会被其他方法访问。因此,JVM 在即时编译时会进行逃逸分析,并将 append 方法中的锁代码消除。

::: info 锁粗化

:::

锁粗化是指:在 JIT 编译器动态编译时,如果发现几个相邻的同步块使用的是同一个锁实例,那么 JIT 编译器将会把这几个同步块合并为一个大的同步块,从而避免一个线程“反复申请、释放同一个锁“所带来的性能开销。

如果一系列的连续操作都对同一个对象反复加锁和解锁,频繁的加锁操作就会导致性能损耗。

【中等】final 关键字可以保证线程的可见性吗?

final 本身不能直接保证线程间的可见性

但 final 修饰的字段在正确初始化后,对其他线程是可见的(JMM 保证)。对象构造完成时,final 字段的初始化值对所有线程立即可见。不需要额外的同步措施(如 volatile/synchronized)。

final 的线程可见性仅限于初始化阶段,适用于:

  • 声明不可变常量(如 final int MAX = 100
  • 构造线程安全对象(如 final AtomicReference

如果需要持续可见性(如状态标志位),仍需使用 volatile 或同步机制。

非 final 字段对比:

1
2
3
4
class Example {
final int x = 42; // 构造后所有线程看到x=42
int y = 10; // 其他线程可能看到y=0(默认值)或10
}

底层实现机制

  • JVM 会插入内存屏障:确保 final 字段初始化后对所有线程可见。
  • 与 happens-before 规则关联:对象构造结束 happens-before 于其他线程看到该对象。

使用限制

场景 是否线程安全 说明
final 基本类型 ✔️ 安全 int/long 等初始化后不可变
final 引用类型 ⚠️ 部分安全 引用不可变,但对象内部状态可能变化
非 final 字段 ❌ 不安全 需要额外同步

危险示例:

1
2
final Map<String, Integer> map = new HashMap<>();
// map引用不可变,但map.put()操作非线程安全!

最佳实践

(1)优先用 final 修饰不可变数据

1
2
3
public class SafeCounter {
private final AtomicLong count = new AtomicLong(0); // 线程安全
}

(2)需要跨线程可见的变量应使用 volatile

1
private volatile boolean running = true;

(3)避免以下错误用法

1
2
// 错误!final 不能保证对象内部线程安全
final List<String> unsafeList = new ArrayList<>();

MySQL 优化

慢查询

慢查询日志可以帮我们找到执行慢的 SQL。

可以通过以下命令查看慢查询日志是否开启:

1
2
3
4
5
6
7
mysql> show variables like '%slow_query_log';
+----------------+-------+
| Variable_name | Value |
+----------------+-------+
| slow_query_log | ON |
+----------------+-------+
1 row in set (0.02 sec)

启停慢查询日志开关:

1
2
3
4
5
# 开启慢查询日志
mysql > set global slow_query_log='ON';

# 关闭慢查询日志
mysql > set global slow_query_log='OFF';

查看慢查询的时间阈值:

1
2
3
4
5
6
7
mysql> show variables like '%long_query_time%';
+-----------------+-----------+
| Variable_name | Value |
+-----------------+-----------+
| long_query_time | 10.000000 |
+-----------------+-----------+
1 row in set (0.02 sec)

设置慢查询的时间阈值:

1
mysql > set global long_query_time = 3;

MySQL 自带了一个 mysqldumpslow 工具,用于统计慢查询日志(这个工具是个 Perl 脚本,需要先安装好 Perl)。

mysqldumpslow 命令的具体参数如下:

  • -s - 采用 order 排序的方式,排序方式可以有以下几种。分别是 c(访问次数)、t(查询时间)、l(锁定时间)、r(返回记录)、ac(平均查询次数)、al(平均锁定时间)、ar(平均返回记录数)和 at(平均查询时间)。其中 at 为默认排序方式。
  • -t - 返回前 N 条数据 。
  • -g - 后面可以是正则表达式,对大小写不敏感。

比如想要按照查询时间排序,查看前两条 SQL 语句,可以执行如下命令:

1
perl mysqldumpslow.pl -s t -t 2 "C:\ProgramData\MySQL\MySQL Server 8.0\Data\slow.log"

执行计划(EXPLAIN)

“执行计划”是对 SQL 查询语句在数据库中执行过程的描述。 如果要分析某条 SQL 的性能问题,通常需要先查看 SQL 的执行计划,排查每一步 SQL 执行是否存在问题。

很多数据库都支持执行计划,MySQL 也不例外。在 MySQL 中,用户可以通过 EXPLAIN 命令查看优化器针对指定 SQL 生成的逻辑执行计划。

【示例】MySQL 执行计划示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
mysql> explain select * from user_info where id = 2
*************************** 1. row ***************************
id: 1
select_type: SIMPLE
table: user_info
partitions: NULL
type: const
possible_keys: PRIMARY
key: PRIMARY
key_len: 8
ref: const
rows: 1
filtered: 100.00
Extra: NULL
1 row in set, 1 warning (0.00 sec)

执行计划返回结果参数说明:

  • id - SELECT 查询的标识符。每个 SELECT 都会自动分配一个唯一的标识符。
  • select_type - SELECT 查询的类型。
    • SIMPLE - 表示此查询不包含 UNION 查询或子查询。
    • PRIMARY - 表示此查询是最外层的查询。
    • UNION - 表示此查询是 UNION 的第二或随后的查询。
    • DEPENDENT UNION - UNION 中的第二个或后面的查询语句, 取决于外面的查询。
    • UNION RESULT - UNION 的结果。
    • SUBQUERY - 子查询中的第一个 SELECT
    • DEPENDENT SUBQUERY - 子查询中的第一个 SELECT, 取决于外面的查询. 即子查询依赖于外层查询的结果。
  • table - 查询的是哪个表,如果给表起别名了,则显示别名。
  • partitions - 匹配的分区。
  • type - 表示从表中查询到行所执行的方式,查询方式是 SQL 优化中一个很重要的指标,执行效率由高到低依次为:
    • system/const - 表中只有一行数据匹配。此时根据索引查询一次就能找到对应的数据。如果是 B+ 树索引,我们知道此时索引构造成了多个层级的树,当查询的索引在树的底层时,查询效率就越低。const 表示此时索引在第一层,只需访问一层便能得到数据。
    • eq_ref - 使用唯一索引扫描。常见于多表连接中使用主键和唯一索引作为关联条件。
    • ref - 非唯一索引扫描。还可见于唯一索引最左原则匹配扫描。
    • range - 索引范围扫描。比如 <>between 等操作。
    • index - 索引全表扫描。此时遍历整个索引树。
    • ALL - 表示全表扫描。需要遍历全表来找到对应的行。
  • possible_keys - 此次查询中可能选用的索引。
  • key - 此次查询中实际使用的索引。如果这一项为 NULL,说明没有使用索引。
  • ref - 哪个字段或常数与 key 一起被使用。
  • rows - 显示此查询一共扫描了多少行,这个是一个估计值。
  • filtered - 表示此查询条件所过滤的数据的百分比。
  • extra - 额外的信息。
    • Using filesort - 当查询语句中包含 GROUP BY 操作,而且无法利用索引完成排序操作的时候, 这时不得不选择相应的排序算法进行,甚至可能会通过文件排序,效率是很低的,所以要避免这种问题的出现。
    • Using temporary - 使了用临时表保存中间结果,MySQL 在对查询结果排序时使用临时表,常见于排序 ORDER BY 和分组查询 GROUP BY。效率低,要避免这种问题的出现。
    • Using index - 所需数据只需在索引即可全部获得,不须要再到表中取数据,也就是使用了覆盖索引,避免了回表操作,效率不错。

更多内容请参考:MySQL 性能优化神器 Explain 使用分析

optimizer trace

在 MySQL 5.6 及之后的版本中,我们可以使用 optimizer trace 功能查看优化器生成执行计划的整个过程。有了这个功能,我们不仅可以了解优化器的选择过程,更可以了解每一个执行环节的成本,然后依靠这些信息进一步优化查询。

如下代码所示,打开 optimizer_trace 后,再执行 SQL 就可以查询 information_schema.OPTIMIZER_TRACE 表查看执行计划了,最后可以关闭 optimizer_trace 功能:

1
2
3
4
SET optimizer_trace="enabled=on";
SELECT * FROM person WHERE NAME >'name84059' AND create_time>'2020-01-24 05:00
SELECT * FROM information_schema.OPTIMIZER_TRACE;
SET optimizer_trace="enabled=off";

SQL 优化

SQL 优化基本思路

使用 EXPLAIN 命令查看当前 SQL 是否使用了索引,优化后,再通过执行计划(EXPLAIN)来查看优化效果。

SQL 优化的基本思路:

  • 只返回必要的列 - 最好不要使用 SELECT * 语句。

  • 只返回必要的行 - 使用 WHERE 子查询语句进行过滤查询,有时候也需要使用 LIMIT 语句来限制返回的数据。

  • 缓存重复查询的数据 - 应该考虑在客户端使用缓存,尽量不要使用 MySQL 服务器缓存(存在较多问题和限制)。

  • 使用索引覆盖查询

优化分页

当需要分页操作时,通常会使用 LIMIT 加上偏移量的办法实现,同时加上合适的 ORDER BY 字句。如果有对应的索引,通常效率会不错,否则,MySQL 需要做大量的文件排序操作

一个常见的问题是当偏移量非常大的时候,比如:LIMIT 1000000 20 这样的查询,MySQL 需要查询 1000020 条记录然后只返回 20 条记录,前面的 1000000 条都将被抛弃,这样的代价非常高。

针对分页优化,有以下两种方案

(1)方案 - 延迟关联

优化这种查询一个最简单的办法就是尽可能的使用覆盖索引扫描,而不是查询所有的列。然后根据需要做一次关联查询再返回所有的列。对于偏移量很大时,这样做的效率会提升非常大。考虑下面的查询:

1
SELECT film_id,description FROM film ORDER BY title LIMIT 1000000,5;

如果这张表非常大,那么这个查询最好改成下面的样子:

1
2
3
4
SELECT film.film_id,film.description
FROM film INNER JOIN (
SELECT film_id FROM film ORDER BY title LIMIT 50,5
) AS tmp USING(film_id);

这里的延迟关联将大大提升查询效率,让 MySQL 扫描尽可能少的页面,获取需要访问的记录后在根据关联列回原表查询所需要的列。

(2)方案 - 书签方式

有时候如果可以使用书签记录上次取数据的位置,那么下次就可以直接从该书签记录的位置开始扫描,这样就可以避免使用 OFFSET,比如下面的查询:

1
2
3
4
-- 原语句
SELECT id FROM t LIMIT 1000000, 10;
-- 优化语句
SELECT id FROM t WHERE id > 1000000 LIMIT 10;

其他优化的办法还包括使用预先计算的汇总表,或者关联到一个冗余表,冗余表中只包含主键列和需要做排序的列。

优化 JOIN

优化子查询

尽量使用 JOIN 语句来替代子查询。因为子查询是嵌套查询,而嵌套查询会新创建一张临时表,而临时表的创建与销毁会占用一定的系统资源以及花费一定的时间,同时对于返回结果集比较大的子查询,其对查询性能的影响更大。

小表驱动大表

JOIN 查询时,应该用小表驱动大表。因为 JOIN 时,MySQL 内部会先遍历驱动表,再去遍历被驱动表。

比如 left join,左表就是驱动表,A 表小于 B 表,建立连接的次数就少,查询速度就被加快了。

1
select name from A left join B ;

适当冗余字段

增加冗余字段可以减少大量的连表查询,因为多张表的连表查询性能很低,所有可以适当的增加冗余字段,以减少多张表的关联查询,这是以空间换时间的优化策略

避免 JOIN 太多表

《阿里巴巴 Java 开发手册》规定不要 join 超过三张表,第一 join 太多降低查询的速度,第二 join 的 buffer 会占用更多的内存。

如果不可避免要 join 多张表,可以考虑使用数据异构的方式异构到 ES 中查询。

优化 UNION

MySQL 执行 UNION 的策略是:先创建临时表,然后将各个查询结果填充到临时表中,最后再进行查询。很多优化策略在 UNION 查询中都会失效,因为它无法利用索引。

最好将 WHERELIMIT 等子句下推到 UNION 的各个子查询中,以便优化器可以充分利用这些条件进行优化。

此外,尽量使用 UNION ALL,避免使用 UNION

UNIONUNION ALL 都是将两个结果集合并为一个,两个要联合的 SQL 语句字段个数必须一样,而且字段类型要“相容”(一致)

  • UNION 需要进行去重扫描,因此消息较低;而 UNION ALL 不会进行去重。
  • UNION 会按照字段的顺序进行排序;而 UNION ALL 只是简单的将两个结果合并就返回。

优化 COUNT() 查询

COUNT() 有两种作用:

  • 统计某个列值的数量。统计列值时,要求列值是非 NULL 的,它不会统计 NULL
  • 统计行数。

统计列值时,要求列值是非空的,它不会统计 NULL。如果确认括号中的表达式不可能为空时,实际上就是在统计行数。最简单的就是当使用 COUNT(*) 时,并不是我们所想象的那样扩展成所有的列,实际上,它会忽略所有的列而直接统计行数。

我们最常见的误解也就在这儿,在括号内指定了一列却希望统计结果是行数,而且还常常误以为前者的性能会更好。但实际并非这样,如果要统计行数,直接使用 COUNT(*),意义清晰,且性能更好。

(1)简单优化

1
2
3
4
SELECT count(*) FROM world.city WHERE id > 5;

SELECT (SELECT count(*) FROM world.city) - count(*)
FROM world.city WHERE id <= 5;

(2)使用近似值

有时候某些业务场景并不需要完全精确的统计值,可以用近似值来代替,EXPLAIN 出来的行数就是一个不错的近似值,而且执行 EXPLAIN 并不需要真正地去执行查询,所以成本非常低。通常来说,执行 COUNT() 都需要扫描大量的行才能获取到精确的数据,因此很难优化,MySQL 层面还能做得也就只有覆盖索引了。如果不还能解决问题,只有从架构层面解决了,比如添加汇总表,或者使用 Redis 这样的外部缓存系统。

优化查询方式

切分大查询

一个大查询如果一次性执行的话,可能一次锁住很多数据、占满整个事务日志、耗尽系统资源、阻塞很多小的但重要的查询。

1
DELEFT FROM messages WHERE create < DATE_SUB(NOW(), INTERVAL 3 MONTH);
1
2
3
4
5
rows_affected = 0
do {
rows_affected = do_query(
"DELETE FROM messages WHERE create < DATE_SUB(NOW(), INTERVAL 3 MONTH) LIMIT 10000")
} while rows_affected > 0

分解大连接查询

将一个大连接查询(JOIN)分解成对每一个表进行一次单表查询,然后将结果在应用程序中进行关联,这样做的好处有:

  • 让缓存更高效。对于连接查询,如果其中一个表发生变化,那么整个查询缓存就无法使用。而分解后的多个查询,即使其中一个表发生变化,对其它表的查询缓存依然可以使用。
  • 分解成多个单表查询,这些单表查询的缓存结果更可能被其它查询使用到,从而减少冗余记录的查询。
  • 减少锁竞争;
  • 在应用层进行连接,可以更容易对数据库进行拆分,从而更容易做到高性能和可扩展。
  • 查询本身效率也可能会有所提升。例如下面的例子中,使用 IN() 代替连接查询,可以让 MySQL 按照 ID 顺序进行查询,这可能比随机的连接要更高效。
1
2
3
4
SELECT * FROM tag
JOIN tag_post ON tag_post.tag_id=tag.id
JOIN post ON tag_post.post_id=post.id
WHERE tag.tag='mysql';
1
2
3
SELECT * FROM tag WHERE tag='mysql';
SELECT * FROM tag_post WHERE tag_id=1234;
SELECT * FROM post WHERE post.id IN (123,456,567,9098,8904);

索引优化

通过索引覆盖查询,可以优化排序、分组。

详情见 MySQL 索引

数据结构优化

良好的逻辑设计和物理设计是高性能的基石。

数据类型优化

数据类型优化基本原则

  • 更小的通常更好 - 越小的数据类型通常会更快,占用更少的磁盘、内存,处理时需要的 CPU 周期也更少。
    • 例如:整型比字符类型操作代价低,因而会使用整型来存储 IP 地址,使用 DATETIME 来存储时间,而不是使用字符串。
  • 简单就好 - 如整型比字符型操作代价低。
    • 例如:很多软件会用整型来存储 IP 地址。
    • 例如:**UNSIGNED 表示不允许负值,大致可以使正数的上限提高一倍**。
  • 尽量避免 NULL - 可为 NULL 的列会使得索引、索引统计和值比较都更复杂。

类型的选择

  • 整数类型通常是标识列最好的选择,因为它们很快并且可以使用 AUTO_INCREMENT

  • ENUMSET 类型通常是一个糟糕的选择,应尽量避免。

  • 应该尽量避免用字符串类型作为标识列,因为它们很消耗空间,并且通常比数字类型慢。对于 MD5SHAUUID 这类随机字符串,由于比较随机,所以可能分布在很大的空间内,导致 INSERT 以及一些 SELECT 语句变得很慢。

    • 如果存储 UUID ,应该移除 - 符号;更好的做法是,用 UNHEX() 函数转换 UUID 值为 16 字节的数字,并存储在一个 BINARY(16) 的列中,检索时,可以通过 HEX() 函数来格式化为 16 进制格式。

表设计

应该避免的设计问题:

  • 太多的列 - 设计者为了图方便,将大量冗余列加入表中,实际查询中,表中很多列是用不到的。这种宽表模式设计,会造成不小的性能代价,尤其是 ALTER TABLE 非常耗时。
  • 太多的关联 - 所谓的实体 - 属性 - 值(EAV)设计模式是一个常见的糟糕设计模式。MySQL 限制了每个关联操作最多只能有 61 张表,但 EAV 模式需要许多自关联。
  • 枚举 - 尽量不要用枚举,因为添加和删除字符串(枚举选项)必须使用 ALTER TABLE
  • 尽量避免 NULL

范式和反范式

范式化目标是尽量减少冗余,而反范式化则相反

范式化的优点:

  • 比反范式更节省空间
  • 更新操作比反范式快
  • 更少需要 DISTINCTGROUP BY 语句

范式化的缺点:

  • 通常需要关联查询。而关联查询代价较高,如果是分表的关联查询,代价更是高昂。

在真实世界中,很少会极端地使用范式化或反范式化。实际上,应该权衡范式和反范式的利弊,混合使用。

索引优化

索引优化应该是查询性能优化的最有效手段。

如果想详细了解索引特性请参考:MySQL 索引

何时使用索引

  • 对于非常小的表,大部分情况下简单的全表扫描更高效。
  • 对于中、大型表,索引非常有效。
  • 对于特大型表,建立和使用索引的代价将随之增长。可以考虑使用分区技术。
  • 如果表的数量特别多,可以建立一个元数据信息表,用来查询需要用到的某些特性。

索引优化策略

  • 索引基本原则
    • 索引不是越多越好,不要为所有列都创建索引。
    • 要尽量避免冗余和重复索引。
    • 要考虑删除未使用的索引。
    • 尽量的扩展索引,不要新建索引。
    • 频繁作为 WHERE 过滤条件的列应该考虑添加索引。
  • 独立的列 - “独立的列” 是指索引列不能是表达式的一部分,也不能是函数的参数。
  • 前缀索引 - 索引很长的字符列,可以索引开始的部分字符,这样可以大大节约索引空间。
  • 最左匹配原则 - 将选择性高的列或基数大的列优先排在多列索引最前列。
  • 使用索引来排序 - 索引最好既满足排序,又用于查找行。这样,就可以使用索引来对结果排序。
  • =IN 可以乱序 - 不需要考虑 =IN 等的顺序
  • 覆盖索引
  • 自增字段作主键

数据模型和业务

  • 表字段比较复杂、易变动、结构难以统一的情况下,可以考虑使用 Nosql 来代替关系数据库表存储,如 ElasticSearch、MongoDB。
  • 在高并发情况下的查询操作,可以使用缓存(如 Redis)代替数据库操作,提高并发性能。
  • 数据量增长较快的表,需要考虑水平分表或分库,避免单表操作的性能瓶颈。
  • 除此之外,我们应该通过一些优化,尽量避免比较复杂的 JOIN 查询操作,例如冗余一些字段,减少 JOIN 查询;创建一些中间表,减少 JOIN 查询。

参考资料

MySQL 事务

::: info 概述

不是所有的 MySQL 存储引擎都实现了事务处理。支持事务的存储引擎有:InnoDBNDB Cluster。不支持事务的存储引擎,代表有:MyISAM

用户可以根据业务是否需要事务处理(事务处理可以保证数据安全,但会增加系统开销),选择合适的存储引擎。

:::

事务简介

事务概念

“事务”指的是满足 ACID 特性的一组操作。事务内的 SQL 语句,要么全执行成功,要么全执行失败。可以通过 Commit 提交一个事务,也可以使用 Rollback 进行回滚。

ACID

ACID 是数据库事务正确执行的四个基本要素。

  • 原子性(Atomicity)
    • 事务被视为不可分割的最小单元,事务中的所有操作要么全部提交成功,要么全部失败回滚。
    • 回滚可以用日志来实现,日志记录着事务所执行的修改操作,在回滚时反向执行这些修改操作即可。
  • 一致性(Consistency)
    • 数据库在事务执行前后都保持一致性状态。
    • 在一致性状态下,所有事务对一个数据的读取结果都是相同的。
  • 隔离性(Isolation)
    • 一个事务所做的修改在最终提交以前,对其它事务是不可见的。
  • 持久性(Durability)
    • 一旦事务提交,则其所做的修改将会永远保存到数据库中。即使系统发生崩溃,事务执行的结果也不能丢失。
    • 可以通过数据库备份和恢复来实现,在系统发生奔溃时,使用备份的数据库进行数据恢复。

一个支持事务(Transaction)中的数据库系统,必需要具有这四种特性,否则在事务过程(Transaction processing)当中无法保证数据的正确性。

  • 只有满足一致性,事务的执行结果才是正确的。
  • 在无并发的情况下,事务串行执行,隔离性一定能够满足。此时只要能满足原子性,就一定能满足一致性。
  • 在并发的情况下,多个事务并行执行,事务不仅要满足原子性,还需要满足隔离性,才能满足一致性。
  • 事务满足持久化是为了能应对系统崩溃的情况。

事务操作

事务相关的语句如下:

  • BEGIN / START TRANSACTION - 用于标记事务的起始点
  • START TRANSACTION WITH CONSISTENT SNAPSHOT - 用于标记事务的起始点
  • SAVEPOINT - 用于创建保存点。方便后续针对保存点进行回滚。一个事务中可以存在多个保存点。
  • RELEASE SAVEPOINT - 删除某个保存点。
  • ROLLBACK TO - 用于回滚到指定的保存点。如果没有设置保存点,则回退到 START TRANSACTION 语句处。
  • COMMIT - 提交事务
  • SET TRANSACTION - 设置事务的隔离级别。

注意:

两种开启事务的命令,启动时机是不同的:

  • 执行了 BEGIN / START TRANSACTION 命令后,并不代表事务立刻启动,而是当执行了增删查操作时,才真正启动事务。
  • 执行了 START TRANSACTION WITH CONSISTENT SNAPSHOT 命令,会立刻启动事务。

事务处理示例:

(1)创建一张示例表

1
2
3
4
5
6
7
8
9
10
-- 撤销表 user
DROP TABLE IF EXISTS `user`;

-- 创建表 user
CREATE TABLE `user` (
`id` INT(10) UNSIGNED NOT NULL COMMENT 'Id',
`username` VARCHAR(64) NOT NULL DEFAULT 'default' COMMENT '用户名',
`password` VARCHAR(64) NOT NULL DEFAULT 'default' COMMENT '密码',
`email` VARCHAR(64) NOT NULL DEFAULT 'default' COMMENT '邮箱'
) COMMENT ='用户表';

(2)执行事务操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
-- 开始事务
START TRANSACTION;

-- 插入操作 A
INSERT INTO `user`
VALUES (1, 'root1', 'root1', 'xxxx@163.com');

-- 创建保留点 updateA
SAVEPOINT `updateA`;

-- 插入操作 B
INSERT INTO `user`
VALUES (2, 'root2', 'root2', 'xxxx@163.com');

-- 回滚到保留点 updateA
ROLLBACK TO `updateA`;

-- 提交事务,只有操作 A 生效
COMMIT;

(3)查询结果

1
SELECT * FROM `user`;

结果:

1
2
3
4
5
6
7
mysql> SELECT * FROM user;
+----+----------+----------+--------------+
| id | username | password | email |
+----+----------+----------+--------------+
| 1 | root1 | root1 | xxxx@163.com |
+----+----------+----------+--------------+
1 row in set (0.02 sec)

AUTOCOMMIT

MySQL 默认采用隐式提交策略(autocommit。每执行一条语句就把这条语句当成一个事务然后进行提交。当出现 START TRANSACTION 语句时,会关闭隐式提交;当 COMMITROLLBACK 语句执行后,事务会自动关闭,重新恢复隐式提交。

通过 set autocommit=0 可以取消自动提交,直到 set autocommit=1 才会提交;autocommit 标记是针对每个连接而不是针对服务器的。

1
2
3
4
5
6
7
8
-- 查看 AUTOCOMMIT
SHOW VARIABLES LIKE 'AUTOCOMMIT';

-- 关闭 AUTOCOMMIT
SET autocommit = 0;

-- 开启 AUTOCOMMIT
SET autocommit = 1;

并发一致性问题

在并发环境下,事务的隔离性很难保证,因此会出现很多并发一致性问题。

丢失修改

“丢失修改”是指一个事务的更新操作被另外一个事务的更新操作替换

如下图所示,T1 和 T2 两个事务对同一个数据进行修改,T1 先修改,T2 随后修改,T2 的修改覆盖了 T1 的修改。

脏读

“脏读(dirty read)”是指当前事务可以读取其他事务未提交的数据

如下图所示,T1 修改一个数据,T2 随后读取这个数据。如果 T1 撤销了这次修改,那么 T2 读取的数据是脏数据。

不可重复读

“不可重复读(non-repeatable read)”是指一个事务内多次读取同一数据,过程中,该数据被其他事务所修改,导致当前事务多次读取的数据可能不一致

如下图所示,T2 读取一个数据,T1 对该数据做了修改。如果 T2 再次读取这个数据,此时读取的结果和第一次读取的结果不同。

幻读

“幻读(phantom read)”是指一个事务内多次读取同一范围的数据,过程中,其他事务在该数据范围新增了数据,导致当前事务未发现新增数据

事务 T1 读取某个范围内的记录时,事务 T2 在该范围内插入了新的记录,T1 再次读取这个范围的数据,此时读取的结果和和第一次读取的结果不同。

事务隔离级别

事务隔离级别简介

为了解决以上提到的“并发一致性问题”,SQL 标准提出了四种“事务隔离级别”来应对这些问题。事务隔离级别等级越高,越能保证数据的一致性和完整性,但是执行效率也越低。因此,设置数据库的事务隔离级别时需要做一下权衡。

事务隔离级别从低到高分别是:

  • “读未提交(read uncommitted)” - 是指,事务中的修改,即使没有提交,对其它事务也是可见的
  • “读已提交(read committed)” ** - 是指,事务提交后,其他事务才能看到它的修改**。换句话说,一个事务所做的修改在提交之前对其它事务是不可见的。
    • 读已提交解决了脏读的问题
    • 读已提交是大多数数据库的默认事务隔离级别,如 Oracle。
  • “可重复读(repeatable read)” - 是指:保证在同一个事务中多次读取同样数据的结果是一样的
    • 可重复读解决了不可重复读问题
    • 可重复读是 InnoDB 存储引擎的默认事务隔离级别
  • 串行化(serializable ) - 是指,强制事务串行执行,对于同一行记录,加读写锁,一旦出现锁冲突,必须等前面的事务释放锁。
    • 串行化解决了幻读问题。由于强制事务串行执行,自然避免了所有的并发问题。
    • 串行化策略会在读取的每一行数据上都加锁,这可能导致大量的超时和锁竞争。这对于高并发应用基本上是不可接受的,所以一般不会采用这个级别。

事务隔离级别对并发一致性问题的解决情况:

隔离级别 丢失修改 脏读 不可重复读 幻读
读未提交 ✔️️️
读已提交 ✔️️️ ✔️️️
可重复读 ✔️️️ ✔️️️ ✔️️️
可串行化 ✔️️️ ✔️️️ ✔️️️ ✔️️️

查看和设置事务隔离级别

可以通过 SHOW VARIABLES LIKE 'transaction_isolation' 语句查看事务隔离级别。

【示例】查看事务隔离示例

1
2
3
4
5
6
7
mysql> SHOW VARIABLES LIKE 'transaction_isolation';
+-----------------------+-----------------+
| Variable_name | Value |
+-----------------------+-----------------+
| transaction_isolation | REPEATABLE-READ |
+-----------------------+-----------------+
1 row in set (0.03 sec)

MySQL 提供了 SET TRANSACTION 语句,该语句可以改变单个会话或全局的事务隔离级别。语法格式如下:

1
SET [SESSION | GLOBAL] TRANSACTION ISOLATION LEVEL {READ UNCOMMITTED | READ COMMITTED | REPEATABLE READ | SERIALIZABLE}

其中,SESSIONGLOBAL 关键字用来指定修改的事务隔离级别的范围:

  • SESSION - 表示修改的事务隔离级别,将应用于当前会话内的所有事务。
  • GLOBAL - 表示修改的事务隔离级别,将应用于所有会话内的所有事务(即全局修改),且当前已经存在的会话不受影响;
  • 如果省略 SESSIONGLOBAL,表示修改的事务隔离级别,将应用于当前会话内的下一个还未开始的事务。

【示例】设置事务隔离示例

1
2
3
4
5
6
7
8
9
10
11
-- 设置事务隔离级别为 READ UNCOMMITTED
SET SESSION TRANSACTION ISOLATION LEVEL READ UNCOMMITTED;

-- 设置事务隔离级别为 READ COMMITTED
SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED;

-- 设置事务隔离级别为 REPEATABLE READ
SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ;

-- 设置事务隔离级别为 SERIALIZABLE
SET SESSION TRANSACTION ISOLATION LEVEL SERIALIZABLE;

事务隔离级别实现方式

MySQL 中的事务功能是在存储引擎层实现的,并非所有存储引擎都支持事务功能。InnoDB 是 MySQL 的首先事务存储引擎。

四种隔离级别具体是如何实现的呢?

以 InnoDB 的事务实现来说明:

  • 对于“读未提交”隔离级别的事务来说,因为可以读到未提交事务修改的数据,所以直接读取最新的数据就好了;
  • 对于“串行化”隔离级别的事务来说,通过加读写锁的方式来避免并行访问;
  • 对于“读提交”和“可重复读”隔离级别的事务来说,它们都是通过 ReadView 来实现的,区别仅在于创建 ReadView 的时机不同。ReadView 可以理解为一个数据快照。
    • “读提交”隔离级别是在“每个语句执行前”都会重新生成一个 ReadView
    • “可重复读”隔离级别是在“启动事务时”生成一个 ReadView,然后整个事务期间都在用这个 ReadView。

关于 ReadView 更多细节,将在 MVCC 章节中阐述。

MVCC

当前读和快照读

在高并发场景下,多事务同时执行,可能会出现种种并发一致性问题。最常见,也是最容易想到的解决问题思路就是:对访问的数据加锁,通过强制互斥来解决问题。但是,加锁就意味着阻塞,势必会增加响应时间,降低系统整体吞吐量。在大多数真实的业务场景中,读请求远大于写请求,由于读请求并不会修改数据,自然也不存在一致性问题,因此为占大多数的读请求加锁是一种不必要的开销。那么,我们很自然的会想到,如果只针对写操作加锁,不就能大大提升吞吐量了吗?没错,有一种名为“写时复制(Copy-On-Write,简称 COW)”的技术,正是基于这个想法而设计,并广泛应用于各种软件领域,例如:Java 中的 CopyOnWriteArrayList 等容器;Redis 中的 RDB 持久化过程。

Copy-On-Write 的核心思想是:假设有多个请求需要访问相同的数据,先为这份数据生成一个副本(也可以称为快照)。然后将读写分离,所有的读请求都直接访问原数据;所有的写请求都访问副本数据,为了实现并发一致性,写数据时需要通过加锁保证每次写操作只能由一个写请求完成。当写操作完成后,用副本数据替换原数据。

在 MySQL 中,也采用了 Copy-On-Write 设计思想,将读写分离。

  • 这里的“写”指的是当前读。“当前读”,顾名思义,指的是读取记录当前的数据。为了保证读取当前数据时,没有其他事务修改,因此需要对读取记录加锁。当前读的场景有下面几种:
    • INSERT - 插入操作
    • UPDATE - 更新操作
    • DELETE - 删除操作
    • SELECT ... LOCK IN SHARE MODE - 加共享锁(读锁)
    • SELECT ... FOR UPDATE - 加独享锁(写锁)
  • 这里的“读”指的是快照读。“快照读”,顾名思义,指的是读取记录的某个历史快照版本。不加锁的普通 SELECT 都属于快照读,例如:SELECT ... FROM。采用快照读的前提是,事务隔离级别不是串行化级别。串行化级别下的快照读会退化成当前读。快照读的实现是基于 MVCC。

什么是 MVCC

前文提到,快照读的实现是基于 MVCC。那么,什么是 MVCC 呢?

MVCC 是 Multi Version Concurrency Control 的缩写,即“多版本并发控制”。MVCC 的设计目标是提高数据库的并发性,采用非阻塞的方式去处理读/写并发冲突,可以将其看成一种乐观锁。

不仅是 MySQL,包括 Oracle、PostgreSQL 等其他关系型数据库都实现了各自的 MVCC,实现机制没有统一标准。MVCC 是 InnoDB 存储引擎实现事务隔离级别的一种具体方式。其主要用于实现读已提交和可重复读这两种隔离级别。而未提交读隔离级别总是读取最新的数据行,要求很低,无需使用 MVCC。可串行化隔离级别需要对所有读取的行都加锁,单纯使用 MVCC 无法实现。

MVCC 实现原理

MVCC 的实现原理,主要基于隐式字段、UndoLog、ReadView 来实现。

隐式字段

InnoDB 存储引擎中,数据表的每行记录,除了用户显示定义的字段以外,还有几个数据库隐式定义的字段:

  • DB_ROW_ID - 隐藏的自增 ID,如果数据表没有指定主键,InnoDB 会自动基于 row_id 产生一个聚簇索引。
  • DB_TRX_ID - 最近修改的事务 ID。事务对某条聚簇索引记录进行改动时,就会把该事务的事务 id 记录在 trx_id 隐藏列里;
  • DB_ROLL_PTR - 回滚指针,指向这条记录的上一个版本。

UndoLog

MVCC 的多版本指的是多个版本的快照,快照存储在 UndoLog 中。该日志通过回滚指针 roll_pointer 把一个数据行的所有快照链接起来,构成一个版本链

ReadView

ReadView 就是事务进行快照读时产生的读视图(快照)

ReadView 有四个重要的字段:

  • m_ids - 指的是在创建 ReadView 时,当前数据库中“活跃事务”的事务 ID 列表。注意:这是一个列表,“活跃事务”指的就是,启动了但还没提交的事务
  • min_trx_id - 指的是在创建 ReadView 时,当前数据库中“活跃事务”中事务 id 最小的事务,也就是 m_ids 的最小值。
  • max_trx_id - 这个并不是 m_ids 的最大值,而是指创建 ReadView 时当前数据库中应该给下一个事务分配的 ID 值,也就是全局事务中最大的事务 ID 值 + 1;
  • creator_trx_id - 指的是创建该 ReadView 的事务的事务 ID。

在创建 ReadView 后,我们可以将记录中的 trx_id 划分为三种情况:

  • 已提交事务
  • 已启动但未提交的事务
  • 未启动的事务

ReadView 如何判断版本链中哪个版本可见?

一个事务去访问记录的时候,除了自己的更新记录总是可见之外,还有这几种情况:

  • trx_id == creator_trx_id - 表示 trx_id 版本记录由 ReadView 所代表的当前事务产生,当然可以访问。
  • trx_id < min_trx_id - 表示 trx_id 版本记录是在创建 ReadView 之前已提交的事务生成的,当前事务可以访问。
  • trx_id >= max_trx_id - 表示 trx_id 版本记录是在创建 ReadView 之后才启动的事务生成的,当前事务不可以访问。
  • min_trx_id <= trx_id < max_trx_id - 需要判断 trx_id 是否在 m_ids 列表中
    • 如果 trx_idm_ids 列表中,表示生成 trx_id 版本记录的事务依然活跃(未提交事务),当前事务不可以访问。
    • 如果 trx_id 不在 m_ids 列表中,表示生成 trx_id 版本记录的事务已提交,当前事务可以访问。

这种通过“版本链”来控制并发事务访问同一个记录时的行为就叫 MVCC(多版本并发控制)。

MVCC 如何实现多种事务隔离级别

对于“读已提交”和“可重复读”隔离级别的事务来说,它们都是通过 MVCC 的 ReadView 机制来实现的,区别仅在于创建 ReadView 的时机不同。ReadView 可以理解为一个数据快照。

  • “读已提交”隔离级别,会在“每个语句执行前”都会重新生成一个 ReadView。
  • “可重复读”隔离级别,会在“启动事务时”生成一个 ReadView,然后整个事务期间都在复用这个 ReadView。

MySQL InnoDB 引擎的默认隔离级别虽然是“可重复读”,但是它很大程度上避免幻读现象(并不是完全解决了),解决的方案有两种:

  • 针对快照读(普通 select 语句),通过 MVCC 方式解决了幻读,因为可重复读隔离级别下,事务执行过程中看到的数据,一直跟这个事务启动时看到的数据是一致的,即使中途有其他事务插入了一条数据,是查询不出来这条数据的,所以就很好了避免幻读问题。
  • 针对当前读(select … for update 等语句),通过 Next-Key Lock(记录锁+间隙锁)方式解决了幻读,因为当执行 select … for update 语句的时候,会加上 Next-Key Lock,如果有其他事务在 Next-Key Lock 锁范围内插入了一条记录,那么这个插入语句就会被阻塞,无法成功插入,所以就很好了避免幻读问题。

MVCC 实现可重复读

可重复读隔离级别只有在启动事务时才会创建 ReadView,然后整个事务期间都使用这个 ReadView。这样就保证了在事务期间读到的数据都是事务启动前的记录。

举个例子,假设有两个事务依次执行以下操作:

  • 初始,表中 id = 1 的 value 列值为 100。
  • 事务 2 读取数据,value 为 100;
  • 事务 1 将 value 设为 200;
  • 事务 2 读取数据,value 为 100;
  • 事务 1 提交事务;
  • 事务 2 读取数据,value 依旧为 100;

以上操作,如下图所示。T2 事务在事务过程中,是否可以看到 T1 事务的修改,可以根据 ReadView 中描述的规则去判断。

从图中不难看出:

  • 对于 trx_id = 100 的版本记录,比对 T2 事务 ReadView ,trx_id < min_trx_id,因此在 T2 事务中的任意时刻都可见;
  • 对于 trx_id = 101 的版本记录,比对 T2 事务 ReadView ,可以看出 min_trx_id <= trx_id < max_trx_id ,且 trx_idm_ids 中,因此 T2 事务中不可见。

综上所述,在 T2 事务中,自始至终只能看到 trx_id = 100 的版本记录。

MVCC 实现读已提交

读已提交隔离级别每次读取数据时都会创建一个 ReadView。这意味着,事务期间的多次读取同一条数据,前后读取的数据可能会出现不一致——因为,这期间可能有另外一个事务修改了该记录,并提交了事务。

举个例子,假设有两个事务依次执行以下操作:

  • 初始,表中 id = 1 的 value 列值为 100。
  • 事务 2 读取数据(创建 ReadView),value 为 0;
  • 事务 1 将 value 设为 100;
  • 事务 2 读取数据(创建 ReadView),value 为 0;
  • 事务 1 提交事务;
  • 事务 2 读取数据(创建 ReadView),value 为 100;

以上操作,如下图所示,T2 事务在事务过程中,是否可以看到其他事务的修改,可以根据 ReadView 中描述的规则去判断。

从图中不难看出:

  • 对于 trx_id = 100 的版本记录,比对 T2 事务 ReadView ,trx_id < min_trx_id,因此在 T2 事务中的任意时刻都可见;
  • 对于 trx_id = 101 的版本记录,比对 T2 事务 ReadView ,可以看出第二次查询时(T1 更新未提交),min_trx_id <= trx_id < max_trx_id ,且 trx_idm_ids 中,因此 T2 事务中不可见;而第三次查询时(T1 更新已提交),trx_id < min_trx_id,因此在 T2 事务中可见;

综上所述,在 T2 事务中,当 T1 事务提交前,可读取到的是 trx_id = 100 的版本记录;当 T1 事务提交后,可读取到的是 trx_id = 101 的版本记录。

MVCC + Next-Key Lock 解决幻读

MySQL InnoDB 引擎的默认隔离级别虽然是“可重复读”,但是它很大程度上避免幻读现象(并不是完全解决了)。针对快照读和当前读,InnoDB 的处理方式各不相同。

快照读是如何避免幻读的?

针对快照读(普通 SELECT 语句),通过 MVCC 方式解决了幻读,因为可重复读隔离级别下,事务执行过程中看到的数据,一直跟这个事务启动时看到的数据是一致的,即使中途有其他事务插入了一条数据,是查询不出来这条数据的,所以就很好了避免幻读问题。

当前读是如何避免幻读的?

针对当前读SELECT ... FOR UPDATE 等语句),通过 Next-Key Lock(记录锁+间隙锁)方式解决了幻读,因为当执行 SELECT ... FOR UPDATE 语句的时候,会加上 Next-Key Lock,如果有其他事务在 Next-Key Lock 锁范围内插入了一条记录,那么这个插入语句就会被阻塞,无法成功插入,所以就很好的避免了幻读问题。

幻读被完全解决了吗?

可重复读隔离级别下虽然很大程度上避免了幻读,但是还是没有能完全解决幻读

【示例】幻读案例一

环境:存储引擎为 InnoDB;事务隔离级别为可重复读

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
-- --------------------------------------------------------------------------------------
-- 实验说明:以下 SQL 脚本必须严格按照顺序执行,并且事务 A 和事务 B 必须在不同的 Client 中执行。
-- ----------------------------------------------------------------------------------------

-- --------------------------------------------------------------------- (1)数据初始化

-- 创建表 test
CREATE TABLE `test` (
`id` INT(10) UNSIGNED PRIMARY KEY AUTO_INCREMENT,
`value` INT(10) NOT NULL
);

-- 数据初始化
INSERT INTO `test` (`id`, `value`) VALUES (1, 1);
INSERT INTO `test` (`id`, `value`) VALUES (2, 2);
INSERT INTO `test` (`id`, `value`) VALUES (3, 3);

-- --------------------------------------------------------------------- (2)事务 A

BEGIN;

-- 查询 id = 4 的记录
SELECT * FROM `test` WHERE `id` = 4;
-- 结果为空

-- --------------------------------------------------------------------- (3)事务 B

BEGIN;

INSERT INTO `test` (`id`, `value`) VALUES (4, 4);

COMMIT;

-- --------------------------------------------------------------------- (4)事务 A

-- 查询 id = 4 的记录
SELECT * FROM `test` WHERE `id` = 4;
-- 结果依然为空

-- 成功更新本应看不到的记录 id = 4
UPDATE `test` SET `value` = 0 WHERE `id` = 4;

-- 再一次查询 id = 4 的记录
SELECT * FROM `test` WHERE `id` = 4;
-- 结果为:
-- +----+-------+
-- | id | value |
-- +----+-------+
-- | 4 | 0 |
-- +----+-------+

COMMIT;

以上示例代码的时序图如下:

【示例】幻读案例二

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
-- --------------------------------------------------------------------- (1)数据初始化

-- 创建表 test
CREATE TABLE `test` (
`id` INT(10) UNSIGNED PRIMARY KEY AUTO_INCREMENT,
`value` INT(10) NOT NULL
);

-- 数据初始化
INSERT INTO `test` (`id`, `value`) VALUES (1, 1);
INSERT INTO `test` (`id`, `value`) VALUES (2, 2);
INSERT INTO `test` (`id`, `value`) VALUES (3, 3);

-- --------------------------------------------------------------------- (2)事务 A

BEGIN;

-- 查询 id > 2 的记录数
SELECT COUNT(*) FROM `test` WHERE `id` > 2;
-- 结果为:
-- +----------+
-- | count(*) |
-- +----------+
-- | 1 |
-- +----------+

-- --------------------------------------------------------------------- (3)事务 B

BEGIN;

INSERT INTO `test` (`id`, `value`) VALUES (4, 4);

COMMIT;

-- --------------------------------------------------------------------- (4)事务 A

-- 查询 id > 2 的记录数
SELECT COUNT(*) FROM `test` WHERE `id` > 2 FOR UPDATE;
-- 结果为:
-- +----------+
-- | count(*) |
-- +----------+
-- | 2 |
-- +----------+

COMMIT;

要避免这类特殊场景下发生幻读的现象的话,就是尽量在开启事务之后,马上执行 select ... for update 这类当前读的语句,因为它会对记录加 Next-Key Lock,从而避免其他事务插入一条新记录。

分布式事务

在单一数据节点中,事务仅限于对单一数据库资源的访问控制,称之为 本地事务。几乎所有的成熟的关系型数据库都提供了对本地事务的原生支持。

分布式事务指的是事务操作跨越多个节点,并且要求满足事务的 ACID 特性。

分布式事务的常见方案如下:

  • 两阶段提交(2PC) - 将事务的提交过程分为两个阶段来进行处理:准备阶段和提交阶段。参与者将操作成败通知协调者,再由协调者根据所有参与者的反馈情报决定各参与者是否要提交操作还是中止操作。
  • 三阶段提交(3PC) - 与二阶段提交不同的是,引入超时机制。同时在协调者和参与者中都引入超时机制。将二阶段的准备阶段拆分为 2 个阶段,插入了一个 preCommit 阶段,使得原先在二阶段提交中,参与者在准备之后,由于协调者发生崩溃或错误,而导致参与者处于无法知晓是否提交或者中止的“不确定状态”所产生的可能相当长的延时的问题得以解决。
  • 补偿事务(TCC)
    • Try - 操作作为一阶段,负责资源的检查和预留。
    • Confirm - 操作作为二阶段提交操作,执行真正的业务。
    • Cancel - 是预留资源的取消。
  • 本地消息表 - 在事务主动发起方额外新建事务消息表,事务发起方处理业务和记录事务消息在本地事务中完成,轮询事务消息表的数据发送事务消息,事务被动方基于消息中间件消费事务消息表中的事务。
  • MQ 事务 - 基于 MQ 的分布式事务方案其实是对本地消息表的封装。
  • SAGA - Saga 事务核心思想是将长事务拆分为多个本地短事务,由 Saga 事务协调器协调,如果正常结束那就正常完成,如果某个步骤失败,则根据相反顺序一次调用补偿操作。

分布式事务方案分析:

  • 2PC/3PC 依赖于数据库,能够很好的提供强一致性和强事务性,但相对来说延迟比较高,比较适合传统的单体应用,在同一个方法中存在跨库操作的情况,不适合高并发和高性能要求的场景。
  • TCC 适用于执行时间确定且较短,实时性要求高,对数据一致性要求高,比如互联网金融企业最核心的三个服务:交易、支付、账务。
  • 本地消息表/MQ 事务 都适用于事务中参与方支持操作幂等,对一致性要求不高,业务上能容忍数据不一致到一个人工检查周期,事务涉及的参与方、参与环节较少,业务上有对账/校验系统兜底。
  • Saga 事务 由于 Saga 事务不能保证隔离性,需要在业务层控制并发,适合于业务场景事务并发操作同一资源较少的情况。 Saga 相比缺少预提交动作,导致补偿动作的实现比较麻烦,例如业务是发送短信,补偿动作则得再发送一次短信说明撤销,用户体验比较差。Saga 事务较适用于补偿动作容易处理的场景。

分布式事务详细说明、分析请参考:分布式事务基本原理

事务最佳实践

高并发场景下的事务到底该如何调优?

尽量使用低级别事务隔离

结合业务场景,尽量使用低级别事务隔离

避免行锁升级表锁

在 InnoDB 中,行锁是通过索引实现的,如果不通过索引条件检索数据,行锁将会升级到表锁。我们知道,表锁是会严重影响到整张表的操作性能的,所以应该尽力避免。

缩小事务范围

有时候,数据库并发访问量太大,会出现以下异常:

1
MySQLQueryInterruptedException: Query execution was interrupted

高并发时对一条记录进行更新的情况下,由于更新记录所在的事务还可能存在其他操作,导致一个事务比较长,当有大量请求进入时,就可能导致一些请求同时进入到事务中。

又因为锁的竞争是不公平的,当多个事务同时对一条记录进行更新时,极端情况下,一个更新操作进去排队系统后,可能会一直拿不到锁,最后因超时被系统打断踢出。

img

如上图中的操作,虽然都是在一个事务中,但锁的申请在不同时间,只有当其他操作都执行完,才会释放所有锁。因为扣除库存是更新操作,属于行锁,这将会影响到其他操作该数据的事务,所以我们应该尽量避免长时间地持有该锁,尽快释放该锁。又因为先新建订单和先扣除库存都不会影响业务,所以我们可以将扣除库存操作放到最后,也就是使用执行顺序 1,以此尽量减小锁的持有时间。

在 InnoDB 事务中,行锁是在需要的时候才加上的,但并不是不需要了就立刻释放,而是要等到事务结束时才释放。这个就是两阶段锁协议。

知道了这个设定,对我们使用事务有什么帮助呢?那就是,如果你的事务中需要锁多个行,要把最可能造成锁冲突、最可能影响并发度的锁尽量往后放。

参考资料

Kafka 快速入门

Kafka 简介

Apache Kafka 是一款开源的消息引擎系统,也是一个分布式流计算平台,此外,还可以作为数据存储

img

Kafka 的功能

Kafka 的核心功能如下:

  • 消息引擎 - Kafka 可以作为一个消息引擎系统。
  • 流处理 - Kafka 可以作为一个分布式流处理平台。
  • 存储 - Kafka 可以作为一个安全的分布式存储。

Kafka 的特性

Kafka 的设计目标:

  • 高性能
    • 分区、分段、索引:基于分区机制提供并发处理能力。分段、索引提升了数据读写的查询效率。
    • 顺序读写:使用顺序读写提升磁盘 IO 性能。
    • 零拷贝:利用零拷贝技术,提升网络 I/O 效率。
    • 页缓存:利用操作系统的 PageCache 来缓存数据(典型的利用空间换时间)
    • 批量读写:批量读写可以有效提升网络 I/O 效率。
    • 数据压缩:Kafka 支持数据压缩,可以有效提升网络 I/O 效率。
    • pull 模式:Kafka 架构基于 pull 模式,可以自主控制消费策略,提升传输效率。
  • 高可用
    • 持久化:Kafka 所有的消息都存储在磁盘,天然支持持久化。
    • 副本机制:Kafka 的 Broker 集群支持副本机制,可以通过冗余,来保证其整体的可用性。
    • 选举 Leader:Kafka 基于 ZooKeeper 支持选举 Leader,实现了故障转移能力。
  • 伸缩性
    • 分区:Kafka 的分区机制使得其具有良好的伸缩性。

Kafka 术语

  • 消息:Kafka 的数据单元被称为消息。消息由字节数组组成。
  • 批次:批次就是一组消息,这些消息属于同一个主题和分区。
  • 主题(Topic):Kafka 消息通过主题进行分类。主题就类似数据库的表。
    • 不同主题的消息是物理隔离的;
    • 同一个主题的消息保存在一个或多个 Broker 上。但用户只需指定消息的 Topic 即可生产或消费数据而不必关心数据存于何处。
    • 主题有一个或多个分区。
  • 分区(Partition):分区是一个有序不变的消息序列,消息以追加的方式写入分区,然后以先入先出的顺序读取。Kafka 通过分区来实现数据冗余和伸缩性。
  • 消息偏移量(Offset):表示分区中每条消息的位置信息,是一个单调递增且不变的值。
  • 生产者(Producer):生产者是向主题发布新消息的 Kafka 客户端。生产者可以将数据发布到所选择的主题中。生产者负责将记录分配到主题中的哪一个分区中。
  • 消费者(Consumer):消费者是从主题订阅新消息的 Kafka 客户端。消费者通过检查消息的偏移量来区分消息是否已读。
  • 消费者群组(Consumer Group):多个消费者共同构成的一个群组,同时消费多个分区以实现高并发。
    • 每个消费者属于一个特定的消费者群组(可以为每个消费者指定消费者群组,若不指定,则属于默认的群组)。
    • 群组中,一个消费者可以消费多个分区。
    • 群组中,每个分区只能被指定给一个消费者。
  • 再均衡(Rebalance):消费者群组内某个消费者实例挂掉后,其他消费者实例自动重新分配订阅主题分区的过程。分区再均衡是 Kafka 消费者端实现高可用的重要手段。
  • Broker - 一个独立的 Kafka 服务器被称为 Broker。Broker 接受来自生产者的消息,为消息设置偏移量,并提交消息到磁盘保存;消费者向 Broker 请求消息,Broker 负责返回已提交的消息。
  • 副本(Replica):Kafka 中同一条消息能够被拷贝到多个地方以提供数据冗余,这些地方就是所谓的副本。副本还分为领导者副本和追随者副本,各自有不同的角色划分。副本是在分区层级下的,即每个分区可配置多个副本实现高可用。

Kafka 发行版本

Kafka 主要有以下发行版本:

  • Apache Kafka:也称社区版 Kafka。优势在于迭代速度快,社区响应度高,使用它可以让你有更高的把控度;缺陷在于仅提供基础核心组件,缺失一些高级的特性。
  • Confluent Kafka:Confluent 公司提供的 Kafka。优势在于集成了很多高级特性且由 Kafka 原班人马打造,质量上有保证;缺陷在于相关文档资料不全,普及率较低,没有太多可供参考的范例。
  • CDH/HDP Kafka:大数据云公司提供的 Kafka,内嵌 Apache Kafka。优势在于操作简单,节省运维成本;缺陷在于把控度低,演进速度较慢。

Kafka 重大版本

Kafka 有以下重大版本:

  • 0.7 - 只提供了最基础的消息队列功能
  • 0.8
    • 正式引入了副本机制
    • 至少升级到 0.8.2.2
  • 0.9
    • 增加了基础的安全认证 / 权限功能
    • 用 Java 重写了新版本消费者 API
    • 引入了 Kafka Connect 组件
    • 新版本 Producer API 在这个版本中算比较稳定
  • 0.10
    • 引入了 Kafka Streams,正式升级成分布式流处理平台
    • 至少升级到 0.10.2.2
    • 修复了一个可能导致 Producer 性能降低的 Bug
  • 0.11
    • 提供幂等性 Producer API 以及事务
    • 对 Kafka 消息格式做了重构
    • 至少升级到 0.11.0.3
  • 1.0 和 2.0 - Kafka Streams 的改进

Kafka 服务端使用入门

步骤一、获取 Kafka

下载最新的 Kafka 版本并解压到本地。

1
2
$ tar -xzf kafka_2.13-2.7.0.tgz
$ cd kafka_2.13-2.7.0

步骤二、启动 Kafka 环境

注意:本地必须已安装 Java8

执行以下指令,保证所有服务按照正确的顺序启动:

1
2
3
# Start the ZooKeeper service
# Note: Soon, ZooKeeper will no longer be required by Apache Kafka.
$ bin/zookeeper-server-start.sh config/zookeeper.properties

打开另一个终端会话,并执行:

1
2
# Start the Kafka broker service
$ bin/kafka-server-start.sh config/server.properties

一旦所有服务成功启动,您就已经成功运行了一个基本的 kafka 环境。

步骤三、创建一个 TOPIC 并存储您的事件

Kafka 是一个分布式事件流处理平台,它可以让您通过各种机制读、写、存储并处理事件(events,也被称为记录或消息)

示例事件包括付款交易,手机的地理位置更新,运输订单,物联网设备或医疗设备的传感器测量等等。 这些事件被组织并存储在主题中(topics)。 简单来说,主题类似于文件系统中的文件夹,而事件是该文件夹中的文件。

因此,在您写入第一个事件之前,您必须先创建一个 Topic。执行以下指令:

1
$ bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092

所有的 Kafka 命令行工具都有附加可选项:不加任何参数,运行 kafka-topics.sh 命令会显示使用信息。例如,会显示新 Topic 的分区数等细节。

1
2
3
$ bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092
Topic:quickstart-events PartitionCount:1 ReplicationFactor:1 Configs:
Topic: quickstart-events Partition: 0 Leader: 0 Replicas: 0 Isr: 0

步骤四、向 Topic 写入 Event

Kafka 客户端和 Kafka Broker 的通信是通过网络读写 Event。一旦收到信息,Broker 会将其以您需要的时间(甚至永久化)、容错化的方式存储。

执行 kafka-console-producer.sh 命令将 Event 写入 Topic。默认,您输入的任意行会作为独立 Event 写入 Topic:

1
2
3
$ bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
This is my first event
This is my second event

您可以通过 Ctrl-C 在任何时候中断 kafka-console-producer.sh

步骤五、读 Event

执行 kafka-console-consumer.sh 以读取写入 Topic 中的 Event

1
2
3
$ bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
This is my first event
This is my second event

您可以通过 Ctrl-C 在任何时候中断 kafka-console-consumer.sh

由于 Event 被持久化存储在 Kafka 中,因此您可以根据需要任意多次地读取它们。 您可以通过打开另一个终端会话并再次重新运行上一个命令来轻松地验证这一点。

步骤六、通过 KAFKA CONNECT 将数据作为事件流导入/导出

您可能有大量数据,存储在传统的关系数据库或消息队列系统中,并且有许多使用这些系统的应用程序。 通过 Kafka Connect,您可以将来自外部系统的数据持续地导入到 Kafka 中,反之亦然。 因此,将已有系统与 Kafka 集成非常容易。为了使此过程更加容易,有数百种此类连接器可供使用。

需要了解有关如何将数据导入和导出 Kafka 的更多信息,可以参考:Kafka Connect section 章节。

步骤七、使用 Kafka Streams 处理事件

一旦将数据作为 Event 存储在 Kafka 中,就可以使用 Kafka Streams 的 Java / Scala 客户端。它允许您实现关键任务的实时应用程序和微服务,其中输入(和/或)输出数据存储在 Kafka Topic 中。

Kafka Streams 结合了 Kafka 客户端编写和部署标准 Java 和 Scala 应用程序的简便性,以及 Kafka 服务器集群技术的优势,使这些应用程序具有高度的可伸缩性、弹性、容错性和分布式。该库支持一次性处理,有状态的操作,以及聚合、窗口化化操作、join、基于事件时间的处理等等。

1
2
3
4
5
6
7
8
KStream<String, String> textLines = builder.stream("quickstart-events");

KTable<String, Long> wordCounts = textLines
.flatMapValues(line -> Arrays.asList(line.toLowerCase().split(" ")))
.groupBy((keyIgnored, word) -> word)
.count();

wordCounts.toStream().to("output-topic"), Produced.with(Serdes.String(), Serdes.Long()));

Kafka Streams demoapp development tutorial 展示了如何从头到尾的编码并运行一个流式应用。

步骤八、终止 Kafka 环境

  1. 如果尚未停止,请使用 Ctrl-C 停止生产者和消费者客户端。
  2. 使用 Ctrl-C 停止 Kafka 代理。
  3. 最后,使用 Ctrl-C 停止 ZooKeeper 服务器。

如果您还想删除本地 Kafka 环境的所有数据,包括您在此过程中创建的所有事件,请执行以下命令:

1
$ rm -rf /tmp/kafka-logs /tmp/zookeeper

Kafka Java 客户端使用入门

引入 maven 依赖

Stream API 的 maven 依赖:

1
2
3
4
5
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>1.1.0</version>
</dependency>

其他 API 的 maven 依赖:

1
2
3
4
5
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.1.0</version>
</dependency>

Kafka 核心 API

Kafka 有 5 个核心 API

  • Producer API - 允许一个应用程序发布一串流式数据到一个或者多个 Kafka Topic。
  • Consumer API - 允许一个应用程序订阅一个或多个 Kafka Topic,并且对发布给他们的流式数据进行处理。
  • Streams API - 允许一个应用程序作为一个流处理器,消费一个或者多个 Kafka Topic 产生的输入流,然后生产一个输出流到一个或多个 Kafka Topic 中去,在输入输出流中进行有效的转换。
  • Connector API - 允许构建并运行可重用的生产者或者消费者,将 Kafka Topic 连接到已存在的应用程序或数据库。例如,连接到一个关系型数据库,捕捉表的所有变更内容。
  • Admin API - 支持管理和检查 Topic,Broker,ACL 和其他 Kafka 对象。

发送消息

发送并忽略返回

代码如下,直接通过 send 方法来发送

1
2
3
4
5
6
7
ProducerRecord<String, String> record =
new ProducerRecord<>("CustomerCountry", "Precision Products", "France");
try {
producer.send(record);
} catch (Exception e) {
e.printStackTrace();
}

同步发送

代码如下,与“发送并忘记”的方式区别在于多了一个 get 方法,会一直阻塞等待 Broker 返回结果:

1
2
3
4
5
6
7
ProducerRecord<String, String> record =
new ProducerRecord<>("CustomerCountry", "Precision Products", "France");
try {
producer.send(record).get();
} catch (Exception e) {
e.printStackTrace();
}

异步发送

代码如下,异步方式相对于“发送并忽略返回”的方式的不同在于:在异步返回时可以执行一些操作,如记录错误或者成功日志。

首先,定义一个 callback

1
2
3
4
5
6
7
8
private class DemoProducerCallback implements Callback {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null) {
e.printStackTrace();
}
}
}

然后,使用这个 callback

1
2
3
ProducerRecord<String, String> record =
new ProducerRecord<>("CustomerCountry", "Biomedical Materials", "USA");
producer.send(record, new DemoProducerCallback());

发送消息示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

/**
* Kafka 生产者生产消息示例 生产者配置参考:https://kafka.apache.org/documentation/#producerconfigs
*/
public class ProducerDemo {
private static final String HOST = "localhost:9092";

public static void main(String[] args) {
// 1. 指定生产者的配置
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, HOST);
properties.put(ProducerConfig.ACKS_CONFIG, "all");
properties.put(ProducerConfig.RETRIES_CONFIG, 0);
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");

// 2. 使用配置初始化 Kafka 生产者
Producer<String, String> producer = new KafkaProducer<>(properties);

try {
// 3. 使用 send 方法发送异步消息
for (int i = 0; i < 100; i++) {
String msg = "Message " + i;
producer.send(new ProducerRecord<>("HelloWorld", msg));
System.out.println("Sent:" + msg);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
// 4. 关闭生产者
producer.close();
}
}
}

消费消息流程

消费流程

具体步骤如下:

  1. 创建消费者。
  2. 订阅主题。除了订阅主题方式外还有使用指定分组的模式,但是常用方式都是订阅主题方式
  3. 轮询消息。通过 poll 方法轮询。
  4. 关闭消费者。在不用消费者之后,会执行 close 操作。close 操作会关闭 socket,并触发当前消费者群组的再均衡。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// 1.构建KafkaCustomer
Consumer consumer = buildCustomer();

// 2.设置主题
consumer.subscribe(Arrays.asList(topic));

// 3.接受消息
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(500);
System.out.println("customer Message---");
for (ConsumerRecord<String, String> record : records)

// print the offset,key and value for the consumer records.
System.out.printf("offset = %d, key = %s, value = %s\n",
record.offset(), record.key(), record.value());
}
} finally {
// 4.关闭消息
consumer.close();
}

创建消费者的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public Consumer buildCustomer() {
Properties props = new Properties();
// bootstrap.servers是Kafka集群的IP地址。多个时,使用逗号隔开
props.put("bootstrap.servers", "localhost:9092");
// 消费者群组
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer
<String, String>(props);

return consumer;
}

消费消息方式

分为订阅主题和指定分组两种方式:

  • 消费者分组模式。通过订阅主题方式时,消费者必须加入到消费者群组中,即消费者必须有一个自己的分组;
  • 独立消费者模式。这种模式就是消费者是独立的不属于任何消费者分组,自己指定消费那些 Partition

(1)订阅主题方式

1
consumer.subscribe(Arrays.asList(topic));

(2)独立消费者模式

通过 consumer 的 assign(Collection<TopicPartition> partitions) 方法来为消费者指定分区。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public void consumeMessageForIndependentConsumer(String topic){
// 1.构建KafkaCustomer
Consumer consumer = buildCustomer();

// 2.指定分区
// 2.1获取可用分区
List<PartitionInfo> partitionInfoList = buildCustomer().partitionsFor(topic);
// 2.2指定分区,这里是指定了所有分区,也可以指定个别的分区
if(null != partitionInfoList){
List<TopicPartition> partitions = Lists.newArrayList();
for(PartitionInfo partitionInfo : partitionInfoList){
partitions.add(new TopicPartition(partitionInfo.topic(),partitionInfo.partition()));
}
consumer.assign(partitions);
}

// 3.接受消息
while (true) {
ConsumerRecords<String, String> records = consumer.poll(500);
System.out.println("consume Message---");
for (ConsumerRecord<String, String> record : records) {

// print the offset,key and value for the consumer records.
System.out.printf("offset = %d, key = %s, value = %s\n",
record.offset(), record.key(), record.value());

// 异步提交
consumer.commitAsync();


}
}
}

参考资料

Kafka 运维

环境要求:

  • JDK8
  • ZooKeeper

Kafka 单点部署

下载解压

进入官方下载地址:http://kafka.apache.org/downloads,选择合适版本。

解压到本地:

1
2
tar -xzf kafka_2.11-1.1.0.tgz
cd kafka_2.11-1.1.0

现在您已经在您的机器上下载了最新版本的 Kafka。

启动服务器

由于 Kafka 依赖于 ZooKeeper,所以运行前需要先启动 ZooKeeper

1
2
3
$ bin/zookeeper-server-start.sh config/zookeeper.properties
[2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
...

然后,启动 Kafka

1
2
3
4
$ bin/kafka-server-start.sh config/server.properties
[2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
...

停止服务器

执行所有操作后,可以使用以下命令停止服务器

1
bin/kafka-server-stop.sh config/server.properties

Kafka 集群部署

修改配置

复制配置为多份(Windows 使用 copy 命令代理):

1
2
cp config/server.properties config/server-1.properties
cp config/server.properties config/server-2.properties

修改配置:

1
2
3
4
5
6
7
8
9
config/server-1.properties:
broker.id=1
listeners=PLAINTEXT://:9093
log.dir=/tmp/kafka-logs-1

config/server-2.properties:
broker.id=2
listeners=PLAINTEXT://:9094
log.dir=/tmp/kafka-logs-2

其中,broker.id 这个参数必须是唯一的。

端口故意配置的不一致,是为了可以在一台机器启动多个应用节点。

启动

根据这两份配置启动三个服务器节点:

1
2
3
4
5
6
$ bin/kafka-server-start.sh config/server.properties &
...
$ bin/kafka-server-start.sh config/server-1.properties &
...
$ bin/kafka-server-start.sh config/server-2.properties &
...

创建一个新的 Topic 使用 三个备份:

1
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic

查看主题:

1
2
3
$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
  • leader - 负责指定分区的所有读取和写入的节点。每个节点将成为随机选择的分区部分的领导者。
  • replicas - 是复制此分区日志的节点列表,无论它们是否为领导者,或者即使它们当前处于活动状态。
  • isr - 是“同步”复制品的集合。这是副本列表的子集,该列表当前处于活跃状态并且已经被领导者捕获。

Kafka 命令

主题(Topic)

创建 Topic

1
kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic my-topic

查看 Topic 列表

1
kafka-topics --list --zookeeper localhost:2181

添加 Partition

1
kafka-topics --zookeeper localhost:2181 --alter --topic my-topic --partitions 16

删除 Topic

1
kafka-topics --zookeeper localhost:2181 --delete --topic my-topic

查看 Topic 详细信息

1
kafka-topics --zookeeper localhost:2181/kafka-cluster --describe

查看备份分区

1
kafka-topics --zookeeper localhost:2181/kafka-cluster --describe --under-replicated-partitions

生产者(Producers)

通过控制台输入生产消息

1
kafka-console-producer --broker-list localhost:9092 --topic my-topic

通过文件输入生产消息

1
kafka-console-producer --broker-list localhost:9092 --topic test < messages.txt

通过控制台输入 Avro 生产消息

1
kafka-avro-console-producer --broker-list localhost:9092 --topic my.Topic --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}' --property schema.registry.url=http://localhost:8081

然后,可以选择输入部分 json key:

1
{ "f1": "value1" }

生成消息性能测试

1
kafka-producer-perf-test --topic position-reports --throughput 10000 --record-size 300 --num-records 20000 --producer-props bootstrap.servers="localhost:9092"

消费者(Consumers)

消费所有未消费的消息

1
kafka-console-consumer --bootstrap-server localhost:9092 --topic my-topic --from-beginning

消费一条消息

1
kafka-console-consumer --bootstrap-server localhost:9092 --topic my-topic  --max-messages 1

从指定的 offset 消费一条消息

从指定的 offset __consumer_offsets 消费一条消息:

1
kafka-console-consumer --bootstrap-server localhost:9092 --topic __consumer_offsets --formatter 'kafka.coordinator.GroupMetadataManager$OffsetsMessageFormatter' --max-messages 1

从指定 Group 消费消息

1
kafka-console-consumer --topic my-topic --new-consumer --bootstrap-server localhost:9092 --consumer-property group.id=my-group

消费 avro 消息

1
kafka-avro-console-consumer --topic position-reports --new-consumer --bootstrap-server localhost:9092 --from-beginning --property schema.registry.url=localhost:8081 --max-messages 10
1
kafka-avro-console-consumer --topic position-reports --new-consumer --bootstrap-server localhost:9092 --from-beginning --property schema.registry.url=localhost:8081

查看消费者 Group 列表

1
kafka-consumer-groups --new-consumer --list --bootstrap-server localhost:9092

查看消费者 Group 详细信息

1
kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group testgroup

配置(Config)

设置 Topic 的保留时间

1
kafka-configs --zookeeper localhost:2181 --alter --entity-type topics --entity-name my-topic --add-config retention.ms=3600000

查看 Topic 的所有配置

1
kafka-configs --zookeeper localhost:2181 --describe --entity-type topics --entity-name my-topic

修改 Topic 的配置

1
kafka-configs --zookeeper localhost:2181 --alter --entity-type topics --entity-name my-topic --delete-config retention.ms

ACL

查看指定 Topic 的 ACL

1
kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 --list --topic topicA

添加 ACL

1
kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Bob --consumer --topic topicA --group groupA
1
kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Bob --producer --topic topicA

ZooKeeper

1
zookeeper-shell localhost:2182 ls /

Kafka 工具

Kafka 核心配置

Broker 级别配置

存储配置

首先 Broker 是需要配置存储信息的,即 Broker 使用哪些磁盘。那么针对存储信息的重要参数有以下这么几个:

  • log.dirs:指定了 Broker 需要使用的若干个文件目录路径。这个参数是没有默认值的,必须由使用者亲自指定。
  • log.dir:注意这是 dir,结尾没有 s,说明它只能表示单个路径,它是补充上一个参数用的。

log.dirs 具体格式是一个 CSV 格式,也就是用逗号分隔的多个路径,比如/home/kafka1,/home/kafka2,/home/kafka3这样。如果有条件的话你最好保证这些目录挂载到不同的物理磁盘上。这样做有两个好处:

  • 提升读写性能:比起单块磁盘,多块物理磁盘同时读写数据有更高的吞吐量。
  • 能够实现故障转移:即 Failover。这是 Kafka 1.1 版本新引入的强大功能。要知道在以前,只要 Kafka Broker 使用的任何一块磁盘挂掉了,整个 Broker 进程都会关闭。但是自 1.1 开始,这种情况被修正了,坏掉的磁盘上的数据会自动地转移到其他正常的磁盘上,而且 Broker 还能正常工作。

zookeeper 配置

Kafka 与 ZooKeeper 相关的最重要的参数当属 zookeeper.connect。这也是一个 CSV 格式的参数,比如我可以指定它的值为zk1:2181,zk2:2181,zk3:2181。2181 是 ZooKeeper 的默认端口。

现在问题来了,如果我让多个 Kafka 集群使用同一套 ZooKeeper 集群,那么这个参数应该怎么设置呢?这时候 chroot 就派上用场了。这个 chroot 是 ZooKeeper 的概念,类似于别名。

如果你有两套 Kafka 集群,假设分别叫它们 kafka1 和 kafka2,那么两套集群的zookeeper.connect参数可以这样指定:zk1:2181,zk2:2181,zk3:2181/kafka1zk1:2181,zk2:2181,zk3:2181/kafka2。切记 chroot 只需要写一次,而且是加到最后的。我经常碰到有人这样指定:zk1:2181/kafka1,zk2:2181/kafka2,zk3:2181/kafka3,这样的格式是不对的。

Broker 连接配置

  • listeners:告诉外部连接者要通过什么协议访问指定主机名和端口开放的 Kafka 服务。
  • advertised.listeners:和 listeners 相比多了个 advertised。Advertised 的含义表示宣称的、公布的,就是说这组监听器是 Broker 用于对外发布的。
  • host.name/port:列出这两个参数就是想说你把它们忘掉吧,压根不要为它们指定值,毕竟都是过期的参数了。

我们具体说说监听器的概念,从构成上来说,它是若干个逗号分隔的三元组,每个三元组的格式为<协议名称,主机名,端口号>。这里的协议名称可能是标准的名字,比如 PLAINTEXT 表示明文传输、SSL 表示使用 SSL 或 TLS 加密传输等;也可能是你自己定义的协议名字,比如CONTROLLER: //localhost:9092

最好全部使用主机名,即 Broker 端和 Client 端应用配置中全部填写主机名。

Topic 管理

  • auto.create.topics.enable:是否允许自动创建 Topic。一般设为 false,由运维把控创建 Topic。
  • unclean.leader.election.enable:是否允许 Unclean Leader 选举。
  • auto.leader.rebalance.enable:是否允许定期进行 Leader 选举。

第二个参数unclean.leader.election.enable是关闭 Unclean Leader 选举的。何谓 Unclean?还记得 Kafka 有多个副本这件事吗?每个分区都有多个副本来提供高可用。在这些副本中只能有一个副本对外提供服务,即所谓的 Leader 副本。

那么问题来了,这些副本都有资格竞争 Leader 吗?显然不是,只有保存数据比较多的那些副本才有资格竞选,那些落后进度太多的副本没资格做这件事。

好了,现在出现这种情况了:假设那些保存数据比较多的副本都挂了怎么办?我们还要不要进行 Leader 选举了?此时这个参数就派上用场了。

如果设置成 false,那么就坚持之前的原则,坚决不能让那些落后太多的副本竞选 Leader。这样做的后果是这个分区就不可用了,因为没有 Leader 了。反之如果是 true,那么 Kafka 允许你从那些“跑得慢”的副本中选一个出来当 Leader。这样做的后果是数据有可能就丢失了,因为这些副本保存的数据本来就不全,当了 Leader 之后它本人就变得膨胀了,认为自己的数据才是权威的。

这个参数在最新版的 Kafka 中默认就是 false,本来不需要我特意提的,但是比较搞笑的是社区对这个参数的默认值来来回回改了好几版了,鉴于我不知道你用的是哪个版本的 Kafka,所以建议你还是显式地把它设置成 false 吧。

第三个参数auto.leader.rebalance.enable的影响貌似没什么人提,但其实对生产环境影响非常大。设置它的值为 true 表示允许 Kafka 定期地对一些 Topic 分区进行 Leader 重选举,当然这个重选举不是无脑进行的,它要满足一定的条件才会发生。严格来说它与上一个参数中 Leader 选举的最大不同在于,它不是选 Leader,而是换 Leader!比如 Leader A 一直表现得很好,但若auto.leader.rebalance.enable=true,那么有可能一段时间后 Leader A 就要被强行卸任换成 Leader B。

你要知道换一次 Leader 代价很高的,原本向 A 发送请求的所有客户端都要切换成向 B 发送请求,而且这种换 Leader 本质上没有任何性能收益,因此我建议你在生产环境中把这个参数设置成 false。

数据留存

  • log.retention.{hour|minutes|ms}:都是控制一条消息数据被保存多长时间。从优先级上来说 ms 设置最高、minutes 次之、hour 最低。通常情况下我们还是设置 hour 级别的多一些,比如log.retention.hour=168表示默认保存 7 天的数据,自动删除 7 天前的数据。很多公司把 Kafka 当做存储来使用,那么这个值就要相应地调大。
  • log.retention.bytes:这是指定 Broker 为消息保存的总磁盘容量大小。这个值默认是 -1,表明你想在这台 Broker 上保存多少数据都可以,至少在容量方面 Broker 绝对为你开绿灯,不会做任何阻拦。这个参数真正发挥作用的场景其实是在云上构建多租户的 Kafka 集群:设想你要做一个云上的 Kafka 服务,每个租户只能使用 100GB 的磁盘空间,为了避免有个“恶意”租户使用过多的磁盘空间,设置这个参数就显得至关重要了。
  • message.max.bytes:控制 Broker 能够接收的最大消息大小。默认的 1000012 太少了,还不到 1MB。实际场景中突破 1MB 的消息都是屡见不鲜的,因此在线上环境中设置一个比较大的值还是比较保险的做法。毕竟它只是一个标尺而已,仅仅衡量 Broker 能够处理的最大消息大小,即使设置大一点也不会耗费什么磁盘空间的。

Topic 级别配置

  • retention.ms:规定了该 Topic 消息被保存的时长。默认是 7 天,即该 Topic 只保存最近 7 天的消息。一旦设置了这个值,它会覆盖掉 Broker 端的全局参数值。
  • retention.bytes:规定了要为该 Topic 预留多大的磁盘空间。和全局参数作用相似,这个值通常在多租户的 Kafka 集群中会有用武之地。当前默认值是 -1,表示可以无限使用磁盘空间。

操作系统参数

  • 文件描述符限制
  • 文件系统类型
  • Swappiness
  • 提交时间

文件描述符系统资源并不像我们想象的那样昂贵,你不用太担心调大此值会有什么不利的影响。通常情况下将它设置成一个超大的值是合理的做法,比如ulimit -n 1000000。其实设置这个参数一点都不重要,但不设置的话后果很严重,比如你会经常看到“Too many open files”的错误。

其次是文件系统类型的选择。这里所说的文件系统指的是如 ext3、ext4 或 XFS 这样的日志型文件系统。根据官网的测试报告,XFS 的性能要强于 ext4,所以生产环境最好还是使用 XFS。对了,最近有个 Kafka 使用 ZFS 的数据报告,貌似性能更加强劲,有条件的话不妨一试。

第三是 swap 的调优。网上很多文章都提到设置其为 0,将 swap 完全禁掉以防止 Kafka 进程使用 swap 空间。我个人反倒觉得还是不要设置成 0 比较好,我们可以设置成一个较小的值。为什么呢?因为一旦设置成 0,当物理内存耗尽时,操作系统会触发 OOM killer 这个组件,它会随机挑选一个进程然后 kill 掉,即根本不给用户任何的预警。但如果设置成一个比较小的值,当开始使用 swap 空间时,你至少能够观测到 Broker 性能开始出现急剧下降,从而给你进一步调优和诊断问题的时间。基于这个考虑,我个人建议将 swappniess 配置成一个接近 0 但不为 0 的值,比如 1。

最后是提交时间或者说是 Flush 落盘时间。向 Kafka 发送数据并不是真要等数据被写入磁盘才会认为成功,而是只要数据被写入到操作系统的页缓存(Page Cache)上就可以了,随后操作系统根据 LRU 算法会定期将页缓存上的“脏”数据落盘到物理磁盘上。这个定期就是由提交时间来确定的,默认是 5 秒。一般情况下我们会认为这个时间太频繁了,可以适当地增加提交间隔来降低物理磁盘的写操作。当然你可能会有这样的疑问:如果在页缓存中的数据在写入到磁盘前机器宕机了,那岂不是数据就丢失了。的确,这种情况数据确实就丢失了,但鉴于 Kafka 在软件层面已经提供了多副本的冗余机制,因此这里稍微拉大提交间隔去换取性能还是一个合理的做法。

Kafka 集群规划

操作系统

部署生产环境的 Kafka,强烈建议操作系统选用 Linux。

在 Linux 部署 Kafka 能够享受到零拷贝技术所带来的快速数据传输特性。

Windows 平台上部署 Kafka 只适合于个人测试或用于功能验证,千万不要应用于生产环境。

磁盘

Kafka 集群部署选择普通的机械磁盘还是固态硬盘?前者成本低且容量大,但易损坏;后者性能优势大,不过单价高。

结论是:使用普通机械硬盘即可

Kafka 采用顺序读写操作,一定程度上规避了机械磁盘最大的劣势,即随机读写操作慢。从这一点上来说,使用 SSD 似乎并没有太大的性能优势,毕竟从性价比上来说,机械磁盘物美价廉,而它因易损坏而造成的可靠性差等缺陷,又由 Kafka 在软件层面提供机制来保证,故使用普通机械磁盘是很划算的。

带宽

大部分公司使用普通的以太网络,千兆网络(1Gbps)应该是网络的标准配置。

通常情况下你只能假设 Kafka 会用到 70% 的带宽资源,因为总要为其他应用或进程留一些资源。此外,通常要再额外预留出 2/3 的资源,因为不能让带宽资源总是保持在峰值。

基于以上原因,一个 Kafka 集群数量的大致推算公式如下:

1
Kafka 机器数 = 单位时间需要处理的总数据量 / 单机所占用带宽

参考资料

ZooKeeper 原理

ZooKeeper 是 Apache 的顶级项目。ZooKeeper 为分布式应用提供了高效且可靠的分布式协调服务,提供了诸如统一命名服务、配置管理和分布式锁等分布式的基础服务。在解决分布式数据一致性方面,ZooKeeper 并没有直接采用 Paxos 算法,而是采用了名为 ZAB 的一致性协议

ZooKeeper 主要用来解决分布式集群中应用系统的一致性问题,它能提供基于类似于文件系统的目录节点树方式的数据存储。但是 ZooKeeper 并不是用来专门存储数据的,它的作用主要是用来维护和监控存储数据的状态变化。通过监控这些数据状态的变化,从而可以达到基于数据的集群管理

很多大名鼎鼎的框架都基于 ZooKeeper 来实现分布式高可用,如:Dubbo、Kafka 等。

ZooKeeper 官方支持 Java 和 C 的 Client API。ZooKeeper 社区为大多数语言(.NET,python 等)提供非官方 API。

ZooKeeper 简介

ZooKeeper 是什么

ZooKeeper 是 Apache 的顶级项目。ZooKeeper 为分布式应用提供了高效且可靠的分布式协调服务,提供了诸如统一命名服务、配置管理和分布式锁等分布式的基础服务。在解决分布式数据一致性方面,ZooKeeper 并没有直接采用 Paxos 算法,而是采用了名为 ZAB 的一致性协议

ZooKeeper 主要用来解决分布式集群中应用系统的一致性问题,它能提供基于类似于文件系统的目录节点树方式的数据存储。但是 ZooKeeper 并不是用来专门存储数据的,它的作用主要是用来维护和监控存储数据的状态变化。通过监控这些数据状态的变化,从而可以达到基于数据的集群管理

很多大名鼎鼎的框架都基于 ZooKeeper 来实现分布式高可用,如:Dubbo、Kafka 等。

ZooKeeper 的特性

ZooKeeper 具有以下特性:

  • 顺序一致性:所有客户端看到的服务端数据模型都是一致的;从一个客户端发起的事务请求,最终都会严格按照其发起顺序被应用到 ZooKeeper 中。具体的实现可见:原子广播
  • 原子性 - 所有事务请求的处理结果在整个集群中所有机器上的应用情况是一致的,即整个集群要么都成功应用了某个事务,要么都没有应用。 实现方式可见:事务
  • 单一视图 - 无论客户端连接的是哪个 Zookeeper 服务器,其看到的服务端数据模型都是一致的。
  • 高性能 - ZooKeeper 将数据全量存储在内存中,所以其性能很高。需要注意的是:由于 ZooKeeper 的所有更新和删除都是基于事务的,因此 ZooKeeper 在读多写少的应用场景中有性能表现较好,如果写操作频繁,性能会大大下滑
  • 高可用 - ZooKeeper 的高可用是基于副本机制实现的,此外 ZooKeeper 支持故障恢复,可见:选举 Leader

ZooKeeper 的应用场景

  • 配置管理
    • 集群节点可以通过中心源获取启动配置
    • 更简单的部署
  • 分布式集群管理
    • 节点加入/离开
    • 节点的实时状态
  • 命名服务,如:DNS
  • 分布式同步:如锁、栅栏、队列
  • 分布式系统的选主
  • 中心化和高可靠的数据注册

ZooKeeper 的设计目标

  • 简单的数据模型:ZooKeeper 的数据模型是一个树形结构的文件系统,树中的节点被称为 **znode**。
  • 可以构建集群:ZooKeeper 支持集群模式,可以通过伸缩性,来控制集群的吞吐量。需要注意的是:由于 ZooKeeper 采用一主多从架构,所以其写性能是有上限的,比较适合于读多写少的场景。
  • 顺序访问:对于来自客户端的每个更新请求,Zookeeper 都会分配一个全局唯一的递增 ID,这个 ID 反映了所有事务请求的先后顺序。
  • 高性能、高可用:ZooKeeper 将数据存全量储在内存中以保持高性能,并通过服务集群来实现高可用,由于 Zookeeper 的所有更新和删除都是基于事务的,所以其在读多写少的应用场景中有着很高的性能表现。

ZooKeeper 核心概念

服务

Zookeeper 服务是一个基于主从复制的高可用集群,集群中每个节点都存储了一份数据副本(内存中)。

客户端只会连接一个 ZooKeeper 服务器节点,并维持 TCP 连接。

数据模型

ZooKeeper 的数据模型是一个树形结构的文件系统

树中的节点被称为 znode,其中根节点为 /,每个节点上都会保存自己的数据和节点信息。znode 可以用于存储数据,并且有一个与之相关联的 ACL(详情可见 ACL)。ZooKeeper 的设计目标是实现协调服务,而不是真的作为一个文件存储,因此 znode 存储数据的大小被限制在 1MB 以内

ZooKeeper 的数据访问具有原子性。其读写操作都是要么全部成功,要么全部失败。

znode 通过路径被引用。znode 节点路径必须是绝对路径

znode 有两种类型:

  • 临时的( EPHEMERAL - 户端会话结束时,ZooKeeper 就会删除临时的 znode。不允许有子节点。
  • 持久的(PERSISTENT - 除非客户端主动执行删除操作,否则 ZooKeeper 不会删除持久的 znode。

节点信息

znode 上有一个顺序标志( SEQUENTIAL。如果在创建 znode 时,设置了顺序标志( SEQUENTIAL,那么 ZooKeeper 会使用计数器为 znode 添加一个单调递增的数值,即 zxid。ZooKeeper 正是利用 zxid 实现了严格的顺序访问控制能力。

每个 znode 节点在存储数据的同时,都会维护一个叫做 Stat 的数据结构,里面存储了关于该节点的全部状态信息。如下:

状态属性 说明
czxid 数据节点创建时的事务 ID
ctime 数据节点创建时的时间
mzxid 数据节点最后一次更新时的事务 ID
mtime 数据节点最后一次更新时的时间
pzxid 数据节点的子节点最后一次被修改时的事务 ID
cversion 子节点的更改次数
version 节点数据的更改次数
aversion 节点的 ACL 的更改次数
ephemeralOwner 如果节点是临时节点,则表示创建该节点的会话的 SessionID;如果节点是持久节点,则该属性值为 0
dataLength 数据内容的长度
numChildren 数据节点当前的子节点个数

集群角色

Zookeeper 集群是一个基于主从复制的高可用集群,集群中每个节点都存储了一份数据副本(内存中)。此外,每个服务器节点承担如下三种角色中的一种:

  • Leader - 它负责 发起并维护与各 Follwer 及 Observer 间的心跳。所有的写操作必须要通过 Leader 完成再由 Leader 将写操作广播给其它服务器。一个 Zookeeper 集群同一时间只会有一个实际工作的 Leader。
  • Follower - 它会响应 Leader 的心跳。Follower 可直接处理并返回客户端的读请求,同时会将写请求转发给 Leader 处理,并且负责在 Leader 处理写请求时对请求进行投票。一个 Zookeeper 集群可能同时存在多个 Follower。
  • Observer - 角色与 Follower 类似,但是无投票权。

客户端可以从任意 ZooKeeper 服务器节点读取数据,但只能通过 Leader 服务写数据并需要半数以上 Follower 的 ACK,才算写入成功。记住这个重要的知识点,下文会详细讲述。

ACL

ZooKeeper 采用 ACL(Access Control Lists)策略来进行权限控制

每个 znode 创建时都会带有一个 ACL 列表,用于决定谁可以对它执行何种操作。

ACL 依赖于 ZooKeeper 的客户端认证机制。ZooKeeper 提供了以下几种认证方式:

  • digest - 用户名和密码 来识别客户端
  • sasl - 通过 kerberos 来识别客户端
  • ip - 通过 IP 来识别客户端

ZooKeeper 定义了如下五种权限:

  • CREATE - 允许创建子节点;
  • READ - 允许从节点获取数据并列出其子节点;
  • WRITE - 允许为节点设置数据;
  • DELETE - 允许删除子节点;
  • ADMIN - 允许为节点设置权限。

ZooKeeper 工作原理

读操作

Leader/Follower/Observer 都可直接处理读请求,从本地内存中读取数据并返回给客户端即可

由于处理读请求不需要服务器之间的交互,Follower/Observer 越多,整体系统的读请求吞吐量越大,也即读性能越好。

写操作

所有的写请求实际上都要交给 Leader 处理。Leader 将写请求以事务形式发给所有 Follower 并等待 ACK,一旦收到半数以上 Follower 的 ACK,即认为写操作成功。

写 Leader

由上图可见,通过 Leader 进行写操作,主要分为五步:

  1. 客户端向 Leader 发起写请求
  2. Leader 将写请求以事务 Proposal 的形式发给所有 Follower 并等待 ACK
  3. Follower 收到 Leader 的事务 Proposal 后返回 ACK
  4. Leader 得到过半数的 ACK(Leader 对自己默认有一个 ACK)后向所有的 Follower 和 Observer 发送 Commmit
  5. Leader 将处理结果返回给客户端

注意

  • Leader 不需要得到 Observer 的 ACK,即 Observer 无投票权。
  • Leader 不需要得到所有 Follower 的 ACK,只要收到过半的 ACK 即可,同时 Leader 本身对自己有一个 ACK。上图中有 4 个 Follower,只需其中两个返回 ACK 即可,因为 $$(2+1) / (4+1) > 1/2$$ 。
  • Observer 虽然无投票权,但仍须同步 Leader 的数据从而在处理读请求时可以返回尽可能新的数据。

写 Follower/Observer

  • Follower/Observer 均可接受写请求,但不能直接处理,而需要将写请求转发给 Leader 处理。
  • 除了多了一步请求转发,其它流程与直接写 Leader 无任何区别。

事务

对于来自客户端的每个更新请求,ZooKeeper 具备严格的顺序访问控制能力。

为了保证事务的顺序一致性,ZooKeeper 采用了递增的事务 id 号(zxid)来标识事务

Leader 服务会为每一个 Follower 服务器分配一个单独的队列,然后将事务 Proposal 依次放入队列中,并根据 FIFO(先进先出) 的策略进行消息发送。Follower 服务在接收到 Proposal 后,会将其以事务日志的形式写入本地磁盘中,并在写入成功后反馈给 Leader 一个 Ack 响应。当 Leader 接收到超过半数 Follower 的 Ack 响应后,就会广播一个 Commit 消息给所有的 Follower 以通知其进行事务提交,之后 Leader 自身也会完成对事务的提交。而每一个 Follower 则在接收到 Commit 消息后,完成事务的提交。

所有的提议(**proposal**)都在被提出的时候加上了 zxid。zxid 是一个 64 位的数字,它的高 32 位是 epoch 用来标识 Leader 关系是否改变,每次一个 Leader 被选出来,它都会有一个新的 epoch,标识当前属于那个 leader 的统治时期。低 32 位用于递增计数。

详细过程如下:

  1. Leader 等待 Server 连接;
  2. Follower 连接 Leader,将最大的 zxid 发送给 Leader;
  3. Leader 根据 Follower 的 zxid 确定同步点;
  4. 完成同步后通知 follower 已经成为 uptodate 状态;
  5. Follower 收到 uptodate 消息后,又可以重新接受 client 的请求进行服务了。

观察

ZooKeeper 允许客户端监听它关心的 znode,当 znode 状态发生变化(数据变化、子节点增减变化)时,ZooKeeper 服务会通知客户端

客户端和服务端保持连接一般有两种形式:

  • 客户端向服务端不断轮询
  • 服务端向客户端推送状态

Zookeeper 的选择是服务端主动推送状态,也就是观察机制( Watch )。

ZooKeeper 的观察机制允许用户在指定节点上针对感兴趣的事件注册监听,当事件发生时,监听器会被触发,并将事件信息推送到客户端。

  • 监听器实时触发
  • 监听器总是有序的
  • 创建新的 znode 数据前,客户端就能收到监听事件。

客户端使用 getData 等接口获取 znode 状态时传入了一个用于处理节点变更的回调,那么服务端就会主动向客户端推送节点的变更:

1
public byte[] getData(final String path, Watcher watcher, Stat stat)

从这个方法中传入的 Watcher 对象实现了相应的 process 方法,每次对应节点出现了状态的改变,WatchManager 都会通过以下的方式调用传入 Watcher 的方法:

1
2
3
4
5
6
7
8
9
10
11
Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path);
Set<Watcher> watchers;
synchronized (this) {
watchers = watchTable.remove(path);
}
for (Watcher w : watchers) {
w.process(e);
}
return watchers;
}

Zookeeper 中的所有数据其实都是由一个名为 DataTree 的数据结构管理的,所有的读写数据的请求最终都会改变这颗树的内容,在发出读请求时可能会传入 Watcher 注册一个回调函数,而写请求就可能会触发相应的回调,由 WatchManager 通知客户端数据的变化。

通知机制的实现其实还是比较简单的,通过读请求设置 Watcher 监听事件,写请求在触发事件时就能将通知发送给指定的客户端。

会话

ZooKeeper 客户端通过 TCP 长连接连接到 ZooKeeper 服务集群会话 (Session) 从第一次连接开始就已经建立,之后通过心跳检测机制来保持有效的会话状态。通过这个连接,客户端可以发送请求并接收响应,同时也可以接收到 Watch 事件的通知。

每个 ZooKeeper 客户端配置中都配置了 ZooKeeper 服务器集群列表。启动时,客户端会遍历列表去尝试建立连接。如果失败,它会尝试连接下一个服务器,依次类推。

一旦一台客户端与一台服务器建立连接,这台服务器会为这个客户端创建一个新的会话。每个会话都会有一个超时时间,若服务器在超时时间内没有收到任何请求,则相应会话被视为过期。一旦会话过期,就无法再重新打开,且任何与该会话相关的临时 znode 都会被删除。

通常来说,会话应该长期存在,而这需要由客户端来保证。客户端可以通过心跳方式(ping)来保持会话不过期。

ZooKeeper 的会话具有四个属性:

  • sessionID - 会话 ID,唯一标识一个会话,每次客户端创建新的会话时,Zookeeper 都会为其分配一个全局唯一的 sessionID。
  • TimeOut - 会话超时时间,客户端在构造 Zookeeper 实例时,会配置 sessionTimeout 参数用于指定会话的超时时间,Zookeeper 客户端向服务端发送这个超时时间后,服务端会根据自己的超时时间限制最终确定会话的超时时间。
  • TickTime - 下次会话超时时间点,为了便于 Zookeeper 对会话实行”分桶策略”管理,同时为了高效低耗地实现会话的超时检查与清理,Zookeeper 会为每个会话标记一个下次会话超时时间点,其值大致等于当前时间加上 TimeOut。
  • isClosing - 标记一个会话是否已经被关闭,当服务端检测到会话已经超时失效时,会将该会话的 isClosing 标记为”已关闭”,这样就能确保不再处理来自该会话的心情求了。

Zookeeper 的会话管理主要是通过 SessionTracker 来负责,其采用了分桶策略(将类似的会话放在同一区块中进行管理)进行管理,以便 Zookeeper 对会话进行不同区块的隔离处理以及同一区块的统一处理。

ZAB 协议

ZooKeeper 并没有直接采用 Paxos 算法,而是采用了名为 ZAB 的一致性协议。**ZAB 协议不是 Paxos 算法**,只是比较类似,二者在操作上并不相同。Multi-Paxos 实现的是一系列值的共识,不关心最终达成共识的值是什么,不关心各值的顺序。而 ZooKeeper 需要确保操作的顺序性。

ZAB 协议是 Zookeeper 专门设计的一种支持崩溃恢复的原子广播协议

ZAB 协议是 ZooKeeper 的数据一致性和高可用解决方案。

ZAB 协议定义了两个可以无限循环的流程:

  • 选举 Leader - 用于故障恢复,从而保证高可用。
  • 原子广播 - 用于主从同步,从而保证数据一致性。

选举 Leader

ZooKeeper 的故障恢复

ZooKeeper 集群采用一主(称为 Leader)多从(称为 Follower)模式,主从节点通过副本机制保证数据一致。

  • 如果 Follower 节点挂了 - ZooKeeper 集群中的每个节点都会单独在内存中维护自身的状态,并且各节点之间都保持着通讯,只要集群中有半数机器能够正常工作,那么整个集群就可以正常提供服务
  • 如果 Leader 节点挂了 - 如果 Leader 节点挂了,系统就不能正常工作了。此时,需要通过 ZAB 协议的选举 Leader 机制来进行故障恢复。

ZAB 协议的选举 Leader 机制简单来说,就是:基于过半选举机制产生新的 Leader,之后其他机器将从新的 Leader 上同步状态,当有过半机器完成状态同步后,就退出选举 Leader 模式,进入原子广播模式。

术语

  • myid - 每个 Zookeeper 服务器,都需要在数据文件夹下创建一个名为 myid 的文件,该文件包含整个 Zookeeper 集群唯一的 ID(整数)。
  • zxid - 类似于 RDBMS 中的事务 ID,用于标识一次更新操作的 Proposal ID。为了保证顺序性,该 zxid 必须单调递增。因此 Zookeeper 使用一个 64 位的数来表示,高 32 位是 Leader 的 epoch,从 1 开始,每次选出新的 Leader,epoch 加一。低 32 位为该 epoch 内的序号,每次 epoch 变化,都将低 32 位的序号重置。这样保证了 zxid 的全局递增性。

服务器状态

  • LOOKING - 不确定 Leader 状态。该状态下的服务器认为当前集群中没有 Leader,会发起 Leader 选举
  • FOLLOWING - 跟随者状态。表明当前服务器角色是 Follower,并且它知道 Leader 是谁
  • LEADING - 领导者状态。表明当前服务器角色是 Leader,它会维护与 Follower 间的心跳
  • OBSERVING - 观察者状态。表明当前服务器角色是 Observer,与 Folower 唯一的不同在于不参与选举,也不参与集群写操作时的投票

选票数据结构

每个服务器在进行领导选举时,会发送如下关键信息

  • logicClock - 每个服务器会维护一个自增的整数,名为 logicClock,它表示这是该服务器发起的第多少轮投票
  • state - 当前服务器的状态
  • self_id - 当前服务器的 myid
  • self_zxid - 当前服务器上所保存的数据的最大 zxid
  • vote_id - 被推举的服务器的 myid
  • vote_zxid - 被推举的服务器上所保存的数据的最大 zxid

投票流程

(1)自增选举轮次 - Zookeeper 规定所有有效的投票都必须在同一轮次中。每个服务器在开始新一轮投票时,会先对自己维护的 logicClock 进行自增操作。

(2)初始化选票 - 每个服务器在广播自己的选票前,会将自己的投票箱清空。该投票箱记录了所收到的选票。例:服务器 2 投票给服务器 3,服务器 3 投票给服务器 1,则服务器 1 的投票箱为(2, 3), (3, 1), (1, 1)。票箱中只会记录每一投票者的最后一票,如投票者更新自己的选票,则其它服务器收到该新选票后会在自己票箱中更新该服务器的选票。

(3)发送初始化选票 - 每个服务器最开始都是通过广播把票投给自己。

(4)接收外部投票 - 服务器会尝试从其它服务器获取投票,并记入自己的投票箱内。如果无法获取任何外部投票,则会确认自己是否与集群中其它服务器保持着有效连接。如果是,则再次发送自己的投票;如果否,则马上与之建立连接。

(5)判断选举轮次 - 收到外部投票后,首先会根据投票信息中所包含的 logicClock 来进行不同处理

  • 外部投票的 logicClock 大于自己的 logicClock。说明该服务器的选举轮次落后于其它服务器的选举轮次,立即清空自己的投票箱并将自己的 logicClock 更新为收到的 logicClock,然后再对比自己之前的投票与收到的投票以确定是否需要变更自己的投票,最终再次将自己的投票广播出去。
  • 外部投票的 logicClock 小于自己的 logicClock。当前服务器直接忽略该投票,继续处理下一个投票。
  • 外部投票的 logickClock 与自己的相等。当时进行选票 PK。

(6)选票 PK - 选票 PK 是基于(self_id, self_zxid)(vote_id, vote_zxid) 的对比

  • 外部投票的 logicClock 大于自己的 logicClock,则将自己的 logicClock 及自己的选票的 logicClock 变更为收到的 logicClock
  • 若 logicClock 一致,则对比二者的 vote_zxid,若外部投票的 vote_zxid 比较大,则将自己的票中的 vote_zxid 与 vote_myid 更新为收到的票中的 vote_zxid 与 vote_myid 并广播出去,另外将收到的票及自己更新后的票放入自己的票箱。如果票箱内已存在(self_myid, self_zxid)相同的选票,则直接覆盖
  • 若二者 vote_zxid 一致,则比较二者的 vote_myid,若外部投票的 vote_myid 比较大,则将自己的票中的 vote_myid 更新为收到的票中的 vote_myid 并广播出去,另外将收到的票及自己更新后的票放入自己的票箱

(7)统计选票 - 如果已经确定有过半服务器认可了自己的投票(可能是更新后的投票),则终止投票。否则继续接收其它服务器的投票。

(8)更新服务器状态 - 投票终止后,服务器开始更新自身状态。若过半的票投给了自己,则将自己的服务器状态更新为 LEADING,否则将自己的状态更新为 FOLLOWING

通过以上流程分析,我们不难看出:要使 Leader 获得多数 Server 的支持,则 ZooKeeper 集群节点数必须是奇数。且存活的节点数目不得少于 N + 1

每个 Server 启动后都会重复以上流程。在恢复模式下,如果是刚从崩溃状态恢复的或者刚启动的 server 还会从磁盘快照中恢复数据和会话信息,zk 会记录事务日志并定期进行快照,方便在恢复时进行状态恢复。

原子广播(Atomic Broadcast)

ZooKeeper 通过副本机制来实现高可用

那么,ZooKeeper 是如何实现副本机制的呢?答案是:ZAB 协议的原子广播。

ZAB 协议的原子广播要求:

**所有的写请求都会被转发给 Leader,Leader 会以原子广播的方式通知 Follow。当半数以上的 Follow 已经更新状态持久化后,Leader 才会提交这个更新,然后客户端才会收到一个更新成功的响应**。这有些类似数据库中的两阶段提交协议。

在整个消息的广播过程中,Leader 服务器会每个事务请求生成对应的 Proposal,并为其分配一个全局唯一的递增的事务 ID(ZXID),之后再对其进行广播。

ZAB 是通过“一切以领导者为准”的强领导者模型和严格按照顺序提交日志,来实现操作的顺序性的,这一点和 Raft 是一样的。

ZooKeeper 应用

ZooKeeper 可以用于发布/订阅、负载均衡、命令服务、分布式协调/通知、集群管理、Master 选举、分布式锁和分布式队列等功能

命名服务

在分布式系统中,通常需要一个全局唯一的名字,如生成全局唯一的订单号等,ZooKeeper 可以通过顺序节点的特性来生成全局唯一 ID,从而可以对分布式系统提供命名服务。

配置管理

利用 ZooKeeper 的观察机制,可以将其作为一个高可用的配置存储器,允许分布式应用的参与者检索和更新配置文件。

分布式锁

可以通过 ZooKeeper 的临时节点和 Watcher 机制来实现分布式排它锁。

举例来说,有一个分布式系统,有三个节点 A、B、C,试图通过 ZooKeeper 获取分布式锁。

(1)访问 /lock (这个目录路径由程序自己决定),创建 带序列号的临时节点(EPHEMERAL)

(2)每个节点尝试获取锁时,拿到 /locks节点下的所有子节点(id_0000,id_0001,id_0002),判断自己创建的节点是不是序列号最小的

  • 如果序列号是最小的,则成功获取到锁。
    • 释放锁:执行完操作后,把创建的节点给删掉。
  • 如果不是,则监听比自己要小 1 的节点变化。

(3)释放锁,即删除自己创建的节点。

图中,NodeA 删除自己创建的节点 id_0000,NodeB 监听到变化,发现自己的节点已经是最小节点,即可获取到锁。

集群管理

ZooKeeper 还能解决大多数分布式系统中的问题:

  • 如可以通过创建临时节点来建立心跳检测机制。如果分布式系统的某个服务节点宕机了,则其持有的会话会超时,此时该临时节点会被删除,相应的监听事件就会被触发。
  • 分布式系统的每个服务节点还可以将自己的节点状态写入临时节点,从而完成状态报告或节点工作进度汇报。
  • 通过数据的订阅和发布功能,ZooKeeper 还能对分布式系统进行模块的解耦和任务的调度。
  • 通过监听机制,还能对分布式系统的服务节点进行动态上下线,从而实现服务的动态扩容。

选举 Leader 节点

分布式系统一个重要的模式就是主从模式 (Master/Salves),ZooKeeper 可以用于该模式下的 Matser 选举。可以让所有服务节点去竞争性地创建同一个 ZNode,由于 ZooKeeper 不能有路径相同的 ZNode,必然只有一个服务节点能够创建成功,这样该服务节点就可以成为 Master 节点。

队列管理

ZooKeeper 可以处理两种类型的队列:

  1. 当一个队列的成员都聚齐时,这个队列才可用,否则一直等待所有成员到达,这种是同步队列。
  2. 队列按照 FIFO 方式进行入队和出队操作,例如实现生产者和消费者模型。

同步队列用 ZooKeeper 实现的实现思路如下:

创建一个父目录 /synchronizing,每个成员都监控标志(Set Watch)位目录 /synchronizing/start 是否存在,然后每个成员都加入这个队列,加入队列的方式就是创建 /synchronizing/member_i 的临时目录节点,然后每个成员获取 /synchronizing 目录的所有目录节点,也就是 member_i。判断 i 的值是否已经是成员的个数,如果小于成员个数等待 /synchronizing/start 的出现,如果已经相等就创建 /synchronizing/start

ZooKeeper 的缺点

ZooKeeper 不是为高可用性设计的

生产环境中常常需要通过多机房部署来容灾。出于成本考虑,一般多机房都是同时提供服务的,即一个机房撑不住所有流量。ZooKeeper 集群只能有一个 Leader,一旦机房之间连接出现故障,那么只有 Leader 所在的机房可以正常工作,其他机房只能停摆。于是所有流量集中到 Leader 所在的机房,由于处理不过来而导致崩溃。

即使是在同一个机房里面,由于网段的不同,在调整机房交换机的时候偶尔也会发生网段隔离的情况。实际上机房每个月基本上都会发生短暂的网络隔离之类的子网段调整。在那个时刻 ZooKeeper 将处于不可用状态。如果业务系统重度依赖 ZooKeeper(比如用 Dubbo 作为 RPC,且使用 ZooKeeper 作为注册中心),则系统的可用性将非常脆弱。

由于 ZooKeeper 对于网络隔离的极度敏感,导致 ZooKeeper 对于网络的任何风吹草动都会做出激烈反应。这使得 ZooKeeper 的不可用时间比较多。我们不能让 ZooKeeper 的不可用,变成系统的不可用

ZooKeeper 的选举过程速度很慢

互联网环境中,网络不稳定几乎是必然的,而 ZooKeeper 网络隔离非常敏感。一旦出现网络隔离,zookeeper 就要发起选举流程。

ZooKeeper 的选举流程通常耗时 30 到 120 秒,期间 ZooKeeper 由于没有 Leader,都是不可用的。

对于网络里面偶尔出现的,比如半秒一秒的网络隔离,ZooKeeper 会由于选举过程,而把不可用时间放大几十倍。

ZooKeeper 的性能是有限的

典型的 ZooKeeper 的 TPS 大概是一万多,无法支撑每天动辄几十亿次的调用。因此,每次请求都去 ZooKeeper 获取业务系统信息是不可能的。

为此,ZooKeeper 的 client 必须自己缓存业务系统的信息。这就导致 ZooKeeper 提供的强一致性实际上是做不到的。如果我们需要强一致性,还需要其他机制来进行保障:比如用自动化脚本把业务系统的 old master 给 kill 掉,但是这可能会引发很多其他问题。

ZooKeeper 无法进行有效的权限控制

ZooKeeper 的权限控制非常弱。在大型的复杂系统里面,使用 ZooKeeper 必须自己再额外的开发一套权限控制系统,通过那套权限控制系统再访问 ZooKeeper。

额外的权限控制系统不但增加了系统复杂性和维护成本,而且降低了系统的总体性能。

即使有了 ZooKeeper 也很难避免业务系统的数据不一致

由于 ZooKeeper 的性能限制,我们无法让每次系统内部调用都走 ZooKeeper,因此总有某些时刻,业务系统会存在两份数据(业务系统 client 那边缓存的业务系统信息是定时从 ZooKeeper 更新的,因此会有更新不同步的问题)。

如果要保持数据的强一致性,唯一的方法是“先 kill 掉当前 Leader,再在 ZooKeeper 上更新 Leader 信息”。是否要 kill 掉当前 Leader 这个问题上,程序是无法完全自动决定的(因为网络隔离的时候 ZooKeeper 已经不可用了,自动脚本没有全局信息,不管怎么做都可能是错的,什么都不做也可能是错的。当网络故障的时候,只有运维人员才有全局信息,程序是无法得知其他机房的情况的)。因此系统无法自动的保障数据一致性,必须要人工介入。而人工介入的典型时间是半个小时以上,我们不能让系统这么长时间不可用。因此我们必须在某个方向上进行妥协,最常见的妥协方式是放弃强一致性,而接受最终一致性

如果我们需要人工介入才能保证可靠的强一致性,那么 ZooKeeper 的价值就大打折扣。

参考资料

ZooKeeper 面试

ZooKeeper 简介

【基础】什么是 ZooKeeper?

:::details 要点

Zookeeper 是一个开源的分布式协调服务,目前由 Apache 进行维护。Zookeeper 可以用于实现分布式系统中常见的发布/订阅、负载均衡、命令服务、分布式协调/通知、集群管理、Master 选举、分布式锁和分布式队列等功能。

Zookeeper 具有以下特性:

  • 顺序一致性:从一个客户端发起的事务请求,最终都会严格按照其发起顺序被应用到 Zookeeper 中;
  • 原子性:所有事务请求的处理结果在整个集群中所有机器上都是一致的;不存在部分机器应用了该事务,而另一部分没有应用的情况;
  • 单一视图:所有客户端看到的服务端数据模型都是一致的;
  • 可靠性:一旦服务端成功应用了一个事务,则其引起的改变会一直保留,直到被另外一个事务所更改;
  • 实时性:一旦一个事务被成功应用后,Zookeeper 可以保证客户端立即可以读取到这个事务变更后的最新状态的数据。

:::

【基础】ZooKeeper 中有哪些应用场景?

:::details 要点

ZooKeeper 可以用于发布/订阅、负载均衡、命令服务、分布式协调/通知、集群管理、Master 选举、分布式锁和分布式队列等功能

发布订阅

通过 Zookeeper 进行数据的发布与订阅其实可以说是它提供的最基本功能,它能够允许多个客户端同时订阅某一个节点的变更并在变更发生时执行我们预先设置好的回调函数,在运行时改变服务的配置和行为:

1
2
3
4
5
6
7
8
9
ZooKeeper zk = new ZooKeeper("localhost", 3000, null);
zk.getData("/config", new Watcher() {
public void process(WatchedEvent watchedEvent) {
System.out.println(watchedEvent.toString());
}
}, null);
zk.setData("/config", "draven".getBytes(), 0);

// WatchedEvent state:SyncConnected type:NodeDataChanged path:/config

发布与订阅是 Zookeeper 提供的一个最基本的功能,它的使用非常的简单,我们可以在 getData 中传入实现 process 方法的 Watcher 对象,在每次改变节点的状态时,process 方法都会被调用,在这个方法中就可以对变更进行响应动态修改一些行为。

zookeeper-pubsub

通过 Zookeeper 这个中枢,每一个客户端对节点状态的改变都能够推送给节点的订阅者,在发布订阅模型中,Zookeeper 的每一个节点都可以被理解成一个主题,每一个客户端都可以向这个主题推送详细,同时也可以订阅这个主题中的消息;只是 Zookeeper 引入了文件系统的父子层级的概念将发布订阅功能实现得更加复杂。

1
2
3
4
5
6
7
public static enum EventType {
None(-1),
NodeCreated(1),
NodeDeleted(2),
NodeDataChanged(3),
NodeChildrenChanged(4);
}

如果我们订阅了一个节点的变更信息,那么该节点的子节点出现数量变更时就会调用 process 方法通知观察者,这也意味着更复杂的实现,同时和专门做发布订阅的中间件相比也没有性能优势,在海量推送的应用场景下,消息队列更能胜任,而 Zookeeper 更适合做一些类似服务配置的动态下发的工作。

命名服务

在分布式系统中,通常需要一个全局唯一的名字,如生成全局唯一的订单号等,ZooKeeper 可以通过顺序节点的特性来生成全局唯一 ID,从而可以对分布式系统提供命名服务。

配置管理

利用 ZooKeeper 的观察机制,可以将其作为一个高可用的配置存储器,允许分布式应用的参与者检索和更新配置文件。

分布式锁

可以通过 ZooKeeper 的临时节点和 Watcher 机制来实现分布式排它锁。

举例来说,有一个分布式系统,有三个节点 A、B、C,试图通过 ZooKeeper 获取分布式锁。

(1)访问 /lock (这个目录路径由程序自己决定),创建 带序列号的临时节点(EPHEMERAL)

(2)每个节点尝试获取锁时,拿到 /locks节点下的所有子节点(id_0000,id_0001,id_0002),判断自己创建的节点是不是序列号最小的

  • 如果序列号是最小的,则成功获取到锁。
    • 释放锁:执行完操作后,把创建的节点给删掉。
  • 如果不是,则监听比自己要小 1 的节点变化。

(3)释放锁,即删除自己创建的节点。

图中,NodeA 删除自己创建的节点 id_0000,NodeB 监听到变化,发现自己的节点已经是最小节点,即可获取到锁。

集群管理

ZooKeeper 还能解决大多数分布式系统中的协调问题:

  • 可以通过创建临时节点来建立心跳检测机制。如果分布式系统的某个服务节点宕机了,则其持有的会话会超时,此时该临时节点会被删除,相应的监听事件就会被触发。
  • 分布式系统的每个服务节点还可以将自己的节点状态写入临时节点,从而完成状态报告或节点工作进度汇报
  • 通过数据的订阅和发布功能,ZooKeeper 还能对分布式系统进行模块的解耦和任务的调度
  • 通过监听机制,还能对分布式系统的服务节点进行动态上下线,从而实现服务的动态扩容。

选举 Leader 节点

分布式系统一个重要的模式就是主从模式 (Leader/Followers),ZooKeeper 可以用于该模式下的 Leader 选举。可以让所有服务节点去竞争性地创建同一个 ZNode,由于 ZooKeeper 不能有路径相同的 ZNode,必然只有一个服务节点能够创建成功,这样该服务节点就可以成为 Leader 节点。

队列管理

ZooKeeper 可以处理两种类型的队列:

  1. 当一个队列的成员都聚齐时,这个队列才可用,否则一直等待所有成员到达,这种是同步队列。
  2. 队列按照 FIFO 方式进行入队和出队操作,例如实现生产者和消费者模型。

同步队列用 ZooKeeper 实现的实现思路如下:

创建一个父目录 /synchronizing,每个成员都监控标志(Set Watch)位目录 /synchronizing/start 是否存在,然后每个成员都加入这个队列,加入队列的方式就是创建 /synchronizing/member_i 的临时目录节点,然后每个成员获取 /synchronizing 目录的所有目录节点,也就是 member_i。判断 i 的值是否已经是成员的个数,如果小于成员个数等待 /synchronizing/start 的出现,如果已经相等就创建 /synchronizing/start

:::

ZooKeeper 存储

【基础】ZooKeeper 如何存储数据?

:::details 要点

ZooKeeper 采用类似于文件系统的层级结构存储数据

树中的节点被称为 znode,其中根节点为 /,每个节点上都会保存自己的数据和节点信息。znode 可以用于存储数据,并且有一个与之相关联的 ACL(详情可见 ACL)。ZooKeeper 的设计目标是实现协调服务,而不是真的作为一个文件存储,因此 znode 存储数据的大小被限制在 1MB 以内

ZooKeeper 的数据访问具有原子性。其读写操作都是要么全部成功,要么全部失败。

znode 通过路径被引用。znode 节点路径必须是绝对路径

:::

【基础】ZooKeeper 有几种节点类型?

:::details 要点

znode 其实有 PERSISTENTPERSISTENT_SEQUENTIALEPHEMERALEPHEMERAL_SEQUENTIAL 四种类型,它们是临时与持久、顺序与非顺序两个不同的方向组合成的四种类型。

临时节点是客户端在连接 Zookeeper 时才会保持存在的节点,一旦客户端和服务端之间的连接中断,当前连接持有的所有节点都会被删除,而持久的节点不会随着会话连接的中断而删除,它们需要被客户端主动删除;Zookeeper 中另一种节点的特性就是顺序和非顺序,如果我们使用 Zookeeper 创建了顺序的节点,那么所有节点就会在名字的末尾附加一个序列号,序列号是一个由父节点维护的单调递增计数器。

:::

ZooKeeper 架构

【中级】ZooKeeper 的设计目标是什么?

:::details 要点

Zookeeper 致力于为那些高吞吐的大型分布式系统提供一个高性能、高可用、且具有严格顺序访问控制能力的分布式协调服务。它具有以下四个目标:

目标一:简单的数据模型

Zookeeper 通过树形结构来存储数据,它由一系列被称为 znode 的数据节点组成,类似于常见的文件系统。不过和常见的文件系统不同,Zookeeper 将数据全量存储在内存中,以此来实现高吞吐,减少访问延迟。

img

目标二:构建集群

可以由一组 Zookeeper 服务构成 Zookeeper 集群,集群中每台机器都会单独在内存中维护自身的状态,并且每台机器之间都保持着通讯,只要集群中有半数机器能够正常工作,那么整个集群就可以正常提供服务。

img

目标三:顺序访问

对于来自客户端的每个更新请求,Zookeeper 都会分配一个全局唯一的递增 ID,这个 ID 反映了所有事务请求的先后顺序。

目标四:高性能高可用

ZooKeeper 将数据存全量储在内存中以保持高性能,并通过服务集群来实现高可用,由于 Zookeeper 的所有更新和删除都是基于事务的,所以其在读多写少的应用场景中有着很高的性能表现。

:::

【中级】ZooKeeper 集群有几种角色?

:::details 要点

Zookeeper 集群是一个基于主从复制的高可用集群,集群中每个节点都存储了一份数据副本(内存中)。此外,每个服务器节点承担如下三种角色中的一种:

  • Leader - 它负责 发起并维护与各 Follwer 及 Observer 间的心跳。所有的写操作必须要通过 Leader 完成再由 Leader 将写操作广播给其它服务器。一个 Zookeeper 集群同一时间只会有一个实际工作的 Leader。
  • Follower - 它会响应 Leader 的心跳。Follower 可直接处理并返回客户端的读请求,同时会将写请求转发给 Leader 处理,并且负责在 Leader 处理写请求时对请求进行投票。一个 Zookeeper 集群可能同时存在多个 Follower。
  • Observer - 角色与 Follower 类似,但是无投票权。

客户端可以从任意 ZooKeeper 服务器节点读取数据,但只能通过 Leader 服务写数据并需要半数以上 Follower 的 ACK,才算写入成功。记住这个重要的知识点,下文会详细讲述。

:::

【中级】ZooKeeper 的权限控制如何设计的?

:::details 要点

ZooKeeper 采用 ACL(Access Control Lists)策略来进行权限控制

每个 znode 创建时都会带有一个 ACL 列表,用于决定谁可以对它执行何种操作。

ACL 依赖于 ZooKeeper 的客户端认证机制。ZooKeeper 提供了以下几种认证方式:

  • digest - 用户名和密码 来识别客户端
  • sasl - 通过 kerberos 来识别客户端
  • ip - 通过 IP 来识别客户端

ZooKeeper 定义了如下五种权限:

  • CREATE - 允许创建子节点;
  • READ - 允许从节点获取数据并列出其子节点;
  • WRITE - 允许为节点设置数据;
  • DELETE - 允许删除子节点;
  • ADMIN - 允许为节点设置权限。

:::

【高级】ZooKeeper 的架构有什么缺点?

:::details 要点

ZooKeeper 不是为高可用性设计的

生产环境中常常需要通过多机房部署来容灾。出于成本考虑,一般多机房都是同时提供服务的,即一个机房撑不住所有流量。ZooKeeper 集群只能有一个 Leader,一旦机房之间连接出现故障,那么只有 Leader 所在的机房可以正常工作,其他机房只能停摆。于是所有流量集中到 Leader 所在的机房,由于处理不过来而导致崩溃。

即使是在同一个机房里面,由于网段的不同,在调整机房交换机的时候偶尔也会发生网段隔离的情况。实际上机房每个月基本上都会发生短暂的网络隔离之类的子网段调整。在那个时刻 ZooKeeper 将处于不可用状态。如果业务系统重度依赖 ZooKeeper(比如用 Dubbo 作为 RPC,且使用 ZooKeeper 作为注册中心),则系统的可用性将非常脆弱。

由于 ZooKeeper 对于网络隔离的极度敏感,导致 ZooKeeper 对于网络的任何风吹草动都会做出激烈反应。这使得 ZooKeeper 的不可用时间比较多。我们不能让 ZooKeeper 的不可用,变成系统的不可用

ZooKeeper 的选举过程速度很慢

互联网环境中,网络不稳定几乎是必然的,而 ZooKeeper 网络隔离非常敏感。一旦出现网络隔离,zookeeper 就要发起选举流程。

ZooKeeper 的选举流程通常耗时 30 到 120 秒,期间 ZooKeeper 由于没有 Leader,都是不可用的

对于网络里面偶尔出现的,比如半秒一秒的网络隔离,ZooKeeper 会由于选举过程,而把不可用时间放大几十倍。

ZooKeeper 的性能是有限的

典型的 ZooKeeper 的 TPS 大概是一万多,无法支撑每天动辄几十亿次的调用。因此,每次请求都去 ZooKeeper 获取业务系统信息是不可能的。

为此,ZooKeeper 的 client 必须自己缓存业务系统的信息。这就导致 ZooKeeper 提供的强一致性实际上是做不到的。如果我们需要强一致性,还需要其他机制来进行保障:比如用自动化脚本把业务系统的 old master 给 kill 掉,但是这可能会引发很多其他问题。

ZooKeeper 无法进行有效的权限控制

ZooKeeper 的权限控制非常弱。在大型的复杂系统里面,使用 ZooKeeper 必须自己再额外的开发一套权限控制系统,通过那套权限控制系统再访问 ZooKeeper。

额外的权限控制系统不但增加了系统复杂性和维护成本,而且降低了系统的总体性能。

即使有了 ZooKeeper 也很难避免业务系统的数据不一致

由于 ZooKeeper 的性能限制,我们无法让每次系统内部调用都走 ZooKeeper,因此总有某些时刻,业务系统会存在两份数据(业务系统 client 那边缓存的业务系统信息是定时从 ZooKeeper 更新的,因此会有更新不同步的问题)。

如果要保持数据的强一致性,唯一的方法是“先 kill 掉当前 Leader,再在 ZooKeeper 上更新 Leader 信息”。是否要 kill 掉当前 Leader 这个问题上,程序是无法完全自动决定的(因为网络隔离的时候 ZooKeeper 已经不可用了,自动脚本没有全局信息,不管怎么做都可能是错的,什么都不做也可能是错的。当网络故障的时候,只有运维人员才有全局信息,程序是无法得知其他机房的情况的)。因此系统无法自动的保障数据一致性,必须要人工介入。而人工介入的典型时间是半个小时以上,我们不能让系统这么长时间不可用。因此我们必须在某个方向上进行妥协,最常见的妥协方式是放弃强一致性,而接受最终一致性

如果我们需要人工介入才能保证可靠的强一致性,那么 ZooKeeper 的价值就大打折扣。

:::

ZooKeeper 工作流

【中级】ZooKeeper 读操作工作流程是怎样的?

:::details 要点

Leader/Follower/Observer 都可直接处理读请求,从本地内存中读取数据并返回给客户端即可

由于处理读请求不需要服务器之间的交互,Follower/Observer 越多,整体系统的读请求吞吐量越大,也即读性能越好。

:::

【中级】ZooKeeper 写操作工作流程是怎样的?

:::details 要点

所有的写请求实际上都要交给 Leader 处理。Leader 将写请求以事务形式发给所有 Follower 并等待 ACK,一旦收到半数以上 Follower 的 ACK,即认为写操作成功。

写 Leader

由上图可见,通过 Leader 进行写操作,主要分为五步:

  1. 客户端向 Leader 发起写请求
  2. Leader 将写请求以事务 Proposal 的形式发给所有 Follower 并等待 ACK
  3. Follower 收到 Leader 的事务 Proposal 后返回 ACK
  4. Leader 得到过半数的 ACK(Leader 对自己默认有一个 ACK)后向所有的 Follower 和 Observer 发送 Commmit
  5. Leader 将处理结果返回给客户端

注意

  • Leader 不需要得到 Observer 的 ACK,即 Observer 无投票权。
  • Leader 不需要得到所有 Follower 的 ACK,只要收到过半的 ACK 即可,同时 Leader 本身对自己有一个 ACK。上图中有 4 个 Follower,只需其中两个返回 ACK 即可,因为 $$(2+1) / (4+1) > 1/2$$ 。
  • Observer 虽然无投票权,但仍须同步 Leader 的数据从而在处理读请求时可以返回尽可能新的数据。

写 Follower/Observer

  • Follower/Observer 均可接受写请求,但不能直接处理,而需要将写请求转发给 Leader 处理。
  • 除了多了一步请求转发,其它流程与直接写 Leader 无任何区别。

:::

【中级】ZooKeeper 事务机制是怎样的?

:::details 要点

对于来自客户端的每个更新请求,ZooKeeper 具备严格的顺序访问控制能力。

为了保证事务的顺序一致性,ZooKeeper 采用了递增的事务 id 号(zxid)来标识事务

Leader 服务会为每一个 Follower 服务器分配一个单独的队列,然后将事务 Proposal 依次放入队列中,并根据 FIFO(先进先出) 的策略进行消息发送。Follower 服务在接收到 Proposal 后,会将其以事务日志的形式写入本地磁盘中,并在写入成功后反馈给 Leader 一个 Ack 响应。当 Leader 接收到超过半数 Follower 的 Ack 响应后,就会广播一个 Commit 消息给所有的 Follower 以通知其进行事务提交,之后 Leader 自身也会完成对事务的提交。而每一个 Follower 则在接收到 Commit 消息后,完成事务的提交。

所有的提议(**proposal**)都在被提出的时候加上了 zxid。zxid 是一个 64 位的数字,它的高 32 位是 epoch 用来标识 Leader 关系是否改变,每次一个 Leader 被选出来,它都会有一个新的 epoch,标识当前属于那个 leader 的统治时期。低 32 位用于递增计数。

详细过程如下:

  1. Leader 等待 Server 连接;
  2. Follower 连接 Leader,将最大的 zxid 发送给 Leader;
  3. Leader 根据 Follower 的 zxid 确定同步点;
  4. 完成同步后通知 follower 已经成为 uptodate 状态;
  5. Follower 收到 uptodate 消息后,又可以重新接受 client 的请求进行服务了。

:::

【中级】ZooKeeper 监听机制是怎样的?

:::details 要点

ZooKeeper 允许客户端监听它关心的 znode,当 znode 状态发生变化(数据变化、子节点增减变化)时,ZooKeeper 服务会通知客户端

需要注意的是:ZooKeeper 的监听通知是一次性的。无论是服务端还是客户端,一旦一个 Watcher 被触发,Zookeeper 都会将其从相应的存储中移除。这样的设计有效的减轻了服务端的压力,不然对于更新非常频繁的节点,服务端会不断的向客户端发送事件通知,无论对于网络还是服务端的压力都非常大。

客户端和服务端保持连接一般有两种形式:

  • 客户端向服务端不断轮询
  • 服务端向客户端推送状态

Zookeeper 的选择是服务端主动推送状态,也就是观察机制( Watch

ZooKeeper 的观察机制允许用户在指定节点上针对感兴趣的事件注册监听,当事件发生时,监听器会被触发,并将事件信息推送到客户端。

  • 监听器实时触发
  • 监听器总是有序的
  • 创建新的 znode 数据前,客户端就能收到监听事件。

客户端使用 getData 等接口获取 znode 状态时传入了一个用于处理节点变更的回调,那么服务端就会主动向客户端推送节点的变更:

1
public byte[] getData(final String path, Watcher watcher, Stat stat)

从这个方法中传入的 Watcher 对象实现了相应的 process 方法,每次对应节点出现了状态的改变,WatchManager 都会通过以下的方式调用传入 Watcher 的方法:

1
2
3
4
5
6
7
8
9
10
11
Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path);
Set<Watcher> watchers;
synchronized (this) {
watchers = watchTable.remove(path);
}
for (Watcher w : watchers) {
w.process(e);
}
return watchers;
}

Zookeeper 中的所有数据其实都是由一个名为 DataTree 的数据结构管理的,所有的读写数据的请求最终都会改变这颗树的内容,在发出读请求时可能会传入 Watcher 注册一个回调函数,而写请求就可能会触发相应的回调,由 WatchManager 通知客户端数据的变化。

通知机制的实现其实还是比较简单的,通过读请求设置 Watcher 监听事件,写请求在触发事件时就能将通知发送给指定的客户端。

:::

【中级】ZooKeeper 会话机制是怎样的?

:::details 要点

ZooKeeper 客户端通过 TCP 长连接连接到 ZooKeeper 服务集群会话 (Session) 从第一次连接开始就已经建立,之后通过心跳检测机制来保持有效的会话状态。通过这个连接,客户端可以发送请求并接收响应,同时也可以接收到 Watch 事件的通知。

每个 ZooKeeper 客户端配置中都配置了 ZooKeeper 服务器集群列表。启动时,客户端会遍历列表去尝试建立连接。如果失败,它会尝试连接下一个服务器,依次类推。

一旦一台客户端与一台服务器建立连接,这台服务器会为这个客户端创建一个新的会话。每个会话都会有一个超时时间,若服务器在超时时间内没有收到任何请求,则相应会话被视为过期。一旦会话过期,就无法再重新打开,且任何与该会话相关的临时 znode 都会被删除。

通常来说,会话应该长期存在,而这需要由客户端来保证。客户端可以通过心跳方式(ping)来保持会话不过期。

ZooKeeper 的会话具有四个属性:

  • sessionID - 会话 ID,唯一标识一个会话,每次客户端创建新的会话时,Zookeeper 都会为其分配一个全局唯一的 sessionID。
  • TimeOut - 会话超时时间,客户端在构造 Zookeeper 实例时,会配置 sessionTimeout 参数用于指定会话的超时时间,Zookeeper 客户端向服务端发送这个超时时间后,服务端会根据自己的超时时间限制最终确定会话的超时时间。
  • TickTime - 下次会话超时时间点,为了便于 Zookeeper 对会话实行”分桶策略”管理,同时为了高效低耗地实现会话的超时检查与清理,Zookeeper 会为每个会话标记一个下次会话超时时间点,其值大致等于当前时间加上 TimeOut。
  • isClosing - 标记一个会话是否已经被关闭,当服务端检测到会话已经超时失效时,会将该会话的 isClosing 标记为”已关闭”,这样就能确保不再处理来自该会话的心情求了。

Zookeeper 的会话管理主要是通过 SessionTracker 来负责,其采用了分桶策略(将类似的会话放在同一区块中进行管理)进行管理,以便 Zookeeper 对会话进行不同区块的隔离处理以及同一区块的统一处理。

:::

Zab 协议

【中级】什么是 Zab 协议?

:::details 要点

ZooKeeper 并没有直接采用 Paxos 算法,而是采用了名为 ZAB 的一致性协议。**ZAB 协议不是 Paxos 算法**,只是比较类似,二者在操作上并不相同。Multi-Paxos 实现的是一系列值的共识,不关心最终达成共识的值是什么,不关心各值的顺序。而 ZooKeeper 需要确保操作的顺序性。

ZAB 协议是 Zookeeper 专门设计的一种支持故障恢复的原子广播协议。ZAB 协议是 ZooKeeper 的数据一致性和高可用解决方案。

ZAB 协议定义了两个可以无限循环的流程:

  • 选举 Leader - 用于故障恢复,从而保证高可用。
  • 原子广播 - 用于主从同步,从而保证数据一致性。

:::

【高级】Zab 协议中故障恢复的流程是怎样的?

:::details 要点

故障恢复

ZooKeeper 集群采用一主多从模式,主从节点通过副本机制保证数据一致

  • 如果 Follower 节点挂了 - ZooKeeper 集群中的每个节点都会单独在内存中维护自身的状态,并且各节点之间都保持着通讯,只要集群中有半数机器能够正常工作,那么整个集群就可以正常提供服务
  • 如果 Leader 节点挂了 - 如果 Leader 节点挂了,系统就不能正常工作了。此时,需要通过 ZAB 协议的选举 Leader 机制来进行故障恢复。

ZAB 协议的选举 Leader 机制简单来说,就是:基于过半选举机制产生新的 Leader,之后其他机器将从新的 Leader 上同步状态,当有过半机器完成状态同步后,就退出选举 Leader 模式,进入原子广播模式。

术语

  • myid - 每个 Zookeeper 服务器,都需要在数据文件夹下创建一个名为 myid 的文件,该文件包含整个 Zookeeper 集群唯一的 ID(整数)
  • zxid - 类似于 RDBMS 中的事务 ID,用于标识一次更新操作的 Proposal ID。为了保证顺序性,该 zxid 必须单调递增。因此 Zookeeper 使用一个 64 位的数来表示,高 32 位是 Leader 的 epoch,从 1 开始,每次选出新的 Leader,epoch 加一。低 32 位为该 epoch 内的序号,每次 epoch 变化,都将低 32 位的序号重置。这样保证了 zxid 的全局递增性。

服务器状态

  • LOOKING - 不确定 Leader 状态。该状态下的服务器认为当前集群中没有 Leader,会发起 Leader 选举
  • FOLLOWING - 跟随者状态。表明当前服务器角色是 Follower,并且它知道 Leader 是谁
  • LEADING - 领导者状态。表明当前服务器角色是 Leader,它会维护与 Follower 间的心跳
  • OBSERVING - 观察者状态。表明当前服务器角色是 Observer,与 Folower 唯一的不同在于不参与选举,也不参与集群写操作时的投票

选票数据结构

每个服务器在进行领导选举时,会发送如下关键信息

  • logicClock - 每个服务器会维护一个自增的整数,名为 logicClock,它表示这是该服务器发起的第多少轮投票
  • state - 当前服务器的状态
  • self_id - 当前服务器的 myid
  • self_zxid - 当前服务器上所保存的数据的最大 zxid
  • vote_id - 被推举的服务器的 myid
  • vote_zxid - 被推举的服务器上所保存的数据的最大 zxid

投票流程

(1)自增选举轮次 - Zookeeper 规定所有有效的投票都必须在同一轮次中。每个服务器在开始新一轮投票时,会先对自己维护的 logicClock 进行自增操作。

(2)初始化选票 - 每个服务器在广播自己的选票前,会将自己的投票箱清空。该投票箱记录了所收到的选票。例:服务器 2 投票给服务器 3,服务器 3 投票给服务器 1,则服务器 1 的投票箱为 (2, 3), (3, 1), (1, 1)。票箱中只会记录每一投票者的最后一票,如投票者更新自己的选票,则其它服务器收到该新选票后会在自己票箱中更新该服务器的选票。

(3)发送初始化选票 - 每个服务器最开始都是通过广播把票投给自己。

(4)接收外部投票 - 服务器会尝试从其它服务器获取投票,并记入自己的投票箱内。如果无法获取任何外部投票,则会确认自己是否与集群中其它服务器保持着有效连接。如果是,则再次发送自己的投票;如果否,则马上与之建立连接。

(5)判断选举轮次 - 收到外部投票后,首先会根据投票信息中所包含的 logicClock 来进行不同处理

  • 外部投票的 logicClock 大于自己的 logicClock。说明该服务器的选举轮次落后于其它服务器的选举轮次,立即清空自己的投票箱并将自己的 logicClock 更新为收到的 logicClock,然后再对比自己之前的投票与收到的投票以确定是否需要变更自己的投票,最终再次将自己的投票广播出去。
  • 外部投票的 logicClock 小于自己的 logicClock。当前服务器直接忽略该投票,继续处理下一个投票。
  • 外部投票的 logickClock 与自己的相等。当时进行选票 PK。

(6)选票 PK - 选票 PK 是基于(self_id, self_zxid)(vote_id, vote_zxid) 的对比

  • 外部投票的 logicClock 大于自己的 logicClock,则将自己的 logicClock 及自己的选票的 logicClock 变更为收到的 logicClock
  • 若 logicClock 一致,则对比二者的 vote_zxid,若外部投票的 vote_zxid 比较大,则将自己的票中的 vote_zxid 与 vote_myid 更新为收到的票中的 vote_zxid 与 vote_myid 并广播出去,另外将收到的票及自己更新后的票放入自己的票箱。如果票箱内已存在 (self_myid, self_zxid) 相同的选票,则直接覆盖
  • 若二者 vote_zxid 一致,则比较二者的 vote_myid,若外部投票的 vote_myid 比较大,则将自己的票中的 vote_myid 更新为收到的票中的 vote_myid 并广播出去,另外将收到的票及自己更新后的票放入自己的票箱

(7)统计选票 - 如果已经确定有过半服务器认可了自己的投票(可能是更新后的投票),则终止投票。否则继续接收其它服务器的投票。

(8)更新服务器状态 - 投票终止后,服务器开始更新自身状态。若过半的票投给了自己,则将自己的服务器状态更新为 LEADING,否则将自己的状态更新为 FOLLOWING

通过以上流程分析,我们不难看出:要使 Leader 获得多数 Server 的支持,则 ZooKeeper 集群节点数必须是奇数。且存活的节点数目不得少于 N + 1

每个 Server 启动后都会重复以上流程。在恢复模式下,如果是刚从崩溃状态恢复的或者刚启动的 server 还会从磁盘快照中恢复数据和会话信息,zk 会记录事务日志并定期进行快照,方便在恢复时进行状态恢复。

:::

【高级】Zab 协议中原子广播的流程是怎样的?

:::details 要点

ZooKeeper 通过副本机制来实现高可用

那么,ZooKeeper 是如何实现副本机制的呢?答案是:ZAB 协议的原子广播。

ZAB 协议的原子广播要求:

**所有的写请求都会被转发给 Leader,Leader 会以原子广播的方式通知 Follow。当半数以上的 Follow 已经更新状态持久化后,Leader 才会提交这个更新,然后客户端才会收到一个更新成功的响应**。这有些类似数据库中的两阶段提交协议。

在整个消息的广播过程中,Leader 服务器会每个事务请求生成对应的 Proposal,并为其分配一个全局唯一的递增的事务 ID(ZXID),之后再对其进行广播。

ZAB 是通过“一切以领导者为准”的强领导者模型和严格按照顺序提交日志,来实现操作的顺序性的,这一点和 Raft 是一样的。

:::

【中级】Zab 和 Paxos 有什么区别?

:::details 要点

Zab 和 Paxos 协议在实现上其实有非常多的相似点,例如:

  • 主节点会向所有的从节点发出提案;
  • 主节点在接收到一组从节点中一半以上节点的确认后,才会认为当前提案被提交了;
  • Zab 协议中的每一个提案都包含一个 epoch 值,与 Paxos 中的 Ballot 非常相似;

因为它们有一些相同的特点,所以有的观点会认为 Zab 是 Paxos 的一个简化版本,但是 Zab 和 Paxos 在设计理念上就有着比较大的不同,两者的主要区别就在于 Zab 主要是为构建高可用的主备系统设计的,而 Paxos 能够帮助工程师搭建具有一致性的状态机系统。

作为一个一致性状态机系统,它能够保证集群中任意一个状态机副本都按照客户端的请求执行了相同顺序的请求,即使来自客户端请求是异步的并且不同客户端的接收同一个请求的顺序不同,集群中的这些副本就是会使用 Paxos 或者它的变种对提案达成一致;在集群运行的过程中,如果主节点出现了错误导致宕机,其他的节点会重新开始进行选举并处理未提交的请求。

但是在类似 Zookeeper 的高可用主备系统中,所有的副本都需要对增量的状态更新顺序达成一致,这些状态更新的变量都是由主节点创建并发送给其他的从节点的,每一个从节点都会严格按照顺序逐一的执行主节点生成的状态更新请求,如果 Zookeeper 集群中的主节点发生了宕机,新的主节点也必须严格按照顺序对请求进行恢复。

总的来说,使用状态更新节点数据的主备系统相比根据客户端请求改变状态的状态机系统对于请求的执行顺序有着更严格的要求。

这一节对于 Zab 和 Paxos 区别的介绍大都来自于 Zab vs. Paxos ,有兴趣的读者可以阅读相关的内容。

:::

参考资料

ZooKeeper 运维指南

单点服务部署

在安装 ZooKeeper 之前,请确保你的系统是在以下任一操作系统上运行:

  • 任意 Linux OS - 支持开发和部署。适合演示应用程序。
  • Windows OS - 仅支持开发。
  • Mac OS - 仅支持开发。

安装步骤如下:

下载解压

进入官方下载地址:http://zookeeper.apache.org/releases.html#download ,选择合适版本。

解压到本地:

1
2
tar -zxf zookeeper-3.4.6.tar.gz
cd zookeeper-3.4.6

环境变量

执行 vim /etc/profile,添加环境变量:

1
2
export ZOOKEEPER_HOME=/usr/app/zookeeper-3.4.14
export PATH=$ZOOKEEPER_HOME/bin:$PATH

再执行 source /etc/profile , 使得配置的环境变量生效。

修改配置

你必须创建 conf/zoo.cfg 文件,否则启动时会提示你没有此文件。

初次尝试,不妨直接使用 Kafka 提供的模板配置文件 conf/zoo_sample.cfg

1
cp conf/zoo_sample.cfg conf/zoo.cfg

修改后完整配置如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/usr/local/zookeeper/data
dataLogDir=/usr/local/zookeeper/log
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1

配置参数说明:

  • tickTime:用于计算的基础时间单元。比如 session 超时:N*tickTime;
  • initLimit:用于集群,允许从节点连接并同步到 master 节点的初始化连接时间,以 tickTime 的倍数来表示;
  • syncLimit:用于集群, master 主节点与从节点之间发送消息,请求和应答时间长度(心跳机制);
  • dataDir:数据存储位置;
  • dataLogDir:日志目录;
  • clientPort:用于客户端连接的端口,默认 2181

启动服务

执行以下命令

1
bin/zkServer.sh start

执行此命令后,你将收到以下响应

1
2
3
JMX enabled by default
Using config: /Users/../zookeeper-3.4.6/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED

停止服务

可以使用以下命令停止 zookeeper 服务器。

1
bin/zkServer.sh stop

集群服务部署

分布式系统节点数一般都要求是奇数,且最少为 3 个节点,Zookeeper 也不例外。

这里,规划一个含 3 个节点的最小 ZooKeeper 集群,主机名分别为 hadoop001,hadoop002,hadoop003 。

修改配置

修改配置文件 zoo.cfg,内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/usr/local/zookeeper-cluster/data/
dataLogDir=/usr/local/zookeeper-cluster/log/
clientPort=2181

# server.1 这个1是服务器的标识,可以是任意有效数字,标识这是第几个服务器节点,这个标识要写到dataDir目录下面myid文件里
# 指名集群间通讯端口和选举端口
server.1=hadoop001:2287:3387
server.2=hadoop002:2287:3387
server.3=hadoop003:2287:3387

标识节点

分别在三台主机的 dataDir 目录下新建 myid 文件,并写入对应的节点标识。Zookeeper 集群通过 myid 文件识别集群节点,并通过上文配置的节点通信端口和选举端口来进行节点通信,选举出 Leader 节点。

创建存储目录:

1
2
# 三台主机均执行该命令
mkdir -vp /usr/local/zookeeper-cluster/data/

创建并写入节点标识到 myid 文件:

1
2
3
4
5
6
# hadoop001主机
echo "1" > /usr/local/zookeeper-cluster/data/myid
# hadoop002主机
echo "2" > /usr/local/zookeeper-cluster/data/myid
# hadoop003主机
echo "3" > /usr/local/zookeeper-cluster/data/myid

启动集群

分别在三台主机上,执行如下命令启动服务:

1
/usr/app/zookeeper-cluster/zookeeper/bin/zkServer.sh start

集群验证

启动后使用 zkServer.sh status 查看集群各个节点状态。

参考资料

HBase 命令

进入 HBase Shell 控制台:./bin/hbase shell

如果有 kerberos 认证,需要事先使用相应的 keytab 进行一下认证(使用 kinit 命令),认证成功之后再使用 hbase shell 进入可以使用 whoami 命令可查看当前用户.

基本命令

  • 获取帮助信息:help
  • 获取命令的详细帮助信息:help 'status'
  • 查看服务器状态:status
  • 查看版本信息:version
  • 查看当前登录用户:whoami

DDL

创建表

【语法】create '表名称','列族名称 1','列族名称 2','列名称 N'

【示例】

1
2
# 创建一张名为 test 的表,columnFamliy1、columnFamliy2 是 table1 表的列族。
create 'test','columnFamliy1','columnFamliy2'

启用、禁用表

  • 启用表:enable 'test'
  • 禁用表:disable 'test'
  • 检查表是否被启用:is_enabled 'test'
  • 检查表是否被禁用:is_disabled 'test'

删除表

注意:删除表前需要先禁用表

1
2
disable 'test'
drop 'test'

修改表

添加列族

命令格式: alter ‘表名’, ‘列族名’

1
alter 'test', 'teacherInfo'

删除列族

命令格式:alter ‘表名’, {NAME => ‘列族名’, METHOD => ‘delete’}

1
alter 'test', {NAME => 'teacherInfo', METHOD => 'delete'}

更改列族存储版本的限制

默认情况下,列族只存储一个版本的数据,如果需要存储多个版本的数据,则需要修改列族的属性。修改后可通过 desc 命令查看。

1
alter 'test',{NAME=>'columnFamliy1',VERSIONS=>3}

查看表

  • 查看所有表:list
  • 查看表的详细信息:describe 'test'
  • 检查表是否存在:exists 'test'

增删改

插入数据

命令格式put '表名', '行键','列族:列','值'

注意:如果新增数据的行键值、列族名、列名与原有数据完全相同,则相当于更新操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
put 'test', 'rowkey1', 'columnFamliy1:a', 'valueA'
put 'test', 'rowkey1', 'columnFamliy1:b', 'valueB'
put 'test', 'rowkey1', 'columnFamliy1:c', 'valueC'

put 'test', 'rowkey2', 'columnFamliy1:a', 'valueA'
put 'test', 'rowkey2', 'columnFamliy1:b', 'valueB'
put 'test', 'rowkey2', 'columnFamliy1:c', 'valueC'

put 'test', 'rowkey3', 'columnFamliy1:a', 'valueA'
put 'test', 'rowkey3', 'columnFamliy1:b', 'valueB'
put 'test', 'rowkey3', 'columnFamliy1:c', 'valueC'

put 'test', 'rowkey1', 'columnFamliy2:a', 'valueA'
put 'test', 'rowkey1', 'columnFamliy2:b', 'valueB'
put 'test', 'rowkey1', 'columnFamliy2:c', 'valueC'

获取指定行、列族、列

  • 获取指定行中所有列的数据信息:get 'test','rowkey2'
  • 获取指定行中指定列族下所有列的数据信息:get 'test','rowkey2','columnFamliy1'
  • 获取指定行中指定列的数据信息:get 'test','rowkey2','columnFamliy1:a'

删除指定行、列

  • 删除指定行:delete 'test','rowkey2'
  • 删除指定行中指定列的数据:delete 'test','rowkey2','columnFamliy1:a'

查询

hbase 中访问数据有两种基本的方式:

  • 按指定 rowkey 获取数据:get 方法;
  • 按指定条件获取数据:scan 方法。

scan 可以设置 begin 和 end 参数来访问一个范围内所有的数据。get 本质上就是 begin 和 end 相等的一种特殊的 scan。

get 查询

  • 获取指定行中所有列的数据信息:get 'test','rowkey2'
  • 获取指定行中指定列族下所有列的数据信息:get 'test','rowkey2','columnFamliy1'
  • 获取指定行中指定列的数据信息:get 'test','rowkey2','columnFamliy1:a'

scan 查询

查询整表数据

1
scan 'test'

查询指定列簇的数据

1
scan 'test', {COLUMN=>'columnFamliy1'}

条件查询

1
2
# 查询指定列的数据
scan 'test', {COLUMNS=> 'columnFamliy1:a'}

除了列 (COLUMNS) 修饰词外,HBase 还支持 Limit(限制查询结果行数),STARTROWROWKEY 起始行,会先根据这个 key 定位到 region,再向后扫描)、STOPROW(结束行)、TIMERANGE(限定时间戳范围)、VERSIONS(版本数)、和 FILTER(按条件过滤行)等。

如下代表从 rowkey2 这个 rowkey 开始,查找下两个行的最新 3 个版本的 name 列的数据:

1
scan 'test', {COLUMNS=> 'columnFamliy1:a',STARTROW => 'rowkey2',STOPROW => 'rowkey3',LIMIT=>2, VERSIONS=>3}

条件过滤

Filter 可以设定一系列条件来进行过滤。如我们要查询值等于 24 的所有数据:

1
scan 'test', FILTER=>"ValueFilter(=,'binary:24')"

值包含 valueA 的所有数据:

1
scan 'test', FILTER=>"ValueFilter(=,'substring:valueA')"

列名中的前缀为 b 的:

1
scan 'test', FILTER=>"ColumnPrefixFilter('b')"

FILTER 中支持多个过滤条件通过括号、AND 和 OR 进行组合:

1
2
# 列名中的前缀为 b 且列值中包含1998的数据
scan 'test', FILTER=>"ColumnPrefixFilter('b') AND ValueFilter ValueFilter(=,'substring:A')"

PrefixFilter 用于对 Rowkey 的前缀进行判断:

1
scan 'test', FILTER=>"PrefixFilter('wr')"

参考资料

Zipkin 快速入门

Zipkin 是一个基于 Java 开发的、开源的、分布式实时数据跟踪系统(Distributed Tracking System)。它采集有助于解决服务架构中延迟问题的实时数据。

Zipkin 主要功能是聚集来自各个异构系统的实时监控数据。分布式跟踪系统还有其他比较成熟的实现,例如:Naver 的 Pinpoint、Apache 的 HTrace、阿里的鹰眼 Tracing、京东的 Hydra、新浪的 Watchman,美团点评的 CAT,skywalking 等。

Zipkin 基于 Google Dapper 的论文设计而来,由 Twitter 公司开发贡献。

一、Zipkin 简介

特性

如果日志文件中有跟踪 ID,则可以直接跳至该跟踪 ID。 否则,您可以基于属性进行查询,例如服务,操作名称,标签和持续时间。 将为您总结一些有趣的数据,例如在服务中花费的时间百分比以及操作是否失败。

Zipkin UI 还提供了一个依赖关系图,该关系图显示了每个应用程序中跟踪了多少个请求。这对于识别聚合行为(包括错误路径或对不赞成使用的服务的调用)很有帮助。

Zipkin UI

多平台

Zipkin 官方支持 C#、Go、Java、JavaScript、Ruby、Scala、PHP 语言。

除此以外,社区还贡献了多种其他语言的支持,详情可以参考官方文档:Tracers and Instrumentation

数据

Zipkin 服务器捆绑了用于采集和存储数据的扩展。

默认情况下,数据可以通过 HttpKafkaRabbitMQ 或 RPC 传输。

并存储在内存中或 MySQLCassandraElasticsearch 中。

数据以 json 形式存储,可以参考:Zipkin 官方的 Swagger API

Zipkin Swagger API

二、Zipkin 安装

Docker

Docker 启动方式:

1
docker run -d -p 9411:9411 openzipkin/zipkin

Java

注意:必须运行在 JDK8+ 环境

Java 启动方式:

1
2
curl -sSL https://zipkin.io/quickstart.sh | bash -s
java -jar zipkin.jar

编译方式

适用于需要订制化的场景。

1
2
3
4
5
6
7
# get the latest source
git clone https://github.com/openzipkin/zipkin
cd zipkin
# Build the server and also make its dependencies
./mvnw -DskipTests --also-make -pl zipkin-server clean install
# Run the server
java -jar ./zipkin-server/target/zipkin-server-*exec.jar

三、Zipkin 架构

ZipKin 可以分为两部分,

  • 一部分是 Zipkin server,用来作为数据的采集存储、数据分析与展示;
  • 另一部分是 Zipkin client 是 Zipkin 基于不同的语言及框架封装的一些列客户端工具,这些工具完成了追踪数据的生成与上报功能。

架构如下:

Zipkin 架构

Zipkin Server

Zipkin Server 主要包括四个模块:

  • Collector - 负责采集客户端传输的数据。
  • Storage - 负责存储采集的数据。当前支持 Memory,MySQL,Cassandra,ElasticSearch 等,默认存储在内存中。
  • API(Query) - 负责查询 Storage 中存储的数据。提供简单的 JSON API 获取数据,主要提供给 web UI 使用。
  • UI - 提供简单的 web 界面。

Instrumented Client 和 Instrumented Server,是指分布式架构中使用了 Trace 工具的两个应用,Client 会调用 Server 提供的服务,两者都会向 Zipkin 上报 Trace 相关信息。在 Client 和 Server 通过 Transport 上报 Trace 信息后,由 Zipkin 的 Collector 模块接收,并由 Storage 模块将数据存储在对应的存储介质中,然后 Zipkin 提供 API 供 UI 界面查询 Trace 跟踪信息。Non-Instrumented Server,指的是未使用 Trace 工具的 Server,显然它不会上报 Trace 信息。

Zipkin Client

  • Tracer - Tracer 存在于你的应用中,它负责采集关于已发生操作的实时元数据。它们通常会检测库,因此对于用户是透明的。例如,已检测的 Web 服务器记录它何时接收到请求,以及何时发送响应。收集的跟踪数据称为跨度(Span)。
  • Instrumentation - Instrumentation 保证了生产环境的安全性和很少的开销。因此,它们仅在内部传播 ID,以告知接收方正在进行追踪。完成的 Span 将通过外部通信告知 Zipkin,类似于应用程序异步报告指标的方式。例如,当跟踪某个操作并且需要发出 http 请求时,会添加一些 header 来传播 ID。header 不用于发送详细信息,例如操作名称。
  • Reporter - 能够将数据发送到 Zipkin 的检测应用程序中的组件,被称为 Reporter。Reporter 有多种传输方式,可以将跟踪数据发送到 Zipkin 采集器,后者将跟踪数据持久化保存到存储中。稍后,API 会查询存储以向 UI 提供渲染数据。

以下是 Zipkin 的一个示例工作流:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
┌─────────────┐ ┌───────────────────────┐  ┌─────────────┐  ┌──────────────────┐
│ User Code │ │ Trace Instrumentation │ │ Http Client │ │ Zipkin Collector │
└─────────────┘ └───────────────────────┘ └─────────────┘ └──────────────────┘
│ │ │ │
┌─────────┐
│ ──┤GET /foo ├─▶ │ ────┐ │ │
└─────────┘ │ record tags
│ │ ◀───┘ │ │
────┐
│ │ │ add trace headers │ │
◀───┘
│ │ ────┐ │ │
│ record timestamp
│ │ ◀───┘ │ │
┌─────────────────┐
│ │ ──┤GET /foo ├─▶ │ │
│X-B3-TraceId: aa │ ────┐
│ │ │X-B3-SpanId: 6b │ │ │ │
└─────────────────┘ │ invoke
│ │ │ │ request │

│ │ │ │ │
┌────────┐ ◀───┘
│ │ ◀─────┤200 OK ├─────── │ │
────┐ └────────┘
│ │ │ record duration │ │
┌────────┐ ◀───┘
│ ◀──┤200 OK ├── │ │ │
└────────┘ ┌────────────────────────────────┐
│ │ ──┤ asynchronously report span ├────▶ │
│ │
│{ │
│ "traceId": "aa", │
│ "id": "6b", │
│ "name": "get", │
│ "timestamp": 1483945573944000,│
│ "duration": 386000, │
│ "annotations": [ │
│--snip-- │
└────────────────────────────────┘

Instrumented client 和 server 是分别使用了 ZipKin Client 的服务,Zipkin Client 会根据配置将追踪数据发送到 Zipkin Server 中进行数据存储、分析和展示。

四、Zipkin 客户端

Brave 是 Java 版的 zipkin 客户端。

一般不会手动编写 Trace 相关的代码,Brave 提供可一些开箱即用的库,帮助我们追踪一些特定的请求。比如:dubbo、grpc、servlet、mysql、httpClient、kafka、springMVC 等。

参考资料