Dunwu Blog

大道至简,知易行难

分布式 ID 基本原理

传统数据库软件开发中,主键自动生成技术是基本需求。而各个数据库对于该需求也提供了相应的支持,比如 MySQL 的自增键,Oracle 的自增序列等。

数据分片后,不同数据节点生成全局唯一主键是非常棘手的问题。同一个逻辑表内的不同实际表之间的自增键由于无法互相感知而产生重复主键。 虽然可通过约束自增主键初始值和步长的方式避免碰撞,但需引入额外的运维规则,使解决方案缺乏完整性和可扩展性。

为此,需要使用分布式 ID 来解决此问题。本文总结业界常用的分布式 ID 解决方案。

1. 分布式 ID 简介

首先,分布式 ID 应该具备哪些特性呢?

  1. 全局唯一性 - 不能出现重复的 ID 号,既然是唯一标识,这是最基本的要求。
  2. 趋势递增 - 在 MySQL InnoDB 引擎中使用的是聚集索引,由于多数 RDBMS 使用 B-tree 的数据结构来存储索引数据,在主键的选择上面我们应该尽量使用有序的主键保证写入性能。
  3. 单调递增 - 保证下一个 ID 一定大于上一个 ID,例如事务版本号、IM 增量消息、排序等特殊需求。
  4. 信息安全 - 如果 ID 是连续的,恶意用户的扒取工作就非常容易做了,直接按照顺序下载指定 URL 即可;如果是订单号就更危险了,竞对可以直接知道我们一天的单量。所以在一些应用场景下,会需要 ID 无规则、不规则。

2. UUID

UUID 是最简单的分布式 ID 方案。

UUID 是通用唯一识别码(Universally Unique Identifier)的缩写,开放软件基金会(OSF)规范定义了包括网卡 MAC 地址、时间戳、名字空间(Namespace)、随机或伪随机数、时序等元素。利用这些元素来生成 UUID。

UUID 是由 128 位二进制组成,一般转换成十六进制,然后用 String 表示。在 java 中有个 UUID 类,在他的注释中我们看见这里有 4 种不同的 UUID 的生成策略:

  • random - 基于随机数生成 UUID,由于 Java 中的随机数是伪随机数,其重复的概率是可以被计算出来的。这个一般我们用下面的代码获取基于随机数的 UUID:
1
String id = UUID.randomUUID().toString();
  • time-based - 基于时间的 UUID,这个一般是通过当前时间,随机数,和本地 Mac 地址来计算出来,自带的 JDK 包并没有这个算法的我们在一些 UUIDUtil 中,比如我们的 log4j.core.util,会重新定义 UUID 的高位和低位。
1
2
3
4
5
6
7
8
public static UUID getTimeBasedUuid() {
long time = System.currentTimeMillis() * 10000L + 122192928000000000L + (long)(COUNT.incrementAndGet() % 10000);
long timeLow = (time & 4294967295L) << 32;
long timeMid = (time & 281470681743360L) >> 16;
long timeHi = (time & 1152640029630136320L) >> 48;
long most = timeLow | timeMid | 4096L | timeHi;
return new UUID(most, LEAST);
}
  • DCE security - DCE 安全的 UUID。

  • name-based - 基于名字的 UUID,通过计算名字和名字空间的 MD5 来计算 UUID。

2.1. UUID 的优点

  • 通过本地生成,没有经过网络 I/O,性能较快。

2.2. UUID 的缺点

  • 长度过长 - UUID 太长,16 字节 128 位,通常以 36 长度的字符串表示,很多场景不适用。例如:Mysql 官方明确建议主键越短越好,36 个字符长度的 UUID 不符合要求。
  • 信息不安全 - 基于 MAC 地址生成 UUID 的算法可能会造成 MAC 地址泄露,这个漏洞曾被用于寻找梅丽莎病毒的制作者位置。
  • 无序性 - 不能生成递增有序的数字。这对于一些特定场景不利。例如:MySQL InnoDB 存储引擎使用 B+ 树存储索引数据,而主键也是一种索引。索引数据在 B+ 树中是有序排列的。UUID 的无序性可能会引起数据位置频繁变动,严重影响性能。

2.3. 适用场景

UUID 的适用场景可以为不需要担心过多的空间占用,以及不需要生成有递增趋势的数字。在 Log4j 里 UuidPatternConverter 中加入了 UUID 来标识每一条日志。

3. 利用第三方存储生成键

提到自增键,最先想到的肯定是直接使用数据库自增键。_各数据库对于该需求也提供了相应的支持,比如 MySQL 的自增键,Oracle 的自增序列等_。

当然,也可以考虑是用 Redis 这样的 Nosql,甚至 ZooKeeper 去生成键

3.1. 优点

  • 非常简单,利用现有的功能实现,成本小
  • 有序递增
  • 方便排序和分页

3.2. 缺点

  • 强依赖第三方存储,如果第三方存储非高可用系统,若出现丢失数据的情况,就可能出现重复生成 ID 的问题。
  • 生成 ID 性能瓶颈依赖于第三方存储的性能。
  • 增加了对第三方存储运维的成本。

4. 雪花算法(Snowflake)

雪花算法(Snowflake)是由 Twitter 公布的分布式主键生成算法,它会生成一个 64 bit 的整数,可以保证不同进程主键的不重复性,以及相同进程主键的有序性。

在同一个进程中,它首先是通过时间位保证不重复,如果时间相同则是通过序列位保证。 同时由于时间位是单调递增的,且各个服务器如果大体做了时间同步,那么生成的主键在分布式环境可以认为是总体有序的,这就保证了对索引字段的插入的高效性。

4.1. 基本原理

4.1.1. 键的组成

使用雪花算法生成的主键,二进制表示形式包含 4 部分,从高位到低位分表为:1bit 符号位、41bit 时间戳位、10bit 工作进程位以及 12bit 序列号位。

  • 符号位(1bit)

预留的符号位,恒为零。

  • 时间戳位(41bit)

41 位的时间戳可以容纳的毫秒数是 2 的 41 次幂,一年所使用的毫秒数是:365 * 24 * 60 * 60 * 1000。通过计算可知:

1
Math.pow(2, 41) / (365 * 24 * 60 * 60 * 1000L);

结果约等于 69.73 年。ShardingSphere 的雪花算法的时间纪元从 2016 年 11 月 1 日零点开始,可以使用到 2086 年,相信能满足绝大部分系统的要求。

  • 工作进程位(10bit)

该标志在 Java 进程内是唯一的,如果是分布式应用部署应保证每个工作进程的 id 是不同的。该值默认为 0,可通过属性设置。

  • 序列号位(12bit)

该序列是用来在同一个毫秒内生成不同的 ID。如果在这个毫秒内生成的数量超过 4096(2 的 12 次幂),那么生成器会等待到下个毫秒继续生成。

雪花算法主键的详细结构见下图:

雪花算法

4.1.2. 时钟回拨

服务器时钟回拨会导致产生重复序列,因此默认分布式主键生成器提供了一个最大容忍的时钟回拨毫秒数。 如果时钟回拨的时间超过最大容忍的毫秒数阈值,则程序报错;如果在可容忍的范围内,默认分布式主键生成器会等待时钟同步到最后一次主键生成的时间后再继续工作。 最大容忍的时钟回拨毫秒数的默认值为 0,可通过属性设置。

4.1.3. 灵活定制

上面只是一个将 64bit 划分的标准,当然也不一定这么做,可以根据不同业务的具体场景来划分,比如下面给出一个业务场景:

  • 服务目前 QPS10 万,预计几年之内会发展到百万。
  • 当前机器三地部署,上海,北京,深圳都有。
  • 当前机器 10 台左右,预计未来会增加至百台。

这个时候我们根据上面的场景可以再次合理的划分 62bit,QPS 几年之内会发展到百万,那么每毫秒就是千级的请求,目前 10 台机器那么每台机器承担百级的请求,为了保证扩展,后面的循环位可以限制到 1024,也就是 2^10,那么循环位 10 位就足够了。

机器三地部署我们可以用 3bit 总共 8 来表示机房位置,当前的机器 10 台,为了保证扩展到百台那么可以用 7bit 128 来表示,时间位依然是 41bit,那么还剩下 64-10-3-7-41-1 = 2bit,还剩下 2bit 可以用来进行扩展。

img

4.2. 优点

  • 生成的 ID 都是趋势递增的。
  • 不依赖数据库等第三方系统,以服务的方式部署,稳定性更高,生成 ID 的性能也是非常高的。
  • 可以根据自身业务特性分配 bit 位,非常灵活。

4.3. 缺点

  • 强依赖机器时钟,如果机器上时钟回拨,会导致发号重复或者服务会处于不可用状态。

4.4. 适用场景

当我们需要无序不能被猜测的 ID,并且需要一定高性能,且需要 long 型,那么就可以使用我们雪花算法。比如常见的订单 ID,用雪花算法别人就无法猜测你每天的订单量是多少。

4.5. 防止时钟回拨

雪花算法是强依赖于时间的,而如果机器时间发生回拨,有可能会生成重复的 ID。

我们可以针对算法做一些优化,来防止时钟回拨生成重复 ID。

用当前时间和上一次的时间进行判断,如果当前时间小于上一次的时间那么肯定是发生了回拨。普通的算法会直接抛出异常,这里我们可以对其进行优化,一般分为两个情况:

  • 如果时间回拨时间较短,比如配置 5ms 以内,那么可以直接等待一定的时间,让机器的时间追上来。
  • 如果时间的回拨时间较长,我们不能接受这么长的阻塞等待,那么又有两个策略:
    • 直接拒绝,抛出异常。打日志,通知 RD 时钟回滚。
    • 利用扩展位。上面我们讨论过,不同业务场景位数可能用不到那么多比特位,那么我们可以把扩展位数利用起来。比如:当这个时间回拨比较长的时候,我们可以不需要等待,直接在扩展位加 1。两位的扩展位允许我们有三次大的时钟回拨,一般来说就够了,如果其超过三次我们还是选择抛出异常,打日志。

5. Leaf

美团提供了一种分布式 ID 解决方案 Leaf,其本质可以视为数据库分段+服务缓存 ID。

详情可以参考 Leaf——美团点评分布式 ID 生成系统

5.1. 基本原理

使用数据库生成 ID,但是做了如下改进:

原方案每次获取 ID 都得读写一次数据库,造成数据库压力大。改为利用 proxy server 批量获取,每次获取一个 segment(step 决定大小)号段的值。用完之后再去数据库获取新的号段,可以大大的减轻数据库的压力。 - 各个业务不同的发号需求用 biz_tag 字段来区分,每个 biz-tag 的 ID 获取相互隔离,互不影响。如果以后有性能需求需要对数据库扩容,不需要上述描述的复杂的扩容操作,只需要对 biz_tag 分库分表就行。

数据库表设计如下:

1
2
3
4
5
6
7
8
9
+-------------+--------------+------+-----+-------------------+-----------------------------+
| Field | Type | Null | Key | Default | Extra |
+-------------+--------------+------+-----+-------------------+-----------------------------+
| biz_tag | varchar(128) | NO | PRI | | |
| max_id | bigint(20) | NO | | 1 | |
| step | int(11) | NO | | NULL | |
| desc | varchar(256) | YES | | NULL | |
| update_time | timestamp | NO | | CURRENT_TIMESTAMP | on update CURRENT_TIMESTAMP |
+-------------+--------------+------+-----+-------------------+-----------------------------+

重要字段说明:

  • biz_tag 用来区分业务
  • max_id 表示该 biz_tag 目前所被分配的 ID 号段的最大值
  • step 表示每次分配的号段长度。原来获取 ID 每次都需要写数据库,现在只需要把 step 设置得足够大,比如 1000。那么只有当 1000 个号被消耗完了之后才会去重新读写一次数据库。读写数据库的频率从 1 减小到了 1/step。

大致架构如下图所示:

image

test_tag 在第一台 Leaf 机器上是 1~1000 的号段,当这个号段用完时,会去加载另一个长度为 step=1000 的号段,假设另外两台号段都没有更新,这个时候第一台机器新加载的号段就应该是 3001~4000。同时数据库对应的 biz_tag 这条数据的 max_id 会从 3000 被更新成 4000,更新号段的 SQL 语句如下:

1
2
3
4
Begin
UPDATE table SET max_id=max_id+step WHERE biz_tag=xxx
SELECT tag, max_id, step FROM table WHERE biz_tag=xxx
Commit

5.2. 优点

  • 比数据库自增键性能高
  • 能保证键趋势递增。
  • 如果数据库宕机,由于 proxServer 有缓存,依然可以坚持一段时间。

5.3. 缺点

  • 和主键递增一样,容易被人猜测。
  • 数据库宕机后,虽然能支撑一段时间,但是仍然会造成系统不可用。

5.4. 适用场景

需要趋势递增,并且 ID 大小可控制的,可以使用这套方案。

当然这个方案也可以通过一些手段避免被人猜测,把 ID 变成是无序的,比如把我们生成的数据是一个递增的 long 型,把这个 Long 分成几个部分,比如可以分成几组三位数,几组四位数,然后在建立一个映射表,将我们的数据变成无序。

6. 参考资料

oh-my-zsh 应用

Zsh 简介

Zsh 是什么

使用 Linux 的人都知道:*Shell 是一个用 C 语言编写的程序,它是用户使用 Linux 的桥梁。_Shell 既是一种命令语言,又是一种程序设计语言**。

Shell 的类型有很多种,linux 下默认的是 bash,虽然 bash 的功能已经很强大,但对于以懒惰为美德的程序员来说,bash 的提示功能不够强大,界面也不够炫,并非理想工具。

Zsh 也是一种 Shell(据传说 99% 的 Bash 操作 和 Zsh 是相同的),它的功能极其强大,只是配置过于复杂,起初只有极客才在用。后来,出现了一个名叫 oh-my-zsh 的开源项目,使用 zsh 就变得十分简易了。

Zsh 安装

环境要求

  • CentOS 6.7 64 bit
  • root 用户

安装 zsh

  • 先看下你的 CentOS 支持哪些 shell:cat /etc/shells,正常结果应该是这样的:
1
2
3
4
5
6
/bin/sh
/bin/bash
/sbin/nologin
/bin/dash
/bin/tcsh
/bin/csh

如果已经有 zsh ,那么我们就不必安装了。

  • CentOS 安装:sudo yum install -y zsh
  • Ubuntu 安装:sudo apt-get install -y zsh
  • 检查系统的 shell:cat /etc/shells,你会发现多了一个:/bin/zsh

安装 oh-my-zsh

使用 Zsh,怎么能离开灵魂伴侣 oh-my-zsh

1
2
# 安装 oh-my-zsh
wget https://github.com/robbyrussell/oh-my-zsh/raw/master/tools/install.sh -O - | sh

配置 oh-my-zsh

插件

oh-my-zsh 插件太多,不一一列举,请参考:oh-my-zsh 插件列表

  • 启用 oh-my-zsh 中自带的插件。
  • 查看 oh-my-zsh 插件数:ls -l /root/.oh-my-zsh/plugins |grep "^d"|wc -l
  • 编辑配置文件:vim /root/.zshrc
  • 插件推荐:
    • zsh-autosuggestions
      • 这个插件会对历史命令一些补全,类似 fish 终端
      • 安装,复制该命令:git clone https://github.com/zsh-users/zsh-autosuggestions ${ZSH_CUSTOM:-\~/.oh-my-zsh/custom}/plugins/zsh-autosuggestions - 编辑:vim \~/.zshrc,找到这一行,后括号里面的后面添加:plugins=( 前面的一些插件名称,换行,加上:zsh-autosuggestions) - 刷新下配置:source \~/.zshrc
    • extract
      • 功能强大的解压插件,所有类型的文件解压一个命令 x 全搞定,再也不需要去记 tar 后面到底是哪几个参数了。
    • z
      • 强大的目录自动跳转命令,会记忆你曾经进入过的目录,用模糊匹配快速进入你想要的目录。
    • zsh-syntax-highlighting
      • 这个插件会对终端命令高亮显示,比如正确的拼写会是绿色标识,否则是红色,另外对于一些 shell 输出语句也会有高亮显示,算是不错的辅助插件
      • 安装,复制该命令:git clone https://github.com/zsh-users/zsh-syntax-highlighting.git ${ZSH_CUSTOM:-\~/.oh-my-zsh/custom}/plugins/zsh-syntax-highlighting
      • 编辑:vim \~/.zshrc,找到这一行,后括号里面的后面添加:plugins=( 前面的一些插件名称,换行,加上:zsh-syntax-highlighting) - 刷新下配置:source \~/.zshrc
    • wd
      • 简单地讲就是给指定目录映射一个全局的名字,以后方便直接跳转到这个目录,比如:
      • 编辑配置文件,添加上 wd 的名字:vim /root/.zshrc
      • 我常去目录:**/opt/setups**,每次进入该目录下都需要这样:cd /opt/setups
      • 现在用 wd 给他映射一个快捷方式:cd /opt/setups ; wd add setups
      • 以后我在任何目录下只要运行:wd setups 就自动跑到 /opt/setups 目录下了
    • autojump
      • 这个插件会记录你常去的那些目录,然后做一下权重记录,你可以用这个命令看到你的习惯:j --stat,如果这个里面有你的记录,那你就只要敲最后一个文件夹名字即可进入,比如我个人习惯的 program:j program,就可以直接到:/usr/program
      • 插件下载:wget https://github.com/downloads/wting/autojump/autojump_v21.1.2.tar.gz
      • 解压:tar zxvf autojump_v21.1.2.tar.gz
      • 进入解压后目录并安装:cd autojump_v21.1.2/ ; ./install.sh
      • 再执行下这个:source /etc/profile.d/autojump.sh
      • 编辑配置文件,添加上 autojump 的名字:vim /root/.zshrc

主题

oh-my-zsh 主题太多,不一一列举,请参考:oh-my-zsh 主题列表

  • 查看 oh-my-zsh 主题数:ls -l /root/.oh-my-zsh/themes |grep "^-"|wc -l
  • 个人比较推荐的是(排名有先后):
    • ys
    • agnoster
    • avit
    • blinks
  • 编辑配置文件:vim /root/.zshrc
  • 配置好新主题需要重新连接 shell 才能看到效果

zsh 效果如下:

img

快捷键

  • 呃,这个其实可以不用讲的,你自己用的时候你自己会发现的,各种便捷,特别是用 Tab 多的人一定会有各种惊喜的。
  • 使用 ctrl-r 来搜索命令历史记录。按完此快捷键后,可以输入关键命令词语,如果历史记录有含有此词语会显示出来。
  • 命令别名: - 在命令行中输入 alias 可以查看已经有的命令别名 - 自己新增一些别名,编辑文件:vim \~/.zshrc,在文件加入下面格式的命令,比如以下是网友提供的一些思路:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
alias cls='clear'
alias ll='ls -l'
alias la='ls -a'
alias grep="grep --color=auto"
alias -s html='vim' # 在命令行直接输入后缀为 html 的文件名,会在 Vim 中打开
alias -s rb='vim' # 在命令行直接输入 ruby 文件,会在 Vim 中打开
alias -s py='vim' # 在命令行直接输入 python 文件,会用 vim 中打开,以下类似
alias -s js='vim'
alias -s c='vim'
alias -s java='vim'
alias -s txt='vim'
alias -s gz='tar -xzvf' # 在命令行直接输入后缀为 gz 的文件名,会自动解压打开
alias -s tgz='tar -xzvf'
alias -s zip='unzip'
alias -s bz2='tar -xjvf'

参考资料

消息队列基本原理

消息队列(Message Queue,简称 MQ)技术是应用间交换信息的一种技术。

消息队列主要解决异步处理、应用间耦合,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。是大型分布式系统不可缺少的中间件。

目前主流的 MQ 有:Kafka、RabbitMQ、RocketMQ、ActiveMQ,而部分数据库如 Redis、MySQL 以及 phxsql 也可实现消息队列的功能。

注意:_为了简便,下文中除了文章标题,一律使用 MQ 简称_。

MQ 的简介

什么是 MQ

消息队列(Message Queue,简称 MQ)技术是应用间交换信息的一种技术。

消息队列主要解决应用耦合,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。是大型分布式系统不可缺少的中间件。

MQ 是消费-生产者模型的一个典型的代表,一端往消息队列中不断写入消息,而另一端则可以读取队列中的消息。消息发布者只管把消息发布到 MQ 中而不用管谁来取,消息使用者只管从 MQ 中取消息而不管是谁发布的。这样发布者和使用者都不用知道对方的存在。

MQ 的数据可驻留在内存或磁盘上,直到它们被应用程序读取。通过 MQ,应用程序可独立地执行,它们不需要知道彼此的位置,不需要等待接收程序接收此消息。在分布式计算环境中,为了集成分布式应用,开发者需要对异构网络环境下的分布式应用提供有效的通信手段。为了管理需要共享的信息,对应用提供公共的信息交换机制是重要的。

目前主流的 MQ 有:Kafka、RabbitMQ、RocketMQ、ActiveMQ。

MQ 通信模型

MQ 通信模型大致有以下类型:

  • 点对点 - 点对点方式是最为传统和常见的通讯方式,它支持一对一、一对多、多对多、多对一等多种配置方式,支持树状、网状等多种拓扑结构。
  • 多点广播 - MQ 适用于不同类型的应用。其中重要的,也是正在发展中的是”多点广播”应用,即能够将消息发送到多个目标站点 (Destination List)。可以使用一条 MQ 指令将单一消息发送到多个目标站点,并确保为每一站点可靠地提供信息。MQ 不仅提供了多点广播的功能,而且还拥有智能消息分发功能,在将一条消息发送到同一系统上的多个用户时,MQ 将消息的一个复制版本和该系统上接收者的名单发送到目标 MQ 系统。目标 MQ 系统在本地复制这些消息,并将它们发送到名单上的队列,从而尽可能减少网络的传输量。
  • 发布/订阅 (Publish/Subscribe) - 发布/订阅模式使消息的分发可以突破目的队列地理位置的限制,使消息按照特定的主题甚至内容进行分发,用户或应用程序可以根据主题或内容接收到所需要的消息。发布/订阅模式使得发送者和接收者之间的耦合关系变得更为松散,发送者不必关心接收者的目的地址,而接收者也不必关心消息的发送地址,而只是根据消息的主题进行消息的收发。
  • 集群 (Cluster) - 为了简化点对点通讯模式中的系统配置,MQ 提供 Cluster(集群) 的解决方案。集群类似于一个域 (Domain),集群内部的队列管理器之间通讯时,不需要两两之间建立消息通道,而是采用集群 (Cluster) 通道与其它成员通讯,从而大大简化了系统配置。此外,集群中的队列管理器之间能够自动进行负载均衡,当某一队列管理器出现故障时,其它队列管理器可以接管它的工作,从而大大提高系统的高可靠性。

MQ 的应用

异步处理

MQ 可以将系统间的处理流程异步化,减少等待响应的时间,从而提高整体并发吞吐量。

一般,MQ 异步处理应用于非核心流程,例如:短信/邮件通知、数据推送、上报数据到监控中心/日志中心等。

假设这样一个场景,用户向系统 A 发起请求,系统 A 处理计算只需要 10 ms,然后通知系统 BCD 写库,系统 BCD 写库耗时分别为:100ms、200ms、300ms。最终总耗时为: 10+100ms+200ms+300ms=610ms。此外,加上请求和响应的网络传输时间,从用户角度看,可能要等待将近 1s 才能得到结果。

img

如果使用 MQ,系统 A 接到请求后,耗时 10ms 处理计算,然后向系统 BCD 连续发送消息,假设耗时 5ms。那么 这一过程的总耗时为 3ms + 5ms = 8ms,这相比于 610 ms,大大缩短了响应时间。至于系统 BCD 的写库操作,只要自行消费 MQ 后处理即可,用户无需关注。

img

系统解耦

通过 MQ,可以消除系统间的强耦合。它的好处在于:

  • 消息的消费者系统可以随意增加,无需修改生产者系统的代码。
  • 生产者系统、消费者系统彼此不会影响对方的流程。
    • 如果生产者系统宕机,消费者系统收不到消息,就不会有下一步的动作。
    • 如果消费者系统宕机,生产者系统让然可以正常发送消息,不影响流程。

不同系统如果要建立通信,传统的做法是:调用接口。

如果需要和新的系统建立通信或删除已建立的通信,都需要修改代码,这种方案显然耦合度很高。

img

如果使用 MQ,系统间的通信只需要通过发布/订阅(Pub/Sub)模型即可,彼此没有直接联系,也就不需要相互感知,从而达到 解耦

img

流量削峰

上下游系统 处理能力存在差距的时候,利用 MQ 做一个 “漏斗” 模型,进行 流控。把 MQ 当成可靠的 消息暂存地,进行一定程度的 消息堆积;在下游有能力处理的时候,再发送消息。

MQ 的流量削峰常用于高并发场景(例如:秒杀、团抢等业务场景),它是缓解瞬时暴增流量的核心手段之一。

如果没有 MQ,两个系统之间通过 协商滑动窗口限流/降级/熔断 等复杂的方案也能实现 流控。但 系统复杂性 指数级增长,势必在上游或者下游做存储,并且要处理 定时拥塞 等一系列问题。而且每当有 处理能力有差距 的时候,都需要 单独 开发一套逻辑来维护这套逻辑。

假设某个系统读写数据库的稳定性能为每秒处理 1000 条数据。平常情况下,远远达不到这么大的处理量。假设,因为因为做活动,系统的瞬时请求量剧增,达到每秒 10000 个并发请求,数据库根本承受不了,可能直接就把数据库给整崩溃了,这样系统服务就不可用了。

img

如果使用 MQ,每秒写入 10000 条请求,但是系统 A 每秒只从 MQ 中消费 1000 条请求,然后写入数据库。这样,就不会超过数据库的承受能力,而是把请求积压在 MQ 中。只要高峰期一过,系统 A 就会很快把积压的消息给处理掉。

img

传输缓冲

(1)MQ 常被用于做海量数据的传输缓冲。

例如,Kafka 常被用于做为各种日志数据、采集数据的数据中转。然后,Kafka 将数据转发给 Logstash、Elasticsearch 中,然后基于 Elasticsearch 来做日志中心,提供检索、聚合、分析日志的能力。开发者可以通过 Kibana 集成 Elasticsearch 数据进行可视化展示,或自行进行定制化开发。

img

(2)MQ 也可以被用于流式处理。

例如,Kafka 几乎已经是流计算的数据采集端的标准组件。而流计算通过实时数据处理能力,提供了更为快捷的聚合计算能力,被大量应用于链路监控、实时监控、实时数仓、实时大屏、风控、推荐等应用领域。

最终一致性

最终一致性 不是 消息队列 的必备特性,但确实可以依靠 消息队列 来做 最终一致性 的事情。

  • 先写消息再操作,确保操作完成后再修改消息状态。定时任务补偿机制 实现消息 可靠发送接收、业务操作的可靠执行,要注意 消息重复幂等设计
  • 所有不保证 100% 不丢消息 的消息队列,理论上无法实现 最终一致性

Kafka 一类的设计,在设计层面上就有 丢消息 的可能(比如 定时刷盘,如果掉电就会丢消息)。哪怕只丢千分之一的消息,业务也必须用其他的手段来保证结果正确。

系统间通信

消息队列一般都内置了 高效的通信机制,因此也可以用于单纯的 消息通讯,比如实现 点对点消息队列 或者 聊天室 等。

生产者/消费者 模式,只需要关心消息是否 送达队列,至于谁希望订阅和需要消费,是 下游 的事情,无疑极大地减少了开发和联调的工作量。

MQ 的问题

任何技术都会有利有弊,MQ 给整体系统架构带来很多好处,但也会付出一定的代价。

MQ 主要引入了以下问题:

  • 系统可用性降低:引入了 MQ 后,通信需要基于 MQ 完成,如果 MQ 宕机,则服务不可用。因此,MQ 要保证是高可用的,详情参考:MQ 的高可用
  • 系统复杂度提高:使用 MQ,需要关注一些新的问题:
    • 如何保证消息没有 重复消费
    • 如何处理 消息丢失 的问题?
    • 如何保证传递 消息的顺序性
    • 如何处理大量 消息积压 的问题?
  • 一致性问题:假设系统 A 处理完直接返回成功的结果给用户,用户认为请求成功。但如果此时,系统 BCD 中只要有任意一个写库失败,那么数据就不一致了。这种情况如何处理?

下面,我们针对以上问题来一一分析。

重复消费

如何保证消息不被重复消费如何保证消息消费的幂等性 是同一个问题。

必须先明确产生重复消费的原因,才能对症下药。

重复消费问题原因

重复消费问题通常不是 MQ 来处理,而是由开发来处理的。

以 Kafka 举例,Kafka 每个 Partition 都是一个有序的、不可变的记录序列,不断追加到结构化的提交日志中。Partition 中为每条记录分配一个连续的 id 号,称为偏移量(Offset),用于唯一标识 Partition 内的记录。

Kafka 的客户端和 Broker 都会保存 Offset。客户端消费消息后,每隔一段时间,就把已消费的 Offset 提交给 Kafka Broker,表示已消费。

img

在这个过程中,如果客户端应用消费消息后,因为宕机、重启等情况而没有提交已消费的 Offset 。当系统恢复后,会继续消费消息,由于 Offset 未提交,就会出现重复消费的问题。

重复消费解决方案

应对重复消费问题,需要在业务层面,通过 幂等性设计 来解决。

MQ 重复消费不可怕,可怕的是没有应对机制,可以借鉴的思路有:

  • 如果是写关系型数据库,可以先根据主键查询,判断数据是否已存在,存在则更新,不存在则插入;
  • 如果是写 Redis,由于 set 操作天然具有幂等性,所以什么都不用做;
  • 如果是根据消息做较复杂的逻辑处理,可以在消息中加入全局唯一 ID,例如:订单 ID 等。在客户端存储中(Mysql、Redis 等)保存已消费消息的 ID。一旦接受到新消息,先判断消息中的 ID 是否在已消费消息 ID 表中存在,存在则不再处理,不存在则处理。

在实际开发中,可以参考上面的例子,结合现实场景,设计合理的幂等性方案。

消息丢失

如何处理消息丢失的问题如何保证消息不被重复消费 是同一个问题。关注点有:

  • MQ Server 丢失数据
  • 消费方丢失数据
  • 生产方丢失数据

消费方丢失数据

唯一可能导致消费方丢失数据的情况是:消费方设置了自动提交 Offset。一旦设置了自动提交 Offset,接受到消息后就会自动提交 Offset 给 Kafka ,Kafka 就认为消息已被消费。如果此时,消费方尚未来得及处理消息就挂了,那么消息就丢了。

解决方法就是:消费方关闭自动提交 Offset,处理完消息后手动提交 Offset。但这种情况下可能会出现重复消费的情形,需要自行保证幂等性。

Kafka Server 丢失数据

当 Kafka 某个 Broker 宕机,需要重新选举 Partition 的 Leader。若此时其他的 Follower 尚未同步 Leader 的数据,那么新选某个 Follower 为 Leader 后,就丢失了部分数据。

为此,一般要求至少设置 4 个参数:

  • 给 Topic 设置 replication.factor 参数 - 这个值必须大于 1,要求每个 Partition 必须有至少 2 个副本。
  • 在 Kafka 服务端设置 min.insync.replicas 参数 - 这个值必须大于 1,这是要求一个 Leader 需要和至少一个 Follower 保持通信,这样才能确保 Leader 挂了还有替补。
  • 在 Producer 端设置 acks=all - 这意味着:要求每条数据,必须是写入所有 replica 之后,才能认为写入成功了
  • 在 Producer 端设置 retries=MAX(很大很大很大的一个值,无限次重试的意思) - 这意味着要求一旦写入失败,就无限重试,卡在这里了。

生产方丢失数据

如果按照上述的思路设置了 acks=all,生产方一定不会丢数据。

要求是,你的 Leader 接收到消息,所有的 Follower 都同步到了消息之后,才认为本生产消息成功了。如果未满足这个条件,生产者会自动不断的重试,重试无限次。

消息的顺序性

要保证 MQ 的顺序性,势必要付出一定的代价,所以实施方案前,要先明确业务场景是不是有必要保证消息的顺序性。只有那些明确对消息处理顺序有要求的业务场景才值得去保证消息顺序性。

方案一

一个 Topic,一个 Partition,一个 Consumer,内部单线程消费,单线程吞吐量太低,一般不会用这个。

方案二

  • 写入数据到 Partition 时指定一个全局唯一的 ID,例如订单 ID。发送方保证相同 ID 的消息有序的发送到同一个 Partition。
  • 基于上一点,消费方从 Kafka Partition 中消费消息时,此刻一定是顺序的。但如果消费方式以并发方式消费消息,顺序就可能会被打乱。为此,还有做到以下几点:
    • 消费方维护 N 个缓存队列,具有相同 ID 的数据都写入同一个队列中;
    • 创建 N 个线程,每个线程只负责从指定的一个队列中取数据。

img

消息积压

假设一个 MQ 消费者可以一秒处理 1000 条消息,三个 MQ 消费者可以一秒处理 3000 条消息,那么一分钟的处理量是 18 万条。如果 MQ 中积压了几百万到上千万的数据,即使消费者恢复了,也需要大概很长的时间才能恢复过来。

对于产线环境来说,漫长的等待是不可接受的,所以面临这种窘境时,只能临时紧急扩容以应对了,具体操作步骤和思路如下:

  • 先修复 Consumer 的问题,确保其恢复消费速度,然后将现有 Consumer 都停掉。
  • 新建一个 Topic,Partition 是原来的 10 倍,临时建立好原先 10 倍的 Queue 数量。
  • 然后写一个临时的分发数据的 Consumer 程序,这个程序部署上去消费积压的数据,消费之后不做耗时的处理,直接均匀轮询写入临时建立好的 10 倍数量的 Queue。
  • 接着临时征用 10 倍的机器来部署 Consumer ,每一批 Consumer 消费一个临时 Queue 的数据。这种做法相当于是临时将 Queue 资源和 Consumer 资源扩大 10 倍,以正常的 10 倍速度来消费数据。
  • 等快速消费完积压数据之后,得恢复原先部署的架构重新用原先的 consumer 机器来消费消息。

MQ 的高可用

不同 MQ 实现高可用的原理各不相同。因为 Kafka 比较具有代表性,所以这里以 Kafka 为例。

Kafka 的高可用

Kafka 的核心概念

了解 Kafka,必须先了解 Kafka 的核心概念:

  • Broker - Kafka 集群包含一个或多个节点,这种节点被称为 Broker。

  • Topic - 每条发布到 Kafka 集群的消息都有一个类别,这个类别被称为 Topic。(不同 Topic 的消息是物理隔离的;同一个 Topic 的消息保存在一个或多个 Broker 上,但用户只需指定消息的 Topic 即可生产或消费数据而不必关心数据存于何处)。对于每一个 Topic, Kafka 集群都会维持一个分区日志。

  • Partition - 了提高 Kafka 的吞吐率,每个 Topic 包含一个或多个 Partition,每个 Partition 在物理上对应一个文件夹,该文件夹下存储这个 Partition 的所有消息和索引文件。

    • Kafka 日志的分区(Partition)分布在 Kafka 集群的节点上。每个节点在处理数据和请求时,共享这些分区。每一个分区都会在已配置的节点上进行备份,确保容错性。

img

Kafka 的副本机制

Kafka 是如何实现高可用的呢?

Kafka 在 0.8 以前的版本中,如果一个 Broker 宕机了,其上面的 Partition 都不能用了,这自然不是高可用的。

为了实现高可用,Kafka 引入了复制功能。

简单来说,就是副本机制( Replicate )。

每个 Partition 都有一个 Leader,零个或多个 Follower。Leader 和 Follower 都是 Broker,每个 Broker 都会成为某些分区的 Leader 和某些分区的 Follower,因此集群的负载是平衡的。

  • Leader 处理一切对 Partition (分区)的读写请求
  • 而 Follower 只需被动的同步 Leader 上的数据

同一个 Topic 的不同 Partition 会分布在多个 Broker 上,而且一个 Partition 还会在其他的 Broker 上面进行备份,Producer 在发布消息到某个 Partition 时,先找到该 Partition 的 Leader,然后向这个 Leader 推送消息;每个 Follower 都从 Leader 拉取消息,拉取消息成功之后,向 Leader 发送一个 ACK 确认。

img

FAQ

问:为什么让 Leader 处理一切对对 Partition (分区)的读写请求?

答:因为如果允许所有 Broker 都可以处理读写请求,就可能产生数据一致性问题。

Kafka 选举 Leader

由上文可知,Partition 在多个 Broker 上存在副本。

如果某个 Follower 宕机,啥事儿没有,正常工作。

如果 Leader 宕机了,会从 Follower 中重新选举一个新的 Leader。

主流 MQ

ActiveMQ

ActiveMQ 是由 Apache 出品,ActiveMQ 是一个完全支持JMS1.1J2EE 1.4 规范的 JMS Provider 实现。它非常快速,支持 多种语言的客户端协议,而且可以非常容易的嵌入到企业的应用环境中,并有许多高级功能。

img

(a) 主要特性

  1. 服从 JMS 规范JMS 规范提供了良好的标准和保证,包括:同步异步 的消息分发,一次和仅一次的消息分发,消息接收订阅 等等。遵从 JMS 规范的好处在于,不论使用什么 JMS 实现提供者,这些基础特性都是可用的;
  2. 连接灵活性ActiveMQ 提供了广泛的 连接协议,支持的协议有:HTTP/SIP 多播SSLTCPUDP 等等。对众多协议的支持让 ActiveMQ 拥有了很好的灵活性;
  3. 支持的协议种类多OpenWireSTOMPRESTXMPPAMQP
  4. 持久化插件和安全插件ActiveMQ 提供了 多种持久化 选择。而且,ActiveMQ 的安全性也可以完全依据用户需求进行 自定义鉴权授权
  5. 支持的客户端语言种类多:除了 Java 之外,还有:C/C++.NETPerlPHPPythonRuby
  6. 代理集群:多个 ActiveMQ 代理 可以组成一个 集群 来提供服务;
  7. 异常简单的管理ActiveMQ 是以开发者思维被设计的。所以,它并不需要专门的管理员,因为它提供了简单又使用的管理特性。有很多中方法可以 监控 ActiveMQ 不同层面的数据,包括使用在 JConsole 或者在 ActiveMQWeb Console 中使用 JMX。通过处理 JMX 的告警消息,通过使用 命令行脚本,甚至可以通过监控各种类型的 日志

(b) 部署环境

ActiveMQ 可以运行在 Java 语言所支持的平台之上。使用 ActiveMQ 需要:

  • Java JDK
  • ActiveMQ 安装包

(c) 优点

  1. 跨平台 (JAVA 编写与平台无关,ActiveMQ 几乎可以运行在任何的 JVM 上);
  2. 可以用 JDBC:可以将 数据持久化 到数据库。虽然使用 JDBC 会降低 ActiveMQ 的性能,但是数据库一直都是开发人员最熟悉的存储介质;
  3. 支持 JMS 规范:支持 JMS 规范提供的 统一接口;
  4. 支持 自动重连错误重试机制
  5. 有安全机制:支持基于 shirojaas 等多种 安全配置机制,可以对 Queue/Topic 进行 认证和授权
  6. 监控完善:拥有完善的 监控,包括 Web ConsoleJMXShell 命令行,JolokiaRESTful API
  7. 界面友善:提供的 Web Console 可以满足大部分情况,还有很多 第三方的组件 可以使用,比如 hawtio

(d) 缺点

  1. 社区活跃度不及 RabbitMQ 高;
  2. 根据其他用户反馈,会出莫名其妙的问题,会 丢失消息
  3. 目前重心放到 activemq 6.0 产品 Apollo,对 5.x 的维护较少;
  4. 不适合用于 上千个队列 的应用场景;

RabbitMQ

RabbitMQ2007 年发布,是一个在 AMQP (高级消息队列协议)基础上完成的,可复用的企业消息系统,是当前最主流的消息中间件之一。

img

(a) 主要特性

  1. 可靠性:提供了多种技术可以让你在 性能可靠性 之间进行 权衡。这些技术包括 持久性机制投递确认发布者证实高可用性机制
  2. 灵活的路由:消息在到达队列前是通过 交换机 进行 路由 的。RabbitMQ 为典型的路由逻辑提供了 多种内置交换机 类型。如果你有更复杂的路由需求,可以将这些交换机组合起来使用,你甚至可以实现自己的交换机类型,并且当做 RabbitMQ插件 来使用;
  3. 消息集群:在相同局域网中的多个 RabbitMQ 服务器可以 聚合 在一起,作为一个独立的逻辑代理来使用;
  4. 队列高可用:队列可以在集群中的机器上 进行镜像,以确保在硬件问题下还保证 消息安全
  5. 支持多种协议:支持 多种消息队列协议
  6. 支持多种语言:用 Erlang 语言编写,支持只要是你能想到的 所有编程语言
  7. 管理界面RabbitMQ 有一个易用的 用户界面,使得用户可以 监控管理 消息 Broker 的许多方面;
  8. 跟踪机制:如果 消息异常RabbitMQ 提供消息跟踪机制,使用者可以找出发生了什么;
  9. 插件机制:提供了许多 插件,来从多方面进行扩展,也可以编写自己的插件。

(b) 部署环境

RabbitMQ 可以运行在 Erlang 语言所支持的平台之上,包括 SolarisBSDLinuxMacOSXTRU64Windows 等。使用 RabbitMQ 需要:

  • ErLang 语言包
  • RabbitMQ 安装包

(c) 优点

  1. 由于 Erlang 语言的特性,消息队列性能较好,支持 高并发
  2. 健壮、稳定、易用、跨平台、支持 多种语言、文档齐全;
  3. 有消息 确认机制持久化机制,可靠性高;
  4. 高度可定制的 路由
  5. 管理界面 较丰富,在互联网公司也有较大规模的应用,社区活跃度高。

(d) 缺点

  1. 尽管结合 Erlang 语言本身的并发优势,性能较好,但是不利于做 二次开发和维护
  2. 实现了 代理架构,意味着消息在发送到客户端之前可以在 中央节点 上排队。此特性使得 RabbitMQ 易于使用和部署,但是使得其 运行速度较慢,因为中央节点 增加了延迟消息封装后 也比较大;
  3. 需要学习 比较复杂接口和协议,学习和维护成本较高。

RocketMQ

RocketMQ 出自 阿里 的开源产品,用 Java 语言实现,在设计时参考了 Kafka,并做出了自己的一些改进,消息可靠性上Kafka 更好。RocketMQ 在阿里内部  被广泛应用在 订单交易充值流计算消息推送日志流式处理binglog 分发 等场景。

(a) 主要特性

  1. 基于 队列模型:具有 高性能高可靠高实时分布式 等特点;
  2. ProducerConsumer队列 都支持 分布式
  3. Producer 向一些队列轮流发送消息,队列集合 称为 TopicConsumer 如果做 广播消费,则一个 Consumer 实例消费这个 Topic 对应的 所有队列;如果做 集群消费,则 多个 Consumer 实例 平均消费 这个 Topic 对应的队列集合;
  4. 能够保证 严格的消息顺序
  5. 提供丰富的 消息拉取模式
  6. 高效的订阅者 水平扩展能力;
  7. 实时消息订阅机制
  8. 亿级 消息堆积 能力;
  9. 较少的外部依赖。

(b) 部署环境

RocketMQ 可以运行在 Java 语言所支持的平台之上。使用 RocketMQ 需要:

  • Java JDK
  • 安装 gitMaven
  • RocketMQ 安装包

(c) 优点

  1. 单机 支持 1 万以上 持久化队列
  2. RocketMQ 的所有消息都是 持久化的,先写入系统 PAGECACHE,然后 刷盘,可以保证 内存磁盘 都有一份数据,而 访问 时,直接 从内存读取
  3. 模型简单,接口易用(JMS 的接口很多场合并不太实用);
  4. 性能非常好,可以允许 大量堆积消息Broker 中;
  5. 支持 多种消费模式,包括 集群消费广播消费等;
  6. 各个环节 分布式扩展设计,支持 主从高可用
  7. 开发度较活跃,版本更新很快。

(d) 缺点

  1. 支持的 客户端语言 不多,目前是 JavaC++,其中 C++ 还不成熟;
  2. RocketMQ 社区关注度及成熟度也不及前两者;
  3. 没有 Web 管理界面,提供了一个 CLI (命令行界面) 管理工具带来 查询管理诊断各种问题
  4. 没有在 MQ 核心里实现 JMS 等接口;

Kafka

Apache Kafka 是一个 分布式消息发布订阅 系统。它最初由 LinkedIn 公司基于独特的设计实现为一个 分布式的日志提交系统 (a distributed commit log),之后成为 Apache 项目的一部分。Kafka 性能高效可扩展良好 并且 可持久化。它的 分区特性可复制可容错 都是其不错的特性。

(a) 主要特性

  1. 快速持久化:可以在 O(1) 的系统开销下进行 消息持久化
  2. 高吞吐:在一台普通的服务器上既可以达到 10W/s吞吐速率
  3. 完全的分布式系统BrokerProducerConsumer 都原生自动支持 分布式,自动实现 负载均衡
  4. 支持 同步异步 复制两种 高可用机制
  5. 支持 数据批量发送拉取
  6. **零拷贝技术(zero-copy)**:减少 IO 操作步骤,提高 系统吞吐量
  7. 数据迁移扩容 对用户透明;
  8. 无需停机 即可扩展机器;
  9. 其他特性:丰富的 消息拉取模型、高效 订阅者水平扩展、实时的 消息订阅、亿级的 消息堆积能力、定期删除机制;

(b) 部署环境

使用 Kafka 需要:

  • Java JDK
  • Kafka 安装包

(c) 优点

  1. 客户端语言丰富:支持 Java.NetPHPRubyPythonGo 等多种语言;
  2. 高性能:单机写入 TPS 约在 100 万条/秒,消息大小 10 个字节;
  3. 提供 完全分布式架构,并有 replica 机制,拥有较高的 可用性可靠性,理论上支持 消息无限堆积
  4. 支持批量操作;
  5. 消费者 采用 Pull 方式获取消息。消息有序通过控制 能够保证所有消息被消费且仅被消费 一次
  6. 有优秀的第三方 Kafka Web 管理界面 Kafka-Manager
  7. 日志领域 比较成熟,被多家公司和多个开源项目使用。

(d) 缺点

  1. Kafka 单机超过 64队列/分区 时,Load 时会发生明显的飙高现象。队列 越多,负载 越高,发送消息 响应时间变长
  2. 使用 短轮询方式实时性 取决于 轮询间隔时间
  3. 消费失败 不支持重试
  4. 支持 消息顺序,但是 一台代理宕机 后,就会产生 消息乱序
  5. 社区更新较慢。

MQ 的技术选型

MQ 的技术选型一般要考虑以下几点:

  • 是否开源:这决定了能否商用,所以最为重要。
  • 社区活跃度越高越好:高社区活跃度,一般保证了低 Bug 率,因为大部分 Bug,已经有人遇到并解决了。
  • 技术生态适配性:客户端对各种编程语言的支持。比如:如果使用 MQ 的都是 Java 应用,那么 ActiveMQ、RabbitMQ、RocketMQ、Kafka 都可以。如果需要支持其他语言,那么 RMQ 比较合适,因为它支持的编程语言比较丰富。如果 MQ 是应用于大数据或流式计算,那么 Kafka 几乎是标配。如果是应用于在线业务系统,那么 Kafka 就不合适了,可以考虑 RabbitMQ、 RocketMQ 很合适。
  • 高可用性:应用于线上的准入标准。
  • 性能:具备足够好的性能,能满足绝大多数场景的性能要求。
特性 ActiveMQ RabbitMQ RocketMQ Kafka
单机吞吐量 万级,比 RocketMQ、Kafka 低一个数量级 同 ActiveMQ 10 万级,支撑高吞吐 10 万级,高吞吐,一般配合大数据类的系统来进行流式计算、日志采集等场景
topic 数量对吞吐量的影响 topic 可以达到几百/几千的级别,吞吐量会有较小幅度的下降,这是 RocketMQ 的一大优势,在同等机器下,可以支撑大量的 topic topic 从几十到几百个时候,吞吐量会大幅度下降,在同等机器下,Kafka 尽量保证 topic 数量不要过多,如果要支撑大规模的 topic,需要增加更多的机器资源
时效性 ms 级 微秒级,这是 RabbitMQ 的一大特点,延迟最低 ms 级 延迟在 ms 级以内
可用性 高,基于主从架构实现高可用 同 ActiveMQ 非常高,分布式架构 非常高,分布式,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用
消息可靠性 有较低的概率丢失数据 基本不丢 经过参数优化配置,可以做到 0 丢失 同 RocketMQ
功能支持 MQ 领域的功能极其完备 基于 erlang 开发,并发能力很强,性能极好,延时很低 MQ 功能较为完善,还是分布式的,扩展性好 功能较为简单,主要支持简单的 MQ 功能,在大数据领域的实时计算以及日志采集被大规模使用

综上,各种对比之后,有如下建议:

  • 业务系统场景,建议使用 RocketMQ、RabbitMQ。如果所有应用都是 Java,优选 RocketMQ,因为 RocketMQ 本身就是 Java 开发的,所以最适配。如果业务中有多种编程语言的应用,建议选择 RabbitMQ。
  • 大数据和流式计算领域,或是作为日志缓冲,强烈建议选择 Kafka,业界标准,久经考验。

JMS

提到 MQ,就顺便提一下 JMS 。

JMS(JAVA Message Service,java 消息服务)API 是一个消息服务的标准/规范,允许应用程序组件基于 JavaEE 平台创建、发送、接收和读取消息。它使分布式通信耦合度更低,消息服务更加可靠以及异步性。

在 EJB 架构中,有消息 bean 可以无缝的与 JMS 消息服务集成。在 J2EE 架构模式中,有消息服务者模式,用于实现消息与应用直接的解耦。

消息模型

在 JMS 标准中,有两种消息模型:

  • P2P(Point to Point)
  • Pub/Sub(Publish/Subscribe)

P2P 模式

P2P 模式包含三个角色:MQ(Queue),发送者(Sender),接收者(Receiver)。每个消息都被发送到一个特定的队列,接收者从队列中获取消息。队列保留着消息,直到他们被消费或超时。

P2P 的特点

  • 每个消息只有一个消费者(Consumer)(即一旦被消费,消息就不再在 MQ 中)
  • 发送者和接收者之间在时间上没有依赖性,也就是说当发送者发送了消息之后,不管接收者有没有正在运行,它不会影响到消息被发送到队列
  • 接收者在成功接收消息之后需向队列应答成功

如果希望发送的每个消息都会被成功处理的话,那么需要 P2P 模式。

Pub/sub 模式

包含三个角色主题(Topic),发布者(Publisher),订阅者(Subscriber) 。多个发布者将消息发送到 Topic,系统将这些消息传递给多个订阅者。

Pub/Sub 的特点

  • 每个消息可以有多个消费者
  • 发布者和订阅者之间有时间上的依赖性。针对某个主题(Topic)的订阅者,它必须创建一个订阅者之后,才能消费发布者的消息。
  • 为了消费消息,订阅者必须保持运行的状态。

为了缓和这样严格的时间相关性,JMS 允许订阅者创建一个可持久化的订阅。这样,即使订阅者没有被激活(运行),它也能接收到发布者的消息。

如果希望发送的消息可以不被做任何处理、或者只被一个消息者处理、或者可以被多个消费者处理的话,那么可以采用 Pub/Sub 模型。

消息消费

在 JMS 中,消息的产生和消费都是异步的。对于消费来说,JMS 的消息者可以通过两种方式来消费消息。

  • 同步 - 订阅者或接收者通过 receive 方法来接收消息,receive 方法在接收到消息之前(或超时之前)将一直阻塞;
  • 异步 - 订阅者或接收者可以注册为一个消息监听器。当消息到达之后,系统自动调用监听器的 onMessage 方法。

JNDI - Java 命名和目录接口,是一种标准的 Java 命名系统接口。可以在网络上查找和访问服务。通过指定一个资源名称,该名称对应于数据库或命名服务中的一个记录,同时返回资源连接建立所必须的信息。

JNDI 在 JMS 中起到查找和访问发送目标或消息来源的作用。

JMS 编程模型

ConnectionFactory

创建 Connection 对象的工厂,针对两种不同的 jms 消息模型,分别有 QueueConnectionFactory 和 TopicConnectionFactory 两种。可以通过 JNDI 来查找 ConnectionFactory 对象。

Destination

Destination 的意思是消息生产者的消息发送目标或者说消息消费者的消息来源。对于消息生产者来说,它的 Destination 是某个队列(Queue)或某个主题(Topic);对于消息消费者来说,它的 Destination 也是某个队列或主题(即消息来源)。

所以,Destination 实际上就是两种类型的对象:Queue、Topic。可以通过 JNDI 来查找 Destination。

Connection

Connection 表示在客户端和 JMS 系统之间建立的链接(对 TCP/IP socket 的包装)。Connection 可以产生一个或多个 Session。跟 ConnectionFactory 一样,Connection 也有两种类型:QueueConnection 和 TopicConnection。

Session

Session 是操作消息的接口。可以通过 session 创建生产者、消费者、消息等。Session 提供了事务的功能。当需要使用 session 发送/接收多个消息时,可以将这些发送/接收动作放到一个事务中。同样,也分 QueueSession 和 TopicSession。

消息的生产者

消息生产者由 Session 创建,并用于将消息发送到 Destination。同样,消息生产者分两种类型:QueueSender 和 TopicPublisher。可以调用消息生产者的方法(send 或 publish 方法)发送消息。

消息消费者

消息消费者由 Session 创建,用于接收被发送到 Destination 的消息。两种类型:QueueReceiver 和 TopicSubscriber。可分别通过 session 的 createReceiver(Queue)或 createSubscriber(Topic)来创建。当然,也可以 session 的 creatDurableSubscriber 方法来创建持久化的订阅者。

MessageListener

消息监听器。如果注册了消息监听器,一旦消息到达,将自动调用监听器的 onMessage 方法。EJB 中的 MDB(Message-Driven Bean)就是一种 MessageListener。

参考资料

缓存基本原理

缓存是一种利用空间换时间的设计,其目标就是更快更近

缓存简介

为什么需要缓存

众所周知,当今是一个互联网时代,而互联网应用几乎遍及我们日常生活的方方面面。一般而言,一个互联网应用的请求/响应流程会有以下几个主要流程:

  • 客户端发起请求,请求经过网络 I/O,分发到服务层。
  • 服务层可能有多级服务,请求需要被多个服务层层处理。
  • 不同服务根据请求进行计算时,可能依赖于不同数据库的数据,需要通过网络 I/O 读写数据库。

显然,这一套流程下来,可能需要消耗大量的计算机资源,并且响应时间也可能很久。如果并发请求量很大的话,可能会进一步加剧这种问题。

img

为了解决以上问题,最直接的方式就是引入缓存。缓存可以作用于请求/响应流程的任意环节,并有效减少后续环节的执行次数,从而大大提升性能。

实际上,缓存作为性能优化的第一手段,被广泛应用于计算机的硬件、软件领域。

什么是缓存

缓存就是数据交换的缓冲区,用于将频繁访问的数据暂存缓存的本质是一个内存 Hash

缓存的本质是一种利用空间换时间的设计:牺牲一定的数据实时性,使得访问更快更近

  • 将数据存储到读取速度更快的存储(设备);
  • 将数据存储到离应用最近的位置;
  • 将数据存储到离用户最近的位置。

缓存是用于存储数据的硬件或软件的组成部分,以使得后续更快访问相应的数据。缓存中的数据可能是提前计算好的结果、数据的副本等。典型的应用场景:有 cpu cache, 磁盘 cache 等。本文中提及到缓存主要是指互联网应用中所使用的缓存组件。

缓存命中率是缓存的重要度量指标,命中率越高越好。

1
缓存命中率 = 从缓存中读取次数 / 总读取次数

何时需要缓存

引入缓存,会增加系统的复杂度,并牺牲一定的数据实时性。所以,引入缓存前,需要先权衡是否值得,考量点如下:

  • CPU 开销 - 如果应用某个计算需要消耗大量 CPU,可以考虑缓存其计算结果。典型场景:复杂的、频繁调用的正则计算;分布式计算中间状态等。
  • IO 开销 - 如果数据库连接池比较繁忙,可以考虑缓存其查询结果。

在数据层引入缓存,有以下几个好处:

  • 提升数据读取速度。
  • 提升系统扩展能力,通过扩展缓存,提升系统承载能力。
  • 降低存储成本,Cache+DB 的方式可以承担原有需要多台 DB 才能承担的请求量,节省机器成本。

缓存的基本原理

根据业务场景,通常缓存有以下几种使用方式:

  • 懒汉式(读时触发):先查询 DB 里的数据, 然后把相关的数据写入 Cache。
  • 饥饿式(写时触发):写入 DB 后, 然后把相关的数据也写入 Cache。
  • 定期刷新:适合周期性的跑数据的任务,或者列表型的数据,而且不要求绝对实时性。

缓存淘汰策略

缓存淘汰的类型:

  • 基于空间 - 设置缓存空间大小。
  • 基于容量 - 设置缓存存储记录数。
  • 基于时间
    • TTL(Time To Live,即存活期)缓存数据从创建到过期的时间。
    • TTI(Time To Idle,即空闲期)缓存数据多久没被访问的时间。

缓存淘汰算法:

  • FIFO (first in first out) - 先进先出。在这种淘汰算法中,先进入缓存的会先被淘汰。这种可谓是最简单的了,但是会导致我们命中率很低。试想一下我们如果有个访问频率很高的数据是所有数据第一个访问的,而那些不是很高的是后面再访问的,那这样就会把我们的首个数据但是他的访问频率很高给挤出。
  • LRU (least recently used) - 最近最少使用算法。这种算法避免了 FIFO 命中率不高的问题:每次访问数据都会将其放在我们的队尾,如果需要淘汰数据,就只需要淘汰队首即可。但是这个算法依然有缺点:假设,缓存只保留 1 分钟以内的热点数据。如果有个数据在 1 个小时的前 59 分钟访问了 1 万次(可见这是个热点数据),最后一分钟没有任何访问;但是,其他的数据有被访问,就会导致这个热点数据被淘汰。
  • LFU (less frequently used) - 最近最少频率使用。在这种算法中又对上面进行了优化,利用额外的空间记录每个数据的使用频率,然后选出频率最低进行淘汰。这样就避免了 LRU 不能处理时间段的问题。

这三种缓存淘汰算法,实现复杂度一个比一个高,同样的命中率也是一个比一个好。而我们一般来说选择的方案居中即可,即实现成本不是太高,而命中率也还行的 LRU

缓存的分类

缓存从部署角度,可以分为客户端缓存和服务端缓存。

客户端缓存

  • Http 缓存:HTTP/1.1 中的 Cache-Control、HTTP/1 中的 Expires
  • 浏览器缓存:HTML5 提供的 SessionStorage 和 LocalStorage、Cookie
  • APP 缓存
    • Android
    • IOS

服务端缓存

  • CDN 缓存 - 存放 HTML、CSS、JS 等静态资源。
  • 反向代理缓存 - 动静分离,只缓存用户请求的静态资源。
  • 数据库缓存 - 数据库(如 Mysql)自身一般也有缓存,但因为命中率和更新频率问题,不推荐使用。
  • 进程内缓存 - 缓存应用字典等常用数据。
  • 分布式缓存 - 缓存数据库中的热点数据。

其中,CDN 缓存、反向代理缓存、数据库缓存一般由专职人员维护(运维、DBA)。

后端开发一般聚焦于进程内缓存、分布式缓存。

HTTP 缓存

  1. 浏览器发送请求前,根据请求头的 expires (HTTP/1) 和 cache-control (HTTP/1.1) 判断是否命中(包括是否过期)强缓存策略,

    1. 如果命中,直接从缓存获取资源,并不会发送请求。
    2. 如果没有命中,则进入下一步。
  2. 没有命中强缓存规则,浏览器会发送请求,根据请求头的 last-modifiedetag 判断是否命中协商缓存,如果命中,直接从缓存获取资源。如果没有命中,则进入下一步。

  3. 如果前两步都没有命中,则直接从服务端获取资源。

CDN 缓存

CDN 将数据缓存到离用户物理距离最近的服务器,使得用户可以就近获取请求内容。CDN 一般缓存静态资源文件(页面,脚本,图片,视频,文件等)

国内网络异常复杂,跨运营商的网络访问会很慢。为了解决跨运营商或各地用户访问问题,可以在重要的城市,部署 CDN 应用。使用户就近获取所需内容,降低网络拥塞,提高用户访问响应速度和命中率。

img

CDN 原理

CDN 的基本原理是广泛采用各种缓存服务器,将这些缓存服务器分布到用户访问相对集中的地区或网络中,在用户访问网站时,利用全局负载技术将用户的访问指向距离最近的工作正常的缓存服务器上,由缓存服务器直接响应用户请求。

(1)未部署 CDN 应用前的网络路径:

  • 请求:本机网络(局域网)=> 运营商网络 => 应用服务器机房
  • 响应:应用服务器机房 => 运营商网络 => 本机网络(局域网)

在不考虑复杂网络的情况下,从请求到响应需要经过 3 个节点,6 个步骤完成一次用户访问操作。

(2)部署 CDN 应用后网络路径:

  • 请求:本机网络(局域网) => 运营商网络
  • 响应:运营商网络 => 本机网络(局域网)

在不考虑复杂网络的情况下,从请求到响应需要经过 2 个节点,2 个步骤完成一次用户访问操作。

与不部署 CDN 服务相比,减少了 1 个节点,4 个步骤的访问。极大的提高了系统的响应速度。

CDN 特点

优点

  • 本地 Cache 加速 - 提升访问速度,尤其含有大量图片和静态页面站点;
  • 实现跨运营商的网络加速 - 消除了不同运营商之间互联的瓶颈造成的影响,实现了跨运营商的网络加速,保证不同网络中的用户都能得到良好的访问质量;
  • 远程加速 - 远程访问用户根据 DNS 负载均衡技术智能自动选择 Cache 服务器,选择最快的 Cache 服务器,加快远程访问的速度;
  • 带宽优化 - 自动生成服务器的远程 Mirror(镜像)cache 服务器,远程用户访问时从 cache 服务器上读取数据,减少远程访问的带宽、分担网络流量、减轻原站点 WEB 服务器负载等功能。
  • 集群抗攻击 - 广泛分布的 CDN 节点加上节点之间的智能冗余机制,可以有效地预防黑客入侵以及降低各种 D.D.o.S 攻击对网站的影响,同时保证较好的服务质量。

缺点

  • 不适宜缓存动态资源
    • 解决方案:主要缓存静态资源,动态资源建立多级缓存或准实时同步;
  • 存在数据的一致性问题
    • 解决方案(主要是在性能和数据一致性二者间寻找一个平衡)
    • 设置缓存失效时间(1 个小时,过期后同步数据)。
    • 针对资源设置版本号。

反向代理缓存

反向代理(Reverse Proxy)方式是指以代理服务器来接受 internet 上的连接请求,然后将请求转发给内部网络上的服务器,并将从服务器上得到的结果返回给 internet 上请求连接的客户端,此时代理服务器对外就表现为一个反向代理服务器。

img

反向代理缓存原理

反向代理位于应用服务器同一网络,处理所有对 WEB 服务器的请求。

反向代理缓存的原理:

  • 如果用户请求的页面在代理服务器上有缓存的话,代理服务器直接将缓存内容发送给用户。
  • 如果没有缓存则先向 WEB 服务器发出请求,取回数据,本地缓存后再发送给用户。

这种方式通过降低向 WEB 服务器的请求数,从而降低了 WEB 服务器的负载。

反向代理缓存一般针对的是静态资源,而将动态资源请求转发到应用服务器处理。常用的缓存应用服务器有 Varnish,Ngnix,Squid。

反向代理缓存比较

常用的代理缓存有 Varnish,Squid,Ngnix,简单比较如下:

  • Varnish 和 Squid 是专业的 cache 服务,Ngnix 需要第三方模块支持;
  • Varnish 采用内存型缓存,避免了频繁在内存、磁盘中交换文件,性能比 Squid 高;
  • Varnish 由于是内存 cache,所以对小文件如 css、js、小图片的支持很棒,后端的持久化缓存可以采用的是 Squid 或 ATS;
  • Squid 功能全而大,适合于各种静态的文件缓存,一般会在前端挂一个 HAProxy 或 Ngnix 做负载均衡跑多个实例;
  • Nginx 采用第三方模块 ncache 做的缓冲,性能基本达到 Varnish,一般作为反向代理使用,可以实现简单的缓存。

进程内缓存

进程内缓存是指应用内部的缓存,标准的分布式系统,一般有多级缓存构成。本地缓存是离应用最近的缓存,一般可以将数据缓存到硬盘或内存。

  • 硬盘缓存 - 将数据缓存到硬盘中,读取时从硬盘读取。原理是直接读取本机文件,减少了网络传输消耗,比通过网络读取数据库速度更快。可以应用在对速度要求不是很高,但需要大量缓存存储的场景。
  • 内存缓存 - 直接将数据存储到本机内存中,通过程序直接维护缓存对象,是访问速度最快的方式。

常见的本地缓存实现方案:HashMap、Guava Cache、Caffeine、Ehcache。

ConcurrentHashMap

最简单的进程内缓存可以通过 JDK 自带的 HashMapConcurrentHashMap 实现。

适用场景:不需要淘汰的缓存数据

缺点:无法进行缓存淘汰,内存会无限制的增长。

LRUHashMap

可以通过**继承 LinkedHashMap 来实现一个简单的 LRUHashMap**。重写 removeEldestEntry 方法,即可完成一个简单的最近最少使用算法。

缺点:

  • 锁竞争严重,性能比较低。
  • 不支持过期时间
  • 不支持自动刷新

Guava Cache

解决了 LRUHashMap 中的几个缺点。

Guava Cache 采用了类似 ConcurrentHashMap 的思想,分段加锁,减少锁竞争。

Guava Cache 对于过期的 Entry 并没有马上过期(也就是并没有后台线程一直在扫),而是通过进行读写操作的时候进行过期处理,这样做的好处是避免后台线程扫描的时候进行全局加锁。

直接通过查询,判断其是否满足刷新条件,进行刷新。

Caffeine

Caffeine 实现了 W-TinyLFU(LFU + LRU 算法的变种),其命中率和读写吞吐量大大优于 Guava Cache

其实现原理较复杂,可以参考你应该知道的缓存进化史

Ehcache

EhCache 是一个纯 Java 的进程内缓存框架,具有快速、精干等特点,是 Hibernate 中默认的 CacheProvider。

优点

  • 快速、简单
  • 支持多种缓存策略:LRU、LFU、FIFO 淘汰算法
  • 缓存数据有两级:内存和磁盘,因此无需担心容量问题
  • 缓存数据会在虚拟机重启的过程中写入磁盘
  • 可以通过 RMI、可插入 API 等方式进行分布式缓存
  • 具有缓存和缓存管理器的侦听接口
  • 支持多缓存管理器实例,以及一个实例的多个缓存区域
  • 提供 Hibernate 的缓存实现

缺点

  • 使用磁盘 Cache 的时候非常占用磁盘空间
  • 不保证数据的安全
  • 虽然支持分布式缓存,但效率不高(通过组播方式,在不同节点之间同步数据)。

进程内缓存对比

常用进程内缓存技术对比:

比较项 ConcurrentHashMap LRUMap Ehcache Guava Cache Caffeine
读写性能 很好,分段锁 一般,全局加锁 好,需要做淘汰操作 很好
淘汰算法 LRU,一般 支持多种淘汰算法,LRU,LFU,FIFO LRU,一般 W-TinyLFU, 很好
功能丰富程度 功能比较简单 功能比较单一 功能很丰富 功能很丰富,支持刷新和虚引用等 功能和 Guava Cache 类似
工具大小 jdk 自带类,很小 基于 LinkedHashMap,较小 很大,最新版本 1.4MB 是 Guava 工具类中的一个小部分,较小 一般,最新版本 644KB
是否持久化
是否支持集群
  • ConcurrentHashMap - 比较适合缓存比较固定不变的元素,且缓存的数量较小的。虽然从上面表格中比起来有点逊色,但是其由于是 JDK 自带的类,在各种框架中依然有大量的使用,比如我们可以用来缓存我们反射的 Method,Field 等等;也可以缓存一些链接,防止其重复建立。在 Caffeine 中也是使用的 ConcurrentHashMap 来存储元素。
  • LRUMap - 如果不想引入第三方包,又想使用淘汰算法淘汰数据,可以使用这个。
  • Ehcache - 由于其 jar 包很大,较重量级。对于需要持久化和集群的一些功能的,可以选择 Ehcache。需要注意的是,虽然 Ehcache 也支持分布式缓存,但是由于其节点间通信方式为 rmi,表现不如 Redis,所以一般不建议用它来作为分布式缓存。
  • Guava Cache - Guava 这个 jar 包在很多 Java 应用程序中都有大量的引入,所以很多时候其实是直接用就好了,并且其本身是轻量级的而且功能较为丰富,在不了解 Caffeine 的情况下可以选择 Guava Cache。
  • Caffeine - 其在命中率,读写性能上都比 Guava Cache 好很多,并且其 API 和 Guava cache 基本一致,甚至会多一点。在真实环境中使用 Caffeine,取得过不错的效果。

总结一下:**如果不需要淘汰算法则选择 ConcurrentHashMap,如果需要淘汰算法和一些丰富的 API,推荐选择 Caffeine**。

分布式缓存

分布式缓存解决了进程内缓存最大的问题:如果应用是分布式系统,节点之间无法共享彼此的进程内缓存

分布式缓存的应用场景:

  • 缓存经过复杂计算得到的数据
  • 缓存系统中频繁访问的热点数据,减轻数据库压力

不同分布式缓存的实现原理往往有比较大的差异。本文主要针对 Memcached 和 Redis 进行说明。

Memcached

Memcached 是一个高性能,分布式内存对象缓存系统,通过在内存里维护一个统一的巨大的 hash 表,它能够用来存储各种格式的数据,包括图像、视频、文件以及数据库检索的结果等。

简单的说就是:将数据缓存到内存中,然后从内存中读取,从而大大提高读取速度。

Memcached 特性

  • 使用物理内存作为缓存区,可独立运行在服务器上。每个进程最大 2G,如果想缓存更多的数据,可以开辟更多的 Memcached 进程(不同端口)或者使用分布式 Memcached 进行缓存,将数据缓存到不同的物理机或者虚拟机上。
  • 使用 key-value 的方式来存储数据。这是一种单索引的结构化数据组织形式,可使数据项查询时间复杂度为 O(1)。
  • 协议简单,基于文本行的协议。直接通过 telnet 在 Memcached 服务器上可进行存取数据操作,简单,方便多种缓存参考此协议;
  • 基于 libevent 高性能通信。Libevent 是一套利用 C 开发的程序库,它将 BSD 系统的 kqueue,Linux 系统的 epoll 等事件处理功能封装成一个接口,与传统的 select 相比,提高了性能。
  • 分布式能力取决于 Memcached 客户端,服务器之间互不通信。各个 Memcached 服务器之间互不通信,各自独立存取数据,不共享任何信息。服务器并不具有分布式功能,分布式部署取决于 Memcached 客户端。
  • 采用 LRU 缓存淘汰策略。在 Memcached 内存储数据项时,可以指定它在缓存的失效时间,默认为永久。当 Memcached 服务器用完分配的内时,失效的数据被首先替换,然后也是最近未使用的数据。在 LRU 中,Memcached 使用的是一种 Lazy Expiration 策略,自己不会监控存入的 key/vlue 对是否过期,而是在获取 key 值时查看记录的时间戳,检查 key/value 对空间是否过期,这样可减轻服务器的负载。
  • 内置了一套高效的内存管理算法。这套内存管理效率很高,而且不会造成内存碎片,但是它最大的缺点就是会导致空间浪费。当内存满后,通过 LRU 算法自动删除不使用的缓存。
  • 不支持持久化。Memcached 没有考虑数据的容灾问题,重启服务,所有数据会丢失。

Memcached 工作原理

(1)内存管理

Memcached 利用 slab allocation 机制来分配和管理内存,它按照预先规定的大小,将分配的内存分割成特定长度的内存块,再把尺寸相同的内存块分成组,数据在存放时,根据键值 大小去匹配 slab 大小,找就近的 slab 存放,所以存在空间浪费现象。

这套内存管理效率很高,而且不会造成内存碎片,但是它最大的缺点就是会导致空间浪费。

(2)缓存淘汰策略

Memcached 的缓存淘汰策略是 LRU + 到期失效策略。

当你在 Memcached 内存储数据项时,你有可能会指定它在缓存的失效时间,默认为永久。当 Memcached 服务器用完分配的内时,失效的数据被首先替换,然后是最近未使用的数据。

在 LRU 中,Memcached 使用的是一种 Lazy Expiration 策略:Memcached 不会监控存入的 key/vlue 对是否过期,而是在获取 key 值时查看记录的时间戳,检查 key/value 对空间是否过期,这样可减轻服务器的负载。

(3)分区

Memcached 服务器之间彼此不通信,它的分布式能力是依赖客户端来实现。

具体来说,就是在客户端实现一种算法,根据 key 来计算出数据应该向哪个服务器节点读/写。

而这种选取集群节点的算法常见的有三种:

  • 哈希取余算法 - 使用公式:hash(key)% N 计算出 哈希值 来决定数据映射到哪一个节点。
  • 一致性哈希算法 - 可以很好的解决 稳定性问题,可以将所有的 存储节点 排列在 首尾相接Hash 环上,每个 key 在计算 Hash 后会 顺时针 找到 临接存储节点 存放。而当有节点 加入退出 时,仅影响该节点在 Hash 环上 顺时针相邻后续节点
  • 虚拟 Hash 槽算法 - 使用 分散度良好哈希函数 把所有数据 映射 到一个 固定范围整数集合 中,整数定义为 slot),这个范围一般 远远大于 节点数。 是集群内 数据管理迁移基本单位。采用 大范围槽 的主要目的是为了方便 数据拆分集群扩展。每个节点会负责 一定数量的槽

Redis

Redis 是一个开源(BSD 许可)的,基于内存的,多数据结构存储系统。可以用作数据库、缓存和消息中间件。

Redis 还可以使用客户端分片来扩展写性能。内置了 复制(replication),LUA 脚本(Lua scripting),LRU 驱动事件(LRU eviction),事务(transactions) 和不同级别的 磁盘持久化(persistence), 并通过 Redis 哨兵(Sentinel)和自动分区(Cluster)提供高可用性(high availability)。

Redis 特性

  • 支持多种数据类型 - string、hash、list、set、sorted set。

  • 支持多种数据淘汰策略

    • volatile-lru - 从已设置过期时间的数据集中挑选最近最少使用的数据淘汰
    • volatile-ttl - 从已设置过期时间的数据集中挑选将要过期的数据淘汰
    • volatile-random - 从已设置过期时间的数据集中任意选择数据淘汰
    • allkeys-lru - 从所有数据集中挑选最近最少使用的数据淘汰
    • allkeys-random - 从所有数据集中任意选择数据进行淘汰
    • noeviction - 禁止驱逐数据
  • 提供两种持久化方式 - RDB 和 AOF

  • 通过 Redis cluster 提供集群模式。

Redis 原理

  • 缓存淘汰
    • Redis 有两种数据淘汰实现
      • 消极方式 - 访问 Redis key 时,如果发现它已经失效,则删除它
      • 积极方式 - 周期性从设置了失效时间的 key 中,根据淘汰策略,选择一部分失效的 key 进行删除。
  • 分区
    • Redis Cluster 集群包含 16384 个虚拟 Hash 槽,它通过一个高效的算法来计算 key 属于哪个 Hash 槽。
    • Redis Cluster 支持请求分发 - 节点在接到一个命令请求时,会先检测这个命令请求要处理的键所在的槽是否由自己负责,如果不是的话,节点将向客户端返回一个 MOVED 错误,MOVED 错误携带的信息可以指引客户端将请求重定向至正在负责相关槽的节点。
  • 主从复制
    • Redis 2.8 后支持异步复制。它有两种模式:
      • 完整重同步(full resychronization) - 用于初次复制。执行步骤与 SYNC 命令基本一致。
      • 部分重同步(partial resychronization) - 用于断线后重复制。如果条件允许,主服务器可以将主从服务器连接断开期间执行的写命令发送给从服务器,从服务器只需接收并执行这些写命令,即可将主从服务器的数据库状态保持一致。
    • 集群中每个节点都会定期向集群中的其他节点发送 PING 消息,以此来检测对方是否在线。
    • 如果一个主节点被认为下线,则在其从节点中,根据 Raft 算法,选举出一个节点,升级为主节点。
  • 数据一致性
    • Redis 不保证强一致性,因为这会使得集群性能大大降低。
    • Redis 是通过异步复制来实现最终一致性。

分布式缓存对比

不同的分布式缓存功能特性和实现原理方面有很大的差异,因此他们所适应的场景也有所不同。

img

这里选取三个比较出名的分布式缓存(MemCache,Redis,Tair)来作为比较:

比较项 MemCache Redis Tair
数据结构 只支持简单的 Key-Value 结构 String,Hash, List, Set, Sorted Set String,HashMap, List,Set
持久化 不支持 支持 支持
容量大小 数据纯内存,数据存储不宜过多 数据全内存,资源成本考量不宜超过 100GB 可以配置全内存或内存+磁盘引擎,数据容量可无限扩充
读写性能 很高 很高(RT0.5ms 左右) String 类型比较高(RT1ms 左右),复杂类型比较慢(RT5ms 左右)
过期策略 过期后,不删除缓存 有六种策略来处理过期数据 支持
  • MemCache - 只适合基于内存的缓存框架;且不支持数据持久化和容灾。
  • Redis - 支持丰富的数据结构,读写性能很高,但是数据全内存,必须要考虑资源成本,支持持久化。
  • Tair - 支持丰富的数据结构,读写性能较高,部分类型比较慢,理论上容量可以无限扩充。

总结:如果服务对延迟比较敏感,Map/Set 数据也比较多的话,比较适合 Redis。如果服务需要放入缓存量的数据很大,对延迟又不是特别敏感的话,那就可以选择 Memcached。

多级缓存

整体缓存框架

通常,一个大型软件系统的缓存采用多级缓存方案:

img

请求过程:

  1. 浏览器向客户端发起请求,如果 CDN 有缓存则直接返回;
  2. 如果 CDN 无缓存,则访问反向代理服务器;
  3. 如果反向代理服务器有缓存则直接返回;
  4. 如果反向代理服务器无缓存或动态请求,则访问应用服务器;
  5. 应用服务器访问进程内缓存;如果有缓存,则返回代理服务器,并缓存数据;(动态请求不缓存)
  6. 如果进程内缓存无数据,则读取分布式缓存;并返回应用服务器;应用服务器将数据缓存到本地缓存(部分);
  7. 如果分布式缓存无数据,则应用程序读取数据库数据,并放入分布式缓存;

使用进程内缓存

如果应用服务是单点应用,那么进程内缓存当然是缓存的首选方案

对于进程内缓存,其本来受限于内存的大小的限制,以及进程缓存更新后其他缓存无法得知,所以一般来说进程缓存适用于:

  • 数据量不是很大且更新频率较低的数据。
  • 如果更新频繁的数据,也想使用进程内缓存,那么可以将其过期时间设置为较短的时间,或者设置较短的自动刷新时间。

这种方案存在以下问题:

  • 如果应用服务是分布式系统,应用节点之间无法共享缓存,存在数据不一致问题。
  • 由于进程内缓存受限于内存大小的限制,所以缓存不能无限扩展。

使用分布式缓存

如果应用服务是分布式系统,那么最简单的缓存方案就是直接使用分布式缓存。

其应用场景如图所示:

img

Redis 用来存储热点数据,如果缓存不命中,则去查询数据库,并更新缓存。

这种方案存在以下问题:

  1. 缓存服务如果挂了,这时应用只能访问数据库,容易造成缓存雪崩。
  2. 访问分布式缓存服务会有一定的 I/O 以及序列化反序列化的开销,虽然性能很高,但是其终究没有在内存中查询快。

使用多级缓存

单纯使用进程内缓存和分布式缓存都存在各自的不足。如果需要更高的性能以及更好的可用性,我们可以将缓存设计为多级结构。将最热的数据使用进程内缓存存储在内存中,进一步提升访问速度。

这个设计思路在计算机系统中也存在,比如 CPU 使用 L1、L2、L3 多级缓存,用来减少对内存的直接访问,从而加快访问速度。

一般来说,多级缓存架构使用二级缓存已可以满足大部分业务需求,过多的分级会增加系统的复杂度以及维护的成本。因此,多级缓存不是分级越多越好,需要根据实际情况进行权衡。

一个典型的二级缓存架构,可以使用进程内缓存(如: Caffeine/Google Guava/Ehcache/HashMap)作为一级缓存;使用分布式缓存(如:Redis/Memcached)作为二级缓存。

多级缓存查询

img

多级缓存查询流程如下:

  1. 首先,查询 L1 缓存,如果缓存命中,直接返回结果;如果没有命中,执行下一步。
  2. 接下来,查询 L2 缓存,如果缓存命中,直接返回结果并回填 L1 缓存;如果没有命中,执行下一步。
  3. 最后,查询数据库,返回结果并依次回填 L2 缓存、L1 缓存。

多级缓存更新

对于 L1 缓存,如果有数据更新,只能删除并更新所在机器上的缓存,其他机器只能通过超时机制来刷新缓存。超时设定可以有两种策略:

  • 设置成写入后多少时间后过期
  • 设置成写入后多少时间刷新

对于 L2 缓存,如果有数据更新,其他机器立马可见。但是,也必须要设置超时时间,其时间应该比 L1 缓存的有效时间长。

为了解决进程内缓存不一致的问题,设计可以进一步优化:

img

通过消息队列的发布、订阅机制,可以通知其他应用节点对进程内缓存进行更新。使用这种方案,即使消息队列服务挂了或不可靠,由于先执行了数据库更新,但进程内缓存过期,刷新缓存时,也能保证数据的最终一致性。

缓存问题

缓存雪崩

“缓存雪崩”是指,缓存不可用或者大量缓存由于超时时间相同在同一时间段失效,大量请求直接访问数据库,数据库压力过大导致系统雪崩

举例来说,对于系统 A,假设每天高峰期每秒 5000 个请求,本来缓存在高峰期可以扛住每秒 4000 个请求,但是缓存机器意外发生了全盘宕机。缓存挂了,此时 1 秒 5000 个请求全部落数据库,数据库必然扛不住,它会报一下警,然后就挂了。此时,如果没有采用什么特别的方案来处理这个故障,DBA 很着急,重启数据库,但是数据库立马又被新的流量给打死了。

解决缓存雪崩的主要手段如下:

  • 增加缓存系统可用性(事前)。例如:部署 Redis Cluster(主从+哨兵),以实现 Redis 的高可用,避免全盘崩溃。
  • 采用多级缓存方案(事中)。例如:本地缓存(Ehcache/Caffine/Guava Cache) + 分布式缓存(Redis/ Memcached)。
  • 限流、降级、熔断方案(事中),避免被流量打死。如:使用 Hystrix 进行熔断、降级。
  • 缓存如果支持持久化,可以在恢复工作后恢复数据(事后)。如:Redis 支持持久化,一旦重启,自动从磁盘上加载数据,快速恢复缓存数据。

上面的解决方案简单来说,就是多级缓存方案。系统收到一个查询请求,先查本地缓存,再查分布式缓存,最后查数据库,只要命中,立即返回。

解决缓存雪崩的辅助手段如下:

  • 监控缓存,弹性扩容
  • 缓存的过期时间可以取个随机值。这么做是为避免缓存同时失效,使得数据库 IO 骤升。比如:以前是设置 10 分钟的超时时间,那每个 Key 都可以随机 8-13 分钟过期,尽量让不同 Key 的过期时间不同。

缓存穿透

“缓存穿透”是指,查询的数据在数据库中不存在,那么缓存中自然也不存在。所以,应用在缓存中查不到,则会去查询数据库,当这样的请求多了后,数据库的压力就会增大。

解决缓存穿透,一般有两种方法:

(一)缓存空值

对于返回为 NULL 的依然缓存,对于抛出异常的返回不进行缓存

img

采用这种手段的会增加我们缓存的维护成本,需要在插入缓存的时候删除这个空缓存,当然我们可以通过设置较短的超时时间来解决这个问题。

(二)过滤不可能存在的数据

img

制定一些规则过滤一些不可能存在的数据。可以使用布隆过滤器(针对二进制操作的数据结构,所以性能高),比如你的订单 ID 明显是在一个范围 1-1000,如果不是 1-1000 之内的数据那其实可以直接给过滤掉。

针对于一些恶意攻击,攻击带过来的大量 key 是不存在的,那么我们采用第一种方案就会缓存大量不存在 key 的数据。

此时我们采用第一种方案就不合适了,我们完全可以先对使用第二种方案进行过滤掉这些 key。

针对这种 key 异常多、请求重复率比较低的数据,我们就没有必要进行缓存,使用第二种方案直接过滤掉。

而对于空数据的 key 有限的,重复率比较高的,我们则可以采用第一种方式进行缓存。

缓存击穿

“缓存击穿”是指,热点缓存数据失效瞬间,大量请求直接访问数据库。例如,某些 key 是热点数据,访问非常频繁。如果某个 key 失效的瞬间,大量的请求过来,缓存未命中,然后去数据库访问,此时数据库访问量会急剧增加。

为了避免这个问题,我们可以采取下面的两个手段:

  • 分布式锁 - 锁住热点数据的 key,避免大量线程同时访问同一个 key。
  • 定时异步刷新 - 可以对部分数据采取失效前自动刷新的策略,而不是到期自动淘汰。淘汰其实也是为了数据的时效性,所以采用自动刷新也可以。

小结

上面逐一介绍了缓存使用中常见的问题。这里,从发生时间段的角度整体归纳一下缓存问题解决方案。

  • 事前:Redis 高可用方案(Redis Cluster + 主从 + 哨兵),避免缓存全面崩溃。
  • 事中:(一)采用多级缓存方案,本地缓存(Ehcache/Caffine/Guava Cache) + 分布式缓存(Redis/ Memcached)。(二)限流 + 熔断 + 降级(Hystrix),避免极端情况下,数据库被打死。
  • 事后:Redis 持久化(RDB+AOF),一旦重启,自动从磁盘上加载数据,快速恢复缓存数据。

分布式缓存 Memcached ,由于数据类型不如 Redis 丰富,并且不支持持久化、容灾。所以,一般会选择 Redis 做分布式缓存。

缓存策略

缓存预热

缓存预热是指系统启动后,直接查询热点数据并缓存。这样就可以避免用户请求的时候,先查询数据库,然后再更新缓存的问题。

解决方案:

  • 手动刷新缓存:直接写个缓存刷新页面,上线时手工操作下。
  • 应用启动时刷新缓存:数据量不大,可以在项目启动的时候自动进行加载。
  • 定时异步刷新缓存

如何缓存

不过期缓存

缓存更新模式:

  1. 开启事务
  2. 写 SQL
  3. 提交事务
  4. 写缓存

不要把写缓存操作放在事务中,尤其是写分布式缓存。因为网络抖动可能导致写缓存响应时间很慢,引起数据库事务阻塞。如果对缓存数据一致性要求不是那么高,数据量也不是很大,可以考虑定期全量同步缓存。

这种模式存在这样的情况:存在事务成功,但缓存写失败的可能。但这种情况相对于上面的问题,影响较小。

过期缓存

采用懒加载。对于热点数据,可以设置较短的缓存时间,并定期异步加载。

缓存更新

一般来说,系统如果不是严格要求缓存和数据库保持一致性的话,尽量不要将读请求和写请求串行化。串行化可以保证一定不会出现数据不一致的情况,但是它会导致系统的吞吐量大幅度下降。

缓存更新的策略有几种模式:

  • Cache Aside
  • Read/Write Through

需要注意的是:以上几种缓存更新策略,都无法保证数据强一致。如果一定要保证强一致性,可以通过两阶段提交(2PC)或 Paxos 协议来实现。但是 2PC 太慢,而 Paxos 太复杂,所以如果不是非常重要的数据,不建议使用强一致性方案。

Cache Aside

Cache Aside 应该是最常见的缓存更新策略了。

Cache Aside 的思路是:先更新数据库,再删除缓存。具体来说:

  • 失效:尝试读缓存,如果不命中,则读数据库,然后更新缓存。

  • 命中:尝试读缓存,命中则直接返回数据。

  • 更新:先更新数据库,再删除缓存。

为什么不能先更新数据库,再更新缓存?

多个并发的写操作可能导致脏数据:当有多个并发的写请求时,无法保证更新数据库的顺序和更新缓存的顺序一致,从而导致数据库和缓存数据不一致的问题。

说明:如上图的场景中,两个写线程由于执行顺序,导致数据库中 val = 2,而缓存中 val = 1,数据不一致。

为什么不能先删缓存,再更新数据库?

存在并发读请求和写请求时,可能导致脏数据

说明:如上图的场景中,读线程和写线程并行执行,导致数据库中 val = 2,而缓存中 val = 1,数据不一致。

先更新数据库,再删除缓存就没问题了吗

存在并发读请求和写请求时,可能导致脏数据

上图中问题发生的概率非常低:因为通常数据库更新操作比内存操作耗时多出几个数量级,最后一步回写缓存速度非常快,通常会在更新数据库之前完成。所以 Cache Aside 模式选择先更新数据库,再删除缓存,而不是先删缓存,再更新数据库。

不过,如果真的出现了这种场景,为了避免缓存中一直保留着脏数据,可以为缓存设置过期时间,过期后缓存自动失效。通常,业务系统中允许少量数据短时间出现不一致的情况。

Read/Write Through

Read Through 的思路是:查询时更新缓存。当缓存失效时,缓存服务自己进行加载。

Write Through 的思路是:当数据更新时,缓存服务负责更新缓存。

Through vs. Cache Aside

Read Through vs. Cache Aside

  • Cache Aside 模式中,应用需要维护两个数据源头:一个是缓存,一个是数据库。
  • Read-Through 模式中,应用无需管理缓存和数据库,只需要将数据库的同步委托给缓存服务即可。

Write behind

Write Behind 又叫 Write Back。Write Behind 的思路是:应用更新数据时,只更新缓存, 缓存服务每隔一段时间将缓存数据批量更新到数据库中,即延迟写入。这个设计的好处就是让提高 I/O 效率,因为异步,Write Behind 还可以合并对同一个数据的多次操作,所以性能的提高是相当可观的。

更详细的分析可以参考:分布式之数据库和缓存双写一致性方案解析

总结

最后,通过一张思维导图来总结一下本文所述的知识点,帮助大家对缓存有一个系统性的认识。

img

参考资料

分布式事务基本原理

分布式事务指的是事务操作跨越多个节点,并且要求满足事务的 ACID 特性。

分布式事务简介

ACID

ACID 是数据库事务正确执行的四个基本要素。

  • 原子性(Atomicity)
    • 事务被视为不可分割的最小单元,事务中的所有操作要么全部提交成功,要么全部失败回滚。
    • 回滚可以用日志来实现,日志记录着事务所执行的修改操作,在回滚时反向执行这些修改操作即可。
  • 一致性(Consistency)
    • 数据库在事务执行前后都保持一致性状态。
    • 在一致性状态下,所有事务对一个数据的读取结果都是相同的。
  • 隔离性(Isolation)
    • 一个事务所做的修改在最终提交以前,对其它事务是不可见的。
  • 持久性(Durability)
    • 一旦事务提交,则其所做的修改将会永远保存到数据库中。即使系统发生崩溃,事务执行的结果也不能丢失。
    • 可以通过数据库备份和恢复来实现,在系统发生奔溃时,使用备份的数据库进行数据恢复。

一个支持事务(Transaction)中的数据库系统,必需要具有这四种特性,否则在事务过程(Transaction processing)当中无法保证数据的正确性。

  • 只有满足一致性,事务的执行结果才是正确的。
  • 在无并发的情况下,事务串行执行,隔离性一定能够满足。此时只要能满足原子性,就一定能满足一致性。
  • 在并发的情况下,多个事务并行执行,事务不仅要满足原子性,还需要满足隔离性,才能满足一致性。
  • 事务满足持久化是为了能应对系统崩溃的情况。

本地事务和分布式事务

学习分布式之前,先了解一下本地事务的概念。

事务指的是满足 ACID 特性的一组操作。事务内的 SQL 语句,要么全执行成功,要么全执行失败。可以通过 Commit 提交一个事务,也可以使用 Rollback 进行回滚。

分布式事务指的是事务操作跨越多个节点,并且要求满足事务的 ACID 特性。

随着互联网快速发展,微服务,SOA 等服务架构模式正在被大规模的使用,现在分布式系统一般由多个独立的子系统组成,多个子系统通过网络通信互相协作配合完成各个功能。

有很多用例会跨多个子系统才能完成,比较典型的是电子商务网站的下单支付流程,至少会涉及交易系统和支付系统,而且这个过程中会涉及到事务的概念,即保证交易系统和支付系统的数据一致性,此处我们称这种跨系统的事务为分布式事务,具体一点而言,分布式事务是指事务的参与者、支持事务的服务器、资源服务器以及事务管理器分别位于不同的分布式系统的不同节点之上。

举个互联网常用的交易业务为例:

上图中包含了库存和订单两个独立的微服务,每个微服务维护了自己的数据库。在交易系统的业务逻辑中,一个商品在下单之前需要先调用库存服务,进行扣除库存,再调用订单服务,创建订单记录。

可以看到,如果多个数据库之间的数据更新没有保证事务,将会导致出现子系统数据不一致,业务出现问题。

分布式事务的难点:

  • 事务的原子性:事务操作跨不同节点,当多个节点某一节点操作失败时,需要保证多节点操作的都做或都不做(All or Nothing)的原子性。
  • 事务的一致性:当发生网络传输故障或者节点故障,节点间数据复制通道中断,在进行事务操作时需要保证数据一致性,保证事务的任何操作都不会使得数据违反数据库定义的约束、触发器等规则。
  • 事务的隔离性:事务隔离性的本质就是如何正确多个并发事务的处理的读写冲突和写写冲突,因为在分布式事务控制中,可能会出现提交不同步的现象,这个时候就有可能出现“部分已经提交”的事务。此时并发应用访问数据如果没有加以控制,有可能出现“脏读”问题。

CAP 和 BASE

CAP 定理又称为 CAP 原则,指的是:在一个分布式系统中, 一致性(C:Consistency)可用性(A:Availability)分区容忍性(P:Partition Tolerance),最多只能同时满足其中两项

BASE 是 基本可用(Basically Available)软状态(Soft State)最终一致性(Eventually Consistent) 三个短语的缩写。BASE 理论是对 CAP 中一致性和可用性权衡的结果,它的理论的核心思想是:即使无法做到强一致性,但每个应用都可以根据自身业务特点,采用适当的方式来使系统达到最终一致性。

柔性事务

在电商等互联网场景下,传统的事务在数据库性能和处理能力上都暴露出了瓶颈。在分布式领域基于 CAP 理论以及 BASE 理论,有人就提出了柔性事务的概念。

柔性事务是指:在不影响系统整体可用性的情况下(Basically Available 基本可用),允许系统存在数据不一致的中间状态(Soft State 软状态),在经过数据同步的延时之后,最终数据能够达到一致。并不是完全放弃了 ACID,而是通过放宽一致性要求,借助本地事务来实现最终分布式事务一致性的同时也保证系统的吞吐

下面介绍的是实现柔性事务的一些常见特性,这些特性在具体的方案中不一定都要满足,因为不同的方案要求不一样。

  • 可见性(对外可查询):在分布式事务执行过程中,如果某一个步骤执行出错,就需要明确的知道其他几个操作的处理情况,这就需要其他的服务都能够提供查询接口,保证可以通过查询来判断操作的处理情况。为了保证操作的可查询,需要对于每一个服务的每一次调用都有一个全局唯一的标识,可以是业务单据号(如订单号)、也可以是系统分配的操作流水号(如支付记录流水号)。除此之外,操作的时间信息也要有完整的记录。
  • 操作幂等性:幂等性,其实是一个数学概念。幂等函数,或幂等方法,是指可以使用相同参数重复执行,并能获得相同结果的函数。幂等操作的特点是其任意多次执行所产生的影响均与一次执行的影响相同。也就是说,同一个方法,使用同样的参数,调用多次产生的业务结果与调用一次产生的业务结果相同。之所以需要操作幂等性,是因为为了保证数据的最终一致性,很多事务协议都会有很多重试的操作,如果一个方法不保证幂等,那么将无法被重试。幂等操作的实现方式有多种,如在系统中缓存所有的请求与处理结果、检测到重复操作后,直接返回上一次的处理结果等。

两阶段提交(2PC)

方案简介

二阶段提交协议(Two-phase Commit,即 2PC)是常用的分布式事务解决方案,即将事务的提交过程分为两个阶段来进行处理:准备阶段和提交阶段。事务的发起者称协调者,事务的执行者称参与者。

在分布式系统里,每个节点都可以知晓自己操作的成功或者失败,却无法知道其他节点操作的成功或失败。当一个事务跨多个节点时,为了保持事务的原子性与一致性,而引入一个协调者来统一掌控所有参与者的操作结果,并指示它们是否要把操作结果进行真正的提交或者回滚(rollback)。

二阶段提交的思路可以概括为:参与者将操作成败通知协调者,再由协调者根据所有参与者的反馈情报决定各参与者是否要提交操作还是中止操作

核心思想就是对每一个事务都采用先尝试后提交的处理方式,处理后所有的读操作都要能获得最新的数据,因此也可以将二阶段提交看作是一个强一致性算法。

处理流程

简单一点理解,可以把协调者节点比喻为带头大哥,参与者理解比喻为跟班小弟,带头大哥统一协调跟班小弟的任务执行。

阶段 1:准备阶段

  1. 协调者向所有参与者发送事务内容,询问是否可以提交事务,并等待所有参与者答复。
  2. 各参与者执行事务操作,将 undo 和 redo 信息记入事务日志中(但不提交事务)。
  3. 如参与者执行成功,给协调者反馈 yes,即可以提交;如执行失败,给协调者反馈 no,即不可提交。

阶段 2:提交阶段

如果协调者收到了参与者的失败消息或者超时,直接给每个参与者发送回滚(rollback)消息;否则,发送提交(commit)消息;参与者根据协调者的指令执行提交或者回滚操作,释放所有事务处理过程中使用的锁资源。(注意:必须在最后阶段释放锁资源) 接下来分两种情况分别讨论提交阶段的过程。

情况 1,当所有参与者均反馈 yes,提交事务

  1. 协调者向所有参与者发出正式提交事务的请求(即 commit 请求)。
  2. 参与者执行 commit 请求,并释放整个事务期间占用的资源。
  3. 各参与者向协调者反馈 ack(应答)完成的消息。
  4. 协调者收到所有参与者反馈的 ack 消息后,即完成事务提交。

情况 2,当任何阶段 1 一个参与者反馈 no,中断事务

  1. 协调者向所有参与者发出回滚请求(即 rollback 请求)。
  2. 参与者使用阶段 1 中的 undo 信息执行回滚操作,并释放整个事务期间占用的资源。
  3. 各参与者向协调者反馈 ack 完成的消息。
  4. 协调者收到所有参与者反馈的 ack 消息后,即完成事务中断。

方案总结

2PC 方案实现起来简单,实际项目中使用比较少,主要因为以下问题:

  • 性能问题 - 所有参与者在事务提交阶段处于同步阻塞状态,占用系统资源,容易导致性能瓶颈。
  • 可靠性问题 - 如果协调者存在单点故障问题,如果协调者出现故障,参与者将一直处于锁定状态。
  • 数据一致性问题 - 在阶段 2 中,如果发生局部网络问题,一部分事务参与者收到了提交消息,另一部分事务参与者没收到提交消息,那么就导致了节点之间数据的不一致。

三阶段提交(3PC)

方案简介

三阶段提交协议(Three-phase Commit,3PC),是二阶段提交协议的改进版本,与二阶段提交不同的是,引入超时机制。同时在协调者和参与者中都引入超时机制。

三阶段提交将二阶段的准备阶段拆分为 2 个阶段,插入了一个 preCommit 阶段,使得原先在二阶段提交中,参与者在准备之后,由于协调者发生崩溃或错误,而导致参与者处于无法知晓是否提交或者中止的“不确定状态”所产生的可能相当长的延时的问题得以解决。

处理流程

阶段 1:canCommit

协调者向参与者发送 commit 请求,参与者如果可以提交就返回 yes 响应(参与者不执行事务操作),否则返回 no 响应:

  1. 协调者向所有参与者发出包含事务内容的 canCommit 请求,询问是否可以提交事务,并等待所有参与者答复。
  2. 参与者收到 canCommit 请求后,如果认为可以执行事务操作,则反馈 yes 并进入预备状态,否则反馈 no。

阶段 2:preCommit

协调者根据阶段 1 canCommit 参与者的反应情况来决定是否可以基于事务的 preCommit 操作。根据响应情况,有以下两种可能。

情况 1:阶段 1 所有参与者均反馈 yes,参与者预执行事务

  1. 协调者向所有参与者发出 preCommit 请求,进入准备阶段。
  2. 参与者收到 preCommit 请求后,执行事务操作,将 undo 和 redo 信息记入事务日志中(但不提交事务)。
  3. 各参与者向协调者反馈 ack 响应或 no 响应,并等待最终指令。

情况 2:阶段 1 任何一个参与者反馈 no,或者等待超时后协调者尚无法收到所有参与者的反馈,即中断事务

  1. 协调者向所有参与者发出 abort 请求。
  2. 无论收到协调者发出的 abort 请求,或者在等待协调者请求过程中出现超时,参与者均会中断事务。

阶段 3:doCommit

该阶段进行真正的事务提交,也可以分为以下两种情况:

情况 1:阶段 2 所有参与者均反馈 ack 响应,执行真正的事务提交

  1. 如果协调者处于工作状态,则向所有参与者发出 do Commit 请求。
  2. 参与者收到 do Commit 请求后,会正式执行事务提交,并释放整个事务期间占用的资源。
  3. 各参与者向协调者反馈 ack 完成的消息。
  4. 协调者收到所有参与者反馈的 ack 消息后,即完成事务提交。

情况 2:任何一个参与者反馈 no,或者等待超时后协调者尚无法收到所有参与者的反馈,即中断事务

  1. 如果协调者处于工作状态,向所有参与者发出 abort 请求。
  2. 参与者使用阶段 1 中的 undo 信息执行回滚操作,并释放整个事务期间占用的资源。
  3. 各参与者向协调者反馈 ack 完成的消息。
  4. 协调者收到所有参与者反馈的 ack 消息后,即完成事务中断。

注意:进入阶段 3 后,无论协调者出现问题,或者协调者与参与者网络出现问题,都会导致参与者无法接收到协调者发出的 do Commit 请求或 abort 请求。此时,参与者都会在等待超时之后,继续执行事务提交。

方案总结

优点:

  • 相比二阶段提交,三阶段降低了阻塞范围,在等待超时后协调者或参与者会中断事务。避免了协调者单点问题,阶段 3 中协调者出现问题时,参与者会继续提交事务。

缺点:

  • 数据不一致问题依然存在,当在参与者收到 preCommit 请求后等待 do commite 指令时,此时如果协调者请求中断事务,而协调者无法与参与者正常通信,会导致参与者继续提交事务,造成数据不一致。

补偿事务(TCC)

方案简介

TCC(Try-Confirm-Cancel)的概念,最早是由 Pat Helland 于 2007 年发表的一篇名为《Life beyond Distributed Transactions:an Apostate’s Opinion》的论文提出。

TCC 是服务化的二阶段编程模型,其 Try、Confirm、Cancel 3 个方法均由业务编码实现;

  • Try - 操作作为一阶段,负责资源的检查和预留。
  • Confirm - 操作作为二阶段提交操作,执行真正的业务。
  • Cancel - 是预留资源的取消。

TCC 事务的 Try、Confirm、Cancel 可以理解为 SQL 事务中的 Lock、Commit、Rollback。

处理流程

为了方便理解,下面以电商下单为例进行方案解析,这里把整个过程简单分为扣减库存,订单创建 2 个步骤,库存服务和订单服务分别在不同的服务器节点上。

Try 阶段

从执行阶段来看,与传统事务机制中业务逻辑相同。但从业务角度来看,却不一样。TCC 机制中的 Try 仅是一个初步操作,它和后续的确认一起才能真正构成一个完整的业务逻辑,这个阶段主要完成:

  • 完成所有业务检查( 一致性 )
  • 预留必须业务资源( 准隔离性 )
  • Try 尝试执行业务 TCC 事务机制以初步操作(Try)为中心的,确认操作(Confirm)和取消操作(Cancel)都是围绕初步操作(Try)而展开。因此,Try 阶段中的操作,其保障性是最好的,即使失败,仍然有取消操作(Cancel)可以将其执行结果撤销。

假设商品库存为 100,购买数量为 2,这里检查和更新库存的同时,冻结用户购买数量的库存,同时创建订单,订单状态为待确认。

Confirm / Cancel 阶段

根据 Try 阶段服务是否全部正常执行,继续执行确认操作(Confirm)或取消操作(Cancel)。 Confirm 和 Cancel 操作满足幂等性,如果 Confirm 或 Cancel 操作执行失败,将会不断重试直到执行完成。

Confirm:当 Try 阶段服务全部正常执行, 执行确认业务逻辑操作

这里使用的资源一定是 Try 阶段预留的业务资源。在 TCC 事务机制中认为,如果在 Try 阶段能正常的预留资源,那 Confirm 一定能完整正确的提交。Confirm 阶段也可以看成是对 Try 阶段的一个补充,Try+Confirm 一起组成了一个完整的业务逻辑。

Cancel:当 Try 阶段存在服务执行失败, 进入 Cancel 阶段

Cancel 取消执行,释放 Try 阶段预留的业务资源,上面的例子中,Cancel 操作会把冻结的库存释放,并更新订单状态为取消。

方案总结

TCC 事务机制相对于传统事务机制(X/Open XA),TCC 事务机制相比于上面介绍的 XA 事务机制,有以下优点:

  • 性能提升 - 具体业务来实现控制资源锁的粒度变小,不会锁定整个资源。
  • 数据最终一致性 - 基于 Confirm 和 Cancel 的幂等性,保证事务最终完成确认或者取消,保证数据的一致性。
  • 可靠性 - 解决了 XA 协议的协调者单点故障问题,由主业务方发起并控制整个业务活动,业务活动管理器也变成多点,引入集群。

缺点: TCC 的 Try、Confirm 和 Cancel 操作功能要按具体业务来实现,业务耦合度较高,提高了开发成本。

本地消息表

方案简介

本地消息表的方案最初是由 ebay 提出,核心思路是将分布式事务拆分成本地事务进行处理。

方案通过在事务主动发起方额外新建事务消息表,事务发起方处理业务和记录事务消息在本地事务中完成,轮询事务消息表的数据发送事务消息,事务被动方基于消息中间件消费事务消息表中的事务。

这样设计可以避免”业务处理成功 + 事务消息发送失败“,或”业务处理失败 + 事务消息发送成功“的棘手情况出现,保证 2 个系统事务的数据一致性。

处理流程

下面把分布式事务最先开始处理的事务方成为事务主动方,在事务主动方之后处理的业务内的其他事务成为事务被动方。

为了方便理解,下面继续以电商下单为例进行方案解析,这里把整个过程简单分为扣减库存,订单创建 2 个步骤,库存服务和订单服务分别在不同的服务器节点上,其中库存服务是事务主动方,订单服务是事务被动方。

事务的主动方需要额外新建事务消息表,用于记录分布式事务的消息的发生、处理状态。

整个业务处理流程如下:

  1. 步骤 1 事务主动方处理本地事务。 事务主动发在本地事务中处理业务更新操作和写消息表操作。 上面例子中库存服务阶段再本地事务中完成扣减库存和写消息表(图中 1、2)。
  2. 步骤 2 事务主动方通过 MQ 通知事务被动方处理事务。 消息中间件可以基于 Kafka、RocketMQ 消息队列,事务主动方法主动写消息到消息队列,事务消费方消费并处理消息队列中的消息。 上面例子中,库存服务把事务待处理消息写到消息中间件,订单服务消费消息中间件的消息,完成新增订单(图中 3 - 5)。
  3. 步骤 3 事务被动方通过 MQ 反会处理结果。 上面例子中,订单服务把事务已处理消息写到消息中间件,库存服务消费中间件的消息,并将事务消息的状态更新为已完成(图中 6 - 8)

为了数据的一致性,当处理错误需要重试,事务发送方和事务接收方相关业务处理需要支持幂等。具体保存一致性的容错处理如下:

  • 当步骤 1 处理出错,事务回滚,相当于什么都没发生。
  • 当步骤 2、步骤 3 处理出错,由于未处理的事务消息还是保存在事务发送方,事务发送方可以定时轮询为超时消息数据,再次发送的消息中间件进行处理。事务被动方消费事务消息重试处理。
  • 如果是业务上的失败,事务被动方可以发消息给事务主动方进行回滚。
  • 如果多个事务被动方已经消费消息,事务主动方需要回滚事务时需要通知事务被动方回滚。

方案总结

方案的优点如下:

  • 从应用设计开发的角度实现了消息数据的可靠性,消息数据的可靠性不依赖于消息中间件,弱化了对 MQ 中间件特性的依赖。
  • 方案轻量,容易实现。

缺点如下:

  • 与具体的业务场景绑定,耦合性强,不可复用。
  • 消息数据与业务数据同库,占用业务系统资源。
  • 业务系统在使用关系型数据库的情况下,消息服务性能会受到关系型数据库并发性能的局限。

MQ 事务

MQ 事务方案本质是利用 MQ 功能实现的本地消息表。事务消息需要消息队列提供相应的功能才能实现,Kafka 和 RocketMQ 都提供了事务相关功能。

  • Kafka 的解决方案是:直接抛出异常,让用户自行处理。用户可以在业务代码中反复重试提交,直到提交成功,或者删除之前修改的数据记录进行事务补偿。
  • RocketMQ 的解决方案是:通过事务反查机制来解决事务消息提交失败的问题。如果 Producer 在提交或者回滚事务消息时发生网络异常,RocketMQ 的 Broker 没有收到提交或者回滚的请求,Broker 会定期去 Producer 上反查这个事务对应的本地事务的状态,然后根据反查结果决定提交或者回滚这个事务。为了支撑这个事务反查机制,业务代码需要实现一个反查本地事务状态的接口,告知 RocketMQ 本地事务是成功还是失败。

RocketMQ 事务消息实现

事务消息是 Apache RocketMQ 提供的一种高级消息类型,支持在分布式场景下保障消息生产和本地事务的最终一致性。

事务消息处理流程

事务消息交互流程如下图所示。

  1. 生产者将消息发送至 Apache RocketMQ 服务端。
  2. Apache RocketMQ 服务端将消息持久化成功之后,向生产者返回 Ack 确认消息已经发送成功,此时消息被标记为”暂不能投递”,这种状态下的消息即为半事务消息。
  3. 生产者开始执行本地事务逻辑。
  4. 生产者根据本地事务执行结果向服务端提交二次确认结果(Commit 或是 Rollback),服务端收到确认结果后处理逻辑如下:
    • 二次确认结果为 Commit:服务端将半事务消息标记为可投递,并投递给消费者。
    • 二次确认结果为 Rollback:服务端将回滚事务,不会将半事务消息投递给消费者。
  5. 在断网或者是生产者应用重启的特殊情况下,若服务端未收到发送者提交的二次确认结果,或服务端收到的二次确认结果为 Unknown 未知状态,经过固定时间后,服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查。 说明 服务端回查的间隔时间和最大回查次数,请参见参数限制
  6. 生产者收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
  7. 生产者根据检查到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤 4 对半事务消息进行处理。

事务消息生命周期 事务消息

  • 初始化:半事务消息被生产者构建并完成初始化,待发送到服务端的状态。
  • 事务待提交:半事务消息被发送到服务端,和普通消息不同,并不会直接被服务端持久化,而是会被单独存储到事务存储系统中,等待第二阶段本地事务返回执行结果后再提交。此时消息对下游消费者不可见。
  • 消息回滚:第二阶段如果事务执行结果明确为回滚,服务端会将半事务消息回滚,该事务消息流程终止。
  • 提交待消费:第二阶段如果事务执行结果明确为提交,服务端会将半事务消息重新存储到普通存储系统中,此时消息对下游消费者可见,等待被消费者获取并消费。
  • 消费中:消息被消费者获取,并按照消费者本地的业务逻辑进行处理的过程。 此时服务端会等待消费者完成消费并提交消费结果,如果一定时间后没有收到消费者的响应,Apache RocketMQ 会对消息进行重试处理。具体信息,请参见消费重试
  • 消费提交:消费者完成消费处理,并向服务端提交消费结果,服务端标记当前消息已经被处理(包括消费成功和失败)。 Apache RocketMQ 默认支持保留所有消息,此时消息数据并不会立即被删除,只是逻辑标记已消费。消息在保存时间到期或存储空间不足被删除前,消费者仍然可以回溯消息重新消费。
  • 消息删除:Apache RocketMQ 按照消息保存机制滚动清理最早的消息数据,将消息从物理文件中删除。更多信息,请参见消息存储和清理机制

MQ 事务方案总结

相比本地消息表方案,MQ 事务方案优点是:

  • 消息数据独立存储 ,降低业务系统与消息系统之间的耦合。
  • 吞吐量优于使用本地消息表方案。

缺点是:

  • 一次消息发送需要两次网络请求(half 消息 + commit/rollback 消息)
  • 业务处理服务需要实现消息状态回查接口

SAGA

方案简介

Saga 事务源于 1987 年普林斯顿大学的 Hecto 和 Kenneth 发表的如何处理 long lived transaction(长活事务)论文,Saga 事务核心思想是将长事务拆分为多个本地短事务,由 Saga 事务协调器协调,如果正常结束那就正常完成,如果某个步骤失败,则根据相反顺序一次调用补偿操作。

处理流程

Saga 事务基本协议如下

  • 每个 Saga 事务由一系列幂等的有序子事务(sub-transaction) Ti 组成。
  • 每个 Ti 都有对应的幂等补偿动作 Ci,补偿动作用于撤销 Ti 造成的结果。

可以看到,和 TCC 相比,Saga 没有“预留”动作,它的 Ti 就是直接提交到库。

下面以下单流程为例,整个操作包括:创建订单、扣减库存、支付、增加积分 Saga 的执行顺序有两种:

Saga事务执行顺序

  • 事务正常执行完成 T1, T2, T3, …, Tn,例如:扣减库存(T1),创建订单(T2),支付(T3),依次有序完成整个事务。
  • 事务回滚 T1, T2, …, Tj, Cj,…, C2, C1,其中 0 < j < n,例如:扣减库存(T1),创建订单(T2),支付(T3,支付失败),支付回滚(C3),订单回滚(C2),恢复库存(C1)。

恢复策略

Saga 定义了两种恢复策略:

  • 向前恢复(forward recovery)

Saga事务向前恢复

对应于上面第一种执行顺序,适用于必须要成功的场景,发生失败进行重试,执行顺序是类似于这样的:T1, T2, …, Tj(失败), Tj(重试),…, Tn,其中 j 是发生错误的子事务(sub-transaction)。该情况下不需要 Ci。

  • 向后恢复(backward recovery)

Saga事务向后恢复

对应于上面提到的第二种执行顺序,其中 j 是发生错误的子事务(sub-transaction),这种做法的效果是撤销掉之前所有成功的子事务,使得整个 Saga 的执行结果撤销。

Saga 事务常见的有两种不同的实现方式:命令协调和事件编排。

命令协调

  • 命令协调(Order Orchestrator):中央协调器负责集中处理事件的决策和业务逻辑排序。

中央协调器(Orchestrator,简称 OSO)以命令/回复的方式与每项服务进行通信,全权负责告诉每个参与者该做什么以及什么时候该做什么。

命令协调模式

以电商订单的例子为例:

  1. 事务发起方的主业务逻辑请求 OSO 服务开启订单事务。
  2. OSO 向库存服务请求扣减库存,库存服务回复处理结果。
  3. OSO 向订单服务请求创建订单,订单服务回复创建结果。
  4. OSO 向支付服务请求支付,支付服务回复处理结果。
  5. 主业务逻辑接收并处理 OSO 事务处理结果回复。

中央协调器必须事先知道执行整个订单事务所需的流程(例如通过读取配置)。如果有任何失败,它还负责通过向每个参与者发送命令来撤销之前的操作来协调分布式的回滚。基于中央协调器协调一切时,回滚要容易得多,因为协调器默认是执行正向流程,回滚时只要执行反向流程即可。

事件编排

  • 事件编排 (Event Choreography0:没有中央协调器(没有单点风险)时,每个服务产生并观察其他服务的事件,并决定是否应采取行动

在事件编排方法中,第一个服务执行一个事务,然后发布一个事件。该事件被一个或多个服务进行监听,这些服务再执行本地事务并发布(或不发布)新的事件。

当最后一个服务执行本地事务并且不发布任何事件时,意味着分布式事务结束,或者它发布的事件没有被任何 Saga 参与者听到都意味着事务结束。

以电商订单的例子为例:

事件编排模式

  1. 事务发起方的主业务逻辑发布开始订单事件
  2. 库存服务监听开始订单事件,扣减库存,并发布库存已扣减事件
  3. 订单服务监听库存已扣减事件,创建订单,并发布订单已创建事件
  4. 支付服务监听订单已创建事件,进行支付,并发布订单已支付事件
  5. 主业务逻辑监听订单已支付事件并处理。

事件/编排是实现 Saga 模式的自然方式,它很简单,容易理解,不需要太多的代码来构建。如果事务涉及 2 至 4 个步骤,则可能是非常合适的。

方案总结

命令协调设计的优点和缺点:

优点如下:

  • 服务之间关系简单,避免服务之间的循环依赖关系,因为 Saga 协调器会调用 Saga 参与者,但参与者不会调用协调器
  • 程序开发简单,只需要执行命令/回复(其实回复消息也是一种事件消息),降低参与者的复杂性。
  • 易维护扩展,在添加新步骤时,事务复杂性保持线性,回滚更容易管理,更容易实施和测试

缺点如下:

  • 中央协调器容易处理逻辑容易过于复杂,导致难以维护。
  • 存在协调器单点故障风险。

事件/编排设计的优点和缺点

优点如下:

  • 避免中央协调器单点故障风险。
  • 当涉及的步骤较少服务开发简单,容易实现。

缺点如下:

  • 服务之间存在循环依赖的风险。
  • 当涉及的步骤较多,服务间关系混乱,难以追踪调测。

值得补充的是,由于 Saga 模型中没有 Prepare 阶段,因此事务间不能保证隔离性,当多个 Saga 事务操作同一资源时,就会产生更新丢失、脏数据读取等问题,这时需要在业务层控制并发,例如:在应用层面加锁,或者应用层面预先冻结资源。

总结

各方案使用场景

介绍完分布式事务相关理论和常见解决方案后,最终的目的在实际项目中运用,因此,总结一下各个方案的常见的使用场景。

方案比较

  • 2PC/3PC 依赖于数据库,能够很好的提供强一致性和强事务性,但相对来说延迟比较高,比较适合传统的单体应用,在同一个方法中存在跨库操作的情况,不适合高并发和高性能要求的场景。
  • TCC 适用于执行时间确定且较短,实时性要求高,对数据一致性要求高,比如互联网金融企业最核心的三个服务:交易、支付、账务。
  • 本地消息表/MQ 事务 都适用于事务中参与方支持操作幂等,对一致性要求不高,业务上能容忍数据不一致到一个人工检查周期,事务涉及的参与方、参与环节较少,业务上有对账/校验系统兜底。
  • Saga 事务 由于 Saga 事务不能保证隔离性,需要在业务层控制并发,适合于业务场景事务并发操作同一资源较少的情况。 Saga 相比缺少预提交动作,导致补偿动作的实现比较麻烦,例如业务是发送短信,补偿动作则得再发送一次短信说明撤销,用户体验比较差。Saga 事务较适用于补偿动作容易处理的场景。

分布式事务方案设计

本文介绍的偏向于原理,业界已经有不少开源的或者收费的解决方案,篇幅所限,就不再展开介绍。

实际运用理论时进行架构设计时,许多人容易犯“手里有了锤子,看什么都觉得像钉子”的错误,设计方案时考虑的问题场景过多,各种重试,各种补偿机制引入系统,导致设计出来的系统过于复杂,落地遥遥无期。

世界上解决一个计算机问题最简单的方法:“恰好”不需要解决它!—— 阿里中间件技术专家沈询

有些问题,看起来很重要,但实际上我们可以通过合理的设计或者将问题分解来规避。设计分布式事务系统也不是需要考虑所有异常情况,不必过度设计各种回滚,补偿机制。如果硬要把时间花在解决问题本身,实际上不仅效率低下,而且也是一种浪费。

如果系统要实现回滚流程的话,有可能系统复杂度将大大提升,且很容易出现 Bug,估计出现 Bug 的概率会比需要事务回滚的概率大很多。在设计系统时,我们需要衡量是否值得花这么大的代价来解决这样一个出现概率非常小的问题,可以考虑当出现这个概率很小的问题,能否采用人工解决的方式,这也是大家在解决疑难问题时需要多多思考的地方。

参考资料

分布式会话基本原理

由于 Http 是一种无状态的协议,服务器单单从网络连接上无从知道客户身份。

会话跟踪是 Web 程序中常用的技术,用来跟踪用户的整个会话。常用会话跟踪技术是 Cookie 与 Session。

由于 Http 是一种无状态的协议,服务器单从网络连接上无从知道客户身份。

所以服务器与浏览器为了进行会话跟踪(知道是谁在访问我),就必须主动的去维护一个状态,这个状态用于告知服务端前后两个请求是否来自同一浏览器。而这个状态需要通过 cookie 或者 session 去实现。

Cookie 实际上是存储在用户浏览器上的文本信息,并保留了各种跟踪的信息。

一个简单的 cookie 设置如下:

1
Set-Cookie: <cookie-name>=<cookie-value>
1
2
3
4
5
6
HTTP/2.0 200 OK
Content-Type: text/html
Set-Cookie: yummy_cookie=choco
Set-Cookie: tasty_cookie=strawberry

[page content]
  1. 浏览器请求服务器,如果服务器需要记录该用户的状态,就是用 response 向浏览器颁发一个 Cookie。
  2. 浏览器会把 Cookie 保存下来。
  3. 当浏览器再请求该网站时,浏览器把该请求的网址连同 Cookie 一同提交给服务器。服务器检查该 Cookie,以此来辨认用户状态。

Cookie 主要用于以下三个方面:

  • 会话状态管理(如用户登录状态、购物车、游戏分数或其它需要记录的信息)
  • 个性化设置(如用户自定义设置、主题等)
  • 浏览器行为跟踪(如跟踪分析用户行为等)

注:Cookie 功能需要浏览器的支持,如果浏览器不支持 Cookie 或者 Cookie 禁用了,Cookie 功能就会失效。

属性 说明
name=value 键值对,设置 Cookie 的名称及相对应的值,都必须是字符串类型 - 如果值为 Unicode 字符,需要为字符编码。 - 如果值为二进制数据,则需要使用 BASE64 编码。
domain 指定 cookie 所属域名,默认是当前域名
path **指定 cookie 在哪个路径(路由)下生效,默认是 ‘/‘**。 如果设置为 /abc,则只有 /abc 下的路由可以访问到该 cookie,如:/abc/read
maxAge cookie 失效的时间,单位秒。如果为整数,则该 cookie 在 maxAge 秒后失效。如果为负数,该 cookie 为临时 cookie ,关闭浏览器即失效,浏览器也不会以任何形式保存该 cookie 。如果为 0,表示删除该 cookie 。默认为 -1。 - 比 expires 好用
expires 过期时间,在设置的某个时间点后该 cookie 就会失效。 一般浏览器的 cookie 都是默认储存的,当关闭浏览器结束这个会话的时候,这个 cookie 也就会被删除
secure 该 cookie 是否仅被使用安全协议传输。安全协议有 HTTPS,SSL 等,在网络上传输数据之前先将数据加密。默认为 false。 当 secure 值为 true 时,cookie 在 HTTP 中是无效,在 HTTPS 中才有效。
httpOnly 如果给某个 cookie 设置了 httpOnly 属性,则无法通过 JS 脚本 读取到该 cookie 的信息,但还是能通过 Application 中手动修改 cookie,所以只是在一定程度上可以防止 XSS 攻击,不是绝对的安全

Session

什么是 Session

Session 代表着服务器和客户端一次会话的过程。Session 对象存储特定用户会话所需的属性及配置信息。这样,当用户在应用程序的 Web 页之间跳转时,存储在 Session 对象中的变量将不会丢失,而是在整个用户会话中一直存在下去。当客户端关闭会话,或者 Session 超时失效时会话结束。

  • session 是另一种记录服务器和客户端会话状态的机制
  • session 是基于 cookie 实现的,session 存储在服务器端,sessionId 会被存储到客户端的 cookie 中

session.png

Session 的工作步骤

  1. 用户第一次请求服务器的时候,服务器根据用户提交的相关信息,创建对应的 Session。
  2. 请求返回时将此 Session 的唯一标识信息 SessionID 返回给浏览器。
  3. 浏览器接收到服务器返回的 SessionID 信息后,会将此信息存入到 Cookie 中,同时 Cookie 记录此 SessionID 属于哪个域名。
  4. 当用户第二次访问服务器的时候,请求会自动判断此域名下是否存在 Cookie 信息,如果存在自动将 Cookie 信息也发送给服务端,服务端会从 Cookie 中获取 SessionID,再根据 SessionID 查找对应的 Session 信息,如果没有找到说明用户没有登录或者登录失效,如果找到 Session 证明用户已经登录可执行后面操作。

根据以上流程可知,SessionID 是连接 Cookie 和 Session 的一道桥梁,大部分系统也是根据此原理来验证用户登录状态。

Cookie 和 Session 的主要区别可以参考以下表格:

Cookie Session
作用范围 保存在客户端(浏览器) 保存在服务器端
隐私策略 存储在客户端,比较容易遭到非法获取 存储在服务端,安全性相对 Cookie 要好一些
存储方式 只能保存 ASCII 可以保存任意数据类型。
一般情况下我们可以在 Session 中保持一些常用变量信息,比如说 UserId 等。
存储大小 不能超过 4K 存储大小远高于 Cookie
生命周期 可设置为永久保存
比如我们经常使用的默认登录(记住我)功能
一般失效时间较短
客户端关闭或者 Session 超时都会失效。

既然服务端是根据 Cookie 中的信息判断用户是否登录,那么如果浏览器中禁止了 Cookie,如何保障整个机制的正常运转。

第一种方案,每次请求中都携带一个 SessionID 的参数,也可以 Post 的方式提交,也可以在请求的地址后面拼接 xxx?SessionID=123456...

第二种方案,Token 机制。Token 机制多用于 App 客户端和服务器交互的模式,也可以用于 Web 端做用户状态管理。

Token 的意思是“令牌”,是服务端生成的一串字符串,作为客户端进行请求的一个标识。Token 机制和 Cookie 和 Session 的使用机制比较类似。

当用户第一次登录后,服务器根据提交的用户信息生成一个 Token,响应时将 Token 返回给客户端,以后客户端只需带上这个 Token 前来请求数据即可,无需再次登录验证。

分布式 Session

在分布式场景下,一个用户的 Session 如果只存储在一个服务器上,那么当负载均衡器把用户的下一个请求转发到另一个服务器上,该服务器没有用户的 Session,就可能导致用户需要重新进行登录等操作。

分布式 Session 的几种实现策略:

  1. 粘性 session
  2. 应用服务器间的 session 复制共享
  3. 基于缓存的 session 共享 ✅

推荐:基于缓存的 session 共享

粘性 Session

粘性 Session(Sticky Sessions)需要配置负载均衡器,使得一个用户的所有请求都路由到一个服务器节点上,这样就可以把用户的 Session 存放在该服务器节点中。

缺点:当服务器节点宕机时,将丢失该服务器节点上的所有 Session

Session 复制共享

Session 复制共享(Session Replication)在服务器节点之间进行 Session 同步操作,这样的话用户可以访问任何一个服务器节点。

缺点:占用过多内存同步过程占用网络带宽以及服务器处理器时间

基于缓存的 session 共享

使用一个单独的存储服务器存储 Session 数据,可以存在 MySQL 数据库上,也可以存在 Redis 或者 Memcached 这种内存型数据库。

缺点:需要去实现存取 Session 的代码。

具体实现

JWT Token

使用 JWT Token 储存用户身份,然后再从数据库或者 cache 中获取其他的信息。这样无论请求分配到哪个服务器都无所谓。

tomcat + redis

这个其实还挺方便的,就是使用 session 的代码,跟以前一样,还是基于 tomcat 原生的 session 支持即可,然后就是用一个叫做 Tomcat RedisSessionManager 的东西,让所有我们部署的 tomcat 都将 session 数据存储到 redis 即可。

在 tomcat 的配置文件中配置:

1
2
3
4
5
6
7
<Valve className="com.orangefunction.tomcat.redissessions.RedisSessionHandlerValve" />

<Manager className="com.orangefunction.tomcat.redissessions.RedisSessionManager"
host="{redis.host}"
port="{redis.port}"
database="{redis.dbnum}"
maxInactiveInterval="60"/>

然后指定 redis 的 host 和 port 就 ok 了。

1
2
3
4
5
<Valve className="com.orangefunction.tomcat.redissessions.RedisSessionHandlerValve" />
<Manager className="com.orangefunction.tomcat.redissessions.RedisSessionManager"
sentinelMaster="mymaster"
sentinels="<sentinel1-ip>:26379,<sentinel2-ip>:26379,<sentinel3-ip>:26379"
maxInactiveInterval="60"/>

还可以用上面这种方式基于 redis 哨兵支持的 redis 高可用集群来保存 session 数据,都是 ok 的。

spring session + redis

上面那种 tomcat + redis 的方式好用,但是会严重依赖于 web 容器,不好将代码移植到其他 web 容器上去,尤其是你要是换了技术栈咋整?比如换成了 spring cloud 或者是 spring boot 之类的呢?

所以现在比较好的还是基于 Java 一站式解决方案,也就是 spring。人家 spring 基本上承包了大部分我们需要使用的框架,spirng cloud 做微服务,spring boot 做脚手架,所以用 sping session 是一个很好的选择。

在 pom.xml 中配置:

1
2
3
4
5
6
7
8
9
10
<dependency>
<groupId>org.springframework.session</groupId>
<artifactId>spring-session-data-redis</artifactId>
<version>1.2.1.RELEASE</version>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.8.1</version>
</dependency>

在 spring 配置文件中配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
<bean id="redisHttpSessionConfiguration"
class="org.springframework.session.data.redis.config.annotation.web.http.RedisHttpSessionConfiguration">
<property name="maxInactiveIntervalInSeconds" value="600"/>
</bean>

<bean id="jedisPoolConfig" class="redis.clients.jedis.JedisPoolConfig">
<property name="maxTotal" value="100" />
<property name="maxIdle" value="10" />
</bean>

<bean id="jedisConnectionFactory"
class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory" destroy-method="destroy">
<property name="hostName" value="${redis_hostname}"/>
<property name="port" value="${redis_port}"/>
<property name="password" value="${redis_pwd}" />
<property name="timeout" value="3000"/>
<property name="usePool" value="true"/>
<property name="poolConfig" ref="jedisPoolConfig"/>
</bean>

在 web.xml 中配置:

1
2
3
4
5
6
7
8
<filter>
<filter-name>springSessionRepositoryFilter</filter-name>
<filter-class>org.springframework.web.filter.DelegatingFilterProxy</filter-class>
</filter>
<filter-mapping>
<filter-name>springSessionRepositoryFilter</filter-name>
<url-pattern>/*</url-pattern>
</filter-mapping>

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@RestController
@RequestMapping("/test")
public class TestController {

@RequestMapping("/putIntoSession")
public String putIntoSession(HttpServletRequest request, String username) {
request.getSession().setAttribute("name", "leo");
return "ok";
}

@RequestMapping("/getFromSession")
public String getFromSession(HttpServletRequest request, Model model){
String name = request.getSession().getAttribute("name");
return name;
}
}

上面的代码就是 ok 的,给 spring session 配置基于 redis 来存储 session 数据,然后配置了一个 spring session 的过滤器,这样的话,session 相关操作都会交给 spring session 来管了。接着在代码中,就用原生的 session 操作,就是直接基于 spring sesion 从 redis 中获取数据了。

实现分布式的会话有很多种方式,我说的只不过是比较常见的几种方式,tomcat + redis 早期比较常用,但是会重耦合到 tomcat 中;近些年,通过 spring session 来实现。

参考资料

分布式锁基本原理

在并发场景下,为了保证并发安全,我们常常要通过互斥(加锁)手段来保证数据同步安全。

JDK 虽然提供了大量锁工具,但是只能作用于单一 Java 进程,无法应用于分布式系统。为了解决这个问题,需要使用分布式锁。

分布式锁的解决方案大致有以下几种:

  • 基于数据库实现
  • 基于缓存(redis,memcached 等)实现
  • 基于 Zookeeper 实现 ✅

注:推荐基于 ZooKeeper 实现分布式锁,具体原因看完本文即可明了。

分布式锁思路

分布式锁的总体思路大同小异,仅在实现细节上有所不同。

分布式锁的主要思路如下:

  • 互斥、可重入 - 创建锁必须是唯一的,表现形式为向数据存储服务器或容器插入一个唯一的 key,一旦有一个线程插入这个 key,其他线程就不能再插入了。
    • 保证 key 唯一性的最简单的方式是使用 UUID。
    • 存储锁的重入次数,以及分布式环境下唯一的线程标识。举例来说,可以使用 json 存储结构化数据,为了保证唯一,可以考虑将 mac 地址(IP 地址、机器 ID)、Jvm 进程 ID(应用 ID、服务 ID)、线程 ID 拼接起来作为唯一标识。
      1
      {"count":1,"expireAt":147506817232,"jvmPid":22224,"mac":"28-D2-44-0E-0D-9A","threadId":14}
  • 避免死锁 - 数据库分布式锁和缓存分布式锁(Redis)的思路都是引入超时机制,即成功申请锁后,超过一定时间,锁失效(删除 key),原因在于它们无法感知申请锁的客户端节点状态。而 ZooKeeper 由于其 znode 以目录、文件形式组织,天然就存在物理空间隔离,只要 znode 存在,即表示客户端节点还在工作,所以不存在这种问题。
  • 容错 - 只要大部分 Redis 节点可用,客户端就能正常加锁。
  • 自旋重试 - 获取不到锁时,不要直接返回失败,而是支持一定的周期自旋重试,设置一个总的超时时间,当过了超时时间以后还没有获取到锁则返回失败。

数据库分布式锁

数据库分布式锁原理

(1)创建表

1
2
3
4
5
6
7
8
CREATE TABLE `methodLock` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键',
`method_name` varchar(64) NOT NULL DEFAULT '' COMMENT '锁定的方法名',
`desc` varchar(1024) NOT NULL DEFAULT '备注信息',
`update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '保存数据时间,自动生成',
PRIMARY KEY (`id`),
UNIQUE KEY `uidx_method_name` (`method_name `) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='锁定中的方法';

(2)获取锁

想要锁住某个方法时,执行以下 SQL:

1
insert into methodLock(method_name,desc) values (‘method_name’,‘desc’)

因为我们对 method_name 做了唯一性约束,这里如果有多个请求同时提交到数据库的话,数据库会保证只有一个操作可以成功,那么我们就可以认为操作成功的那个线程获得了该方法的锁,可以执行方法体内容。

成功插入则获取锁。

(3)释放锁

当方法执行完毕之后,想要释放锁的话,需要执行以下 Sql:

1
delete from methodLock where method_name ='method_name'

数据库分布式锁问题

  • 这把锁强依赖数据库的可用性。如果数据库是一个单点,一旦数据库挂掉,会导致业务系统不可用。
  • 这把锁没有失效时间,一旦解锁操作失败,就会导致锁记录一直在数据库中,其他线程无法再获得到锁。
  • 这把锁只能是非阻塞的,因为数据的 insert 操作,一旦插入失败就会直接报错。没有获得锁的线程并不会进入排队队列,要想再次获得锁就要再次触发获得锁操作。
  • 这把锁是非重入的,同一个线程在没有释放锁之前无法再次获得该锁。因为数据中数据已经存在了。

解决办法:

  • 单点问题可以用多数据库实例,同时塞 N 个表,N/2+1 个成功就任务锁定成功
  • 写一个定时任务,隔一段时间清除一次过期的数据。
  • 写一个 while 循环,不断的重试插入,直到成功。
  • 在数据库表中加个字段,记录当前获得锁的机器的主机信息和线程信息,那么下次再获取锁的时候先查询数据库,如果当前机器的主机信息和线程信息在数据库可以查到的话,直接把锁分配给他就可以了。

数据库分布式锁小结

  • 优点: 直接借助数据库,容易理解。
  • 缺点: 会有各种各样的问题,在解决问题的过程中会使整个方案变得越来越复杂。操作数据库需要一定的开销,性能问题需要考虑。

Redis 分布式锁

相比于用数据库来实现分布式锁,基于缓存实现的分布式锁的性能会更好。目前有很多成熟的分布式产品,包括 Redis、memcache、Tair 等。这里以 Redis 举例。

Redis 分布式锁原理

这个分布式锁有 3 个重要的考量点:

  1. 互斥(只能有一个客户端获取锁)
  2. 不能死锁
  3. 容错(只要大部分 redis 节点创建了这把锁就可以)

对应的 Redis 指令如下:

  • setnx - setnx key val:当且仅当 key 不存在时,set 一个 key 为 val 的字符串,返回 1;若 key 存在,则什么都不做,返回 0。
  • expire - expire key timeout:为 key 设置一个超时时间,单位为 second,超过这个时间锁会自动释放,避免死锁。
  • delete - delete key:删除 key

注意:

不要将 setnxexpire 作为两个命令组合实现加锁,这样就无法保证原子性。如果客户端在 setnx 之后崩溃,那么将导致锁无法释放。正确的做法应是在 setnx 命令中指定 expire 时间。

Redis 分布式锁实现

(1)申请锁

1
SET resource_name my_random_value NX PX 30000

执行这个命令就 ok。

  • NX:表示只有 key 不存在的时候才会设置成功。(如果此时 redis 中存在这个 key,那么设置失败,返回 nil
  • PX 30000:意思是 30s 后锁自动释放。别人创建的时候如果发现已经有了就不能加锁了。

(2)释放锁

释放锁就是删除 key ,但是一般可以用 lua 脚本删除,判断 value 一样才删除:

1
2
3
4
5
6
-- 删除锁的时候,找到 key 对应的 value,跟自己传过去的 value 做比较,如果是一样的才删除。
if redis.call("get",KEYS[1]) == ARGV[1] then
return redis.call("del",KEYS[1])
else
return 0
end

数据库分布式锁小结

为啥要用 random_value 随机值呢?因为如果某个客户端获取到了锁,但是阻塞了很长时间才执行完,比如说超过了 30s,此时可能已经自动释放锁了,此时可能别的客户端已经获取到了这个锁,要是你这个时候直接删除 key 的话会有问题,所以得用随机值加上面的 lua 脚本来释放锁。

但是这样是肯定不行的。因为如果是普通的 redis 单实例,那就是单点故障。或者是 redis 普通主从,那 redis 主从异步复制,如果主节点挂了(key 就没有了),key 还没同步到从节点,此时从节点切换为主节点,别人就可以 set key,从而拿到锁。

RedLock 算法

这个场景是假设有一个 redis cluster,有 5 个 redis master 实例。然后执行如下步骤获取一把锁:

  1. 获取当前时间戳,单位是毫秒;
  2. 跟上面类似,轮流尝试在每个 master 节点上创建锁,过期时间较短,一般就几十毫秒;
  3. 尝试在大多数节点上建立一个锁,比如 5 个节点就要求是 3 个节点 n / 2 + 1
  4. 客户端计算建立好锁的时间,如果建立锁的时间小于超时时间,就算建立成功了;
  5. 要是锁建立失败了,那么就依次之前建立过的锁删除;
  6. 只要别人建立了一把分布式锁,你就得不断轮询去尝试获取锁

Redis 官方给出了以上两种基于 Redis 实现分布式锁的方法,详细说明可以查看:https://redis.io/topics/distlock

ZooKeeper 分布式锁

ZooKeeper 分布式锁原理

ZooKeeper 实现分布式锁基于 ZooKeeper 的两个特性:

  • 顺序临时节点:ZooKeeper 的存储类似于 DNS 那样的具有层级的命名空间。ZooKeeper 节点类型可以分为持久节点(PERSISTENT )、临时节点(EPHEMERAL),每个节点还能被标记为有序性(SEQUENTIAL),一旦节点被标记为有序性,那么整个节点就具有顺序自增的特点。
  • Watch 机制:ZooKeeper 允许用户在指定节点上注册一些 Watcher,并且在特定事件触发的时候,ZooKeeper 服务端会将事件通知给用户。

这也是 ZooKeeper 客户端 curator 的分布式锁实现。

  1. 创建一个目录 mylock;
  2. 线程 A 想获取锁就在 mylock 目录下创建临时顺序节点;
  3. 获取 mylock 目录下所有的子节点,然后获取比自己小的兄弟节点,如果不存在,则说明当前线程顺序号最小,获得锁;
  4. 线程 B 获取所有节点,判断自己不是最小节点,设置监听比自己次小的节点;
  5. 线程 A 处理完,删除自己的节点,线程 B 监听到变更事件,判断自己是不是最小的节点,如果是则获得锁。

ZooKeeper 分布式锁实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
/**
* ZooKeeperSession
*
* @author bingo
* @since 2018/11/29
*
*/
public class ZooKeeperSession {

private static CountDownLatch connectedSemaphore = new CountDownLatch(1);

private ZooKeeper zookeeper;
private CountDownLatch latch;

public ZooKeeperSession() {
try {
this.zookeeper = new ZooKeeper("192.168.31.187:2181,192.168.31.19:2181,192.168.31.227:2181", 50000, new ZooKeeperWatcher());
try {
connectedSemaphore.await();
} catch (InterruptedException e) {
e.printStackTrace();
}

System.out.println("ZooKeeper session established......");
} catch (Exception e) {
e.printStackTrace();
}
}

/**
* 获取分布式锁
*
* @param productId
*/
public Boolean acquireDistributedLock(Long productId) {
String path = "/product-lock-" + productId;

try {
zookeeper.create(path, "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
return true;
} catch (Exception e) {
while (true) {
try {
// 相当于是给node注册一个监听器,去看看这个监听器是否存在
Stat stat = zk.exists(path, true);

if (stat != null) {
this.latch = new CountDownLatch(1);
this.latch.await(waitTime, TimeUnit.MILLISECONDS);
this.latch = null;
}
zookeeper.create(path, "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
return true;
} catch (Exception ee) {
continue;
}
}

}
return true;
}

/**
* 释放掉一个分布式锁
*
* @param productId
*/
public void releaseDistributedLock(Long productId) {
String path = "/product-lock-" + productId;
try {
zookeeper.delete(path, -1);
System.out.println("release the lock for product[id=" + productId + "]......");
} catch (Exception e) {
e.printStackTrace();
}
}

/**
* 建立zk session的watcher
*
* @author bingo
* @since 2018/11/29
*
*/
private class ZooKeeperWatcher implements Watcher {

public void process(WatchedEvent event) {
System.out.println("Receive watched event: " + event.getState());

if (KeeperState.SyncConnected == event.getState()) {
connectedSemaphore.countDown();
}

if (this.latch != null) {
this.latch.countDown();
}
}

}

/**
* 封装单例的静态内部类
*
* @author bingo
* @since 2018/11/29
*
*/
private static class Singleton {

private static ZooKeeperSession instance;

static {
instance = new ZooKeeperSession();
}

public static ZooKeeperSession getInstance() {
return instance;
}

}

/**
* 获取单例
*
* @return
*/
public static ZooKeeperSession getInstance() {
return Singleton.getInstance();
}

/**
* 初始化单例的便捷方法
*/
public static void init() {
getInstance();
}

}

也可以采用另一种方式,创建临时顺序节点:

如果有一把锁,被多个人给竞争,此时多个人会排队,第一个拿到锁的人会执行,然后释放锁;后面的每个人都会去监听排在自己前面的那个人创建的 node 上,一旦某个人释放了锁,排在自己后面的人就会被 zookeeper 给通知,一旦被通知了之后,就 ok 了,自己就获取到了锁,就可以执行代码了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
public class ZooKeeperDistributedLock implements Watcher {

private ZooKeeper zk;
private String locksRoot = "/locks";
private String productId;
private String waitNode;
private String lockNode;
private CountDownLatch latch;
private CountDownLatch connectedLatch = new CountDownLatch(1);
private int sessionTimeout = 30000;

public ZooKeeperDistributedLock(String productId) {
this.productId = productId;
try {
String address = "192.168.31.187:2181,192.168.31.19:2181,192.168.31.227:2181";
zk = new ZooKeeper(address, sessionTimeout, this);
connectedLatch.await();
} catch (IOException e) {
throw new LockException(e);
} catch (KeeperException e) {
throw new LockException(e);
} catch (InterruptedException e) {
throw new LockException(e);
}
}

public void process(WatchedEvent event) {
if (event.getState() == KeeperState.SyncConnected) {
connectedLatch.countDown();
return;
}

if (this.latch != null) {
this.latch.countDown();
}
}

public void acquireDistributedLock() {
try {
if (this.tryLock()) {
return;
} else {
waitForLock(waitNode, sessionTimeout);
}
} catch (KeeperException e) {
throw new LockException(e);
} catch (InterruptedException e) {
throw new LockException(e);
}
}

public boolean tryLock() {
try {
// 传入进去的locksRoot + “/” + productId
// 假设productId代表了一个商品id,比如说1
// locksRoot = locks
// /locks/10000000000,/locks/10000000001,/locks/10000000002
lockNode = zk.create(locksRoot + "/" + productId, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

// 看看刚创建的节点是不是最小的节点
// locks:10000000000,10000000001,10000000002
List<String> locks = zk.getChildren(locksRoot, false);
Collections.sort(locks);

if(lockNode.equals(locksRoot+"/"+ locks.get(0))){
//如果是最小的节点,则表示取得锁
return true;
}

//如果不是最小的节点,找到比自己小1的节点
int previousLockIndex = -1;
for(int i = 0; i < locks.size(); i++) {
if(lockNode.equals(locksRoot + “/” + locks.get(i))) {
previousLockIndex = i - 1;
break;
}
}

this.waitNode = locks.get(previousLockIndex);
} catch (KeeperException e) {
throw new LockException(e);
} catch (InterruptedException e) {
throw new LockException(e);
}
return false;
}

private boolean waitForLock(String waitNode, long waitTime) throws InterruptedException, KeeperException {
Stat stat = zk.exists(locksRoot + "/" + waitNode, true);
if (stat != null) {
this.latch = new CountDownLatch(1);
this.latch.await(waitTime, TimeUnit.MILLISECONDS);
this.latch = null;
}
return true;
}

public void unlock() {
try {
// 删除/locks/10000000000节点
// 删除/locks/10000000001节点
System.out.println("unlock " + lockNode);
zk.delete(lockNode, -1);
lockNode = null;
zk.close();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}

public class LockException extends RuntimeException {
private static final long serialVersionUID = 1L;

public LockException(String e) {
super(e);
}

public LockException(Exception e) {
super(e);
}
}
}

ZooKeeper 分布式锁小结

ZooKeeper 版本的分布式锁问题相对比较来说少。

  • 锁的占用时间限制:redis 就有占用时间限制,而 ZooKeeper 则没有,最主要的原因是 redis 目前没有办法知道已经获取锁的客户端的状态,是已经挂了呢还是正在执行耗时较长的业务逻辑。而 ZooKeeper 通过临时节点就能清晰知道,如果临时节点存在说明还在执行业务逻辑,如果临时节点不存在说明已经执行完毕释放锁或者是挂了。由此看来 redis 如果能像 ZooKeeper 一样添加一些与客户端绑定的临时键,也是一大好事。
  • 是否单点故障:redis 本身有很多中玩法,如客户端一致性 hash,服务器端 sentinel 方案或者 cluster 方案,很难做到一种分布式锁方式能应对所有这些方案。而 ZooKeeper 只有一种玩法,多台机器的节点数据是一致的,没有 redis 的那么多的麻烦因素要考虑。

总体上来说 ZooKeeper 实现分布式锁更加的简单,可靠性更高。但 ZooKeeper 因为需要频繁的创建和删除节点,性能上不如 Redis 方式。

分布式锁方案对比

数据库分布式锁,问题比较多,解决起来比较麻烦,不推荐。

性能:

  • Redis 分布式锁,其实需要自己不断自旋去尝试获取锁,比较消耗性能。
  • ZooKeeper 分布式锁,获取不到锁,注册个监听器即可,不需要不断主动尝试获取锁,性能开销较小。

可靠性:

  • 如果是 redis 获取锁的那个客户端出现 bug 挂了,那么只能等待超时时间之后才能释放锁;
  • 而 zk 的话,因为创建的是临时 znode,只要客户端挂了,znode 就没了,此时就自动释放锁。

综上分析,ZooKeeper 实现分布式锁更加的简单,可靠性更高。✅

参考资料

传输控制协议 TCP

简介

什么是 TCP

TCP(Transmission Control Protocol),即传输控制协议,它是一种面向连接的可靠的基于字节流的传输层通信协议。TCP 由 RFC 793 定义。

img

TCP 的特性

  • 面向连接的 - 面向连接是指 TCP 需要通过三次握手、四次挥手原则建立和断开双向连接。
  • 可靠的 - 可靠是指 TCP 传输的数据包保证以原始顺序到达目的地,且数据包不被损坏。为了实现这点,TCP 通过以下技术来保证:
    • 数据包的序列号和校验码
    • 确认包和自动重传
      • 如果发送者没有收到正确的响应,它将重新发送数据包。如果多次超时,连接就会断开。
      • TCP 实行流量控制和拥塞控制。这些确保措施会导致延迟,而且通常导致传输效率比 UDP 低。
  • 基于字节流的
    • 虽然应用程序和 TCP 的交互是一次一个数据块(大小不等),但 TCP 把应用程序看成是一连串的无结构的字节流。TCP 有一个缓冲,当应用程序传送的数据块太长,TCP 就可以把它划分短一些再传送。如果应用程序一次只发送一个字节,TCP 也可以等待积累有足够多的字节后再构成报文段发送出去。
    • 在 TCP 建立连接前两次握手的 SYN 报文中选项字段的 MSS 值,通信双方商定通信的最大报文长度。如果应用层交付下来的数据过大,就会对数据分段,然后发送;否则通过滑动窗口来控制通信双发的数据。

TCP 的适用场景

基于以上特性,为了确保高吞吐量,Web 服务器可以保持大量的 TCP 连接,从而导致高内存使用。但要注意的是,在 Web 服务器线程间拥有大量开放连接可能开销巨大,消耗资源过多,这时可以考虑在适用情况下切换到 UDP。

TCP 对于需要高可靠性但时间紧迫的应用程序很有用。比如包括 Web 服务器,数据库信息,SMTP,FTP 和 SSH。

以下情况使用 TCP 代替 UDP:

  • 你需要数据完好无损。
  • 你想对网络吞吐量自动进行最佳评估。

TCP 报文

img

报文字段不一一阐述,重点关注以下几点:

  • TCP 的包是没有 IP 地址的,那是 IP 层上的事。但是有源端口和目标端口。
  • 一个 TCP 连接需要四个元组来表示是同一个连接(src_ip, src_port, dst_ip, dst_port)准确说是五元组,还有一个是协议。但因为这里只是说 TCP 协议,所以,这里我只说四元组。
  • 注意上图中的四个非常重要的东西:
    • Sequence Number是包的序号,用来解决网络包乱序(reordering)问题。
    • Acknowledgement Number就是 ACK——用于确认收到,用来解决不丢包的问题
    • Window 又叫 Advertised-Window,也就是著名的滑动窗口(Sliding Window),用于解决流控的
    • TCP Flag,也就是包的类型,主要是用于操控 TCP 的状态机的

img

TCP 通信流程

img

TCP 完整的通信分为三块:

  1. 三次握手建立连接
  2. 数据传输
  3. 四次挥手端口连接

三次握手

(1)三次握手有什么用?

  • 三次握手负责建立 TCP 双向连接。

(2)什么是三次握手?

img

如上图所示,三次握手流程如下:

  1. 第一次握手 - 客户端向服务端发送带有 SYN 标志的数据包。
  2. 第二次握手 - 服务端向客户端发送带有 SYN/ACK 标志的数据包。
  3. 第三次握手 - 客户端向服务端发送带有带有 ACK 标志的数据包。

至此,TCP 三次握手完成,客户端与服务端已建立双向连接。

💡 说明:SYN 为 synchronize 的缩写,ACK 为 acknowledgment 的缩写。

(3)为什么需要三次握手?

为了便于说明,假设客户端为 A, 服务端为 B。

  1. 第一次握手,A 向 B 发同步消息。B 收到消息后,B 认为:A 发消息没问题;B 收消息没问题。
  2. 第二次握手,B 向 A 发同步消息和确认消息。A 收到消息后,A 认为:A 发消息、收消息都没问题;B 发消息、收消息都没问题。但是,此时 B 不确定自己发消息是否没问题,所以就需要第三次握手。
  3. 第三次握手,A 向 B 发确认消息。B 收到消息后。B 认为:B 发消息没问题。

四次挥手

(1)四次挥手有什么用?

  • 四次挥手负责断开 TCP 连接。

(2)什么是四次挥手?

如上图所示,四次挥手流程如下:

img

  1. 第一次挥手 - 客户端向服务端发送一个 FIN 包,用来关闭客户端到服务端的数据传送。
  2. 第二次挥手 - 服务端收到这个 FIN 包,向客户端发送一个 ACK 包,确认序号为收到的序号加 1。和 SYN 一样,一个 FIN 将占用一个序号。
  3. 第三次挥手 - 服务端关闭与客户端的连接,向客户端发送一个 FIN 包。
  4. 第四次挥手 - 客户端向服务端发送 ACK 包,并将确认序号设置为收到序号加 1。

(3)为什么建立连接是三次握手,关闭连接确是四次挥手呢?

  • 建立连接的时候, 服务器在 LISTEN 状态下,收到建立连接请求的 SYN 报文后,把 ACK 和 SYN 放在一个报文里发送给客户端。
  • 而关闭连接时,服务器收到对方的 FIN 报文时,仅仅表示对方不再发送数据了但是还能接收数据,而自己也未必全部数据都发送给对方了,所以己方可以立即关闭,也可以发送一些数据给对方后,再发送 FIN 报文给对方来表示同意现在关闭连接,因此,己方 ACK 和 FIN 一般都会分开发送,从而导致多了一次。

滑动窗口

什么是滑动窗口?

滑动窗口是 TCP 的一种控制网络流量的技术。

TCP 必需要解决的可靠传输以及包乱序(reordering)的问题,所以,TCP 必需要知道网络实际的数据处理带宽或是数据处理速度,这样才不会引起网络拥塞,导致丢包。

TCP 头里有一个字段叫 Window,又叫 Advertised-Window,这个字段是接收端告诉发送端自己还有多少缓冲区可以接收数据。于是发送端就可以根据这个接收端的处理能力来发送数据,而不会导致接收端处理不过来

滑动窗口原理是什么?

img

  1. 已发送已确认 - 数据流中最早的字节已经发送并得到确认。这些数据是站在发送端的角度来看的。上图中的 31 个字节已经发送并确认。
  2. 已发送但尚未确认 - 已发送但尚未得到确认的字节。发送方在确认之前,不认为这些数据已经被处理。上图中的 32 ~ 45 字节为第 2 类。
  3. 未发送而接收方已 Ready - 设备尚未将数据发出 ,但接收方根据最近一次关于发送方一次要发送多少字节确认自己有足够空间。发送方会立即尝试发送。上图中的 46 ~ 51 字节为第 3 类。
  4. 未发送而接收方 Not Ready - 由于接收方 not ready,还不允许将这部分数据发出。上图中的 52 以后的字节为第 4 类。

img

这张图片相对于上一张图片,滑动窗口偏移了 5 个字节,意味着有 5 个已发送的字节得到了确认。

TCP 重传机制

TCP 要保证所有的数据包都可以到达,所以,必需要有重传机制。

TCP 重传机制主要有两种:

  • 超时重传机制
  • 快速重传机制

超时重传机制

超时重传机制是指:发送数据包在一定的时间周期内没有收到相应的 ACK,等待一定的时间,超时之后就认为这个数据包丢失,就会重新发送。这个等待时间被称为 RTO(Retransmission TimeOut),即重传超时时间。

没有确认的数据包不会从窗口中移走,定时器在重传时间到期内,每个片段的位置不变。

这种机制的重点是 RTO 的设置:

  • RTO 设长了,重发就慢,丢了老半天才重发,没有效率,性能差;
  • RTO 设短了,会导致可能并没有丢就重发。于是重发的就快,会增加网络拥塞,导致更多的超时,更多的超时导致更多的重发

快速重传机制

快速重传机制,实现了另外的一种丢包评定标准,即如果连续收到 3 次重复 ACK,发送方就认为这个 seq 的包丢失了,立刻进行重传。

当接收方收到乱序片段时,需要重复发送 ACK。

参考资料

用户数据报协议 UDP

简介

img

UDP 是无连接的。数据报(类似于数据包)只在数据报级别有保证。数据报可能会无序的到达目的地,也有可能会遗失。UDP 不支持拥塞控制。虽然不如 TCP 那样有保证,但 UDP 通常效率更高。

UDP 可以通过广播将数据报发送至子网内的所有设备。这对 DHCP 很有用,因为子网内的设备还没有分配 IP 地址,而 IP 对于 TCP 是必须的。

UDP 可靠性更低但适合用在网络电话、视频聊天,流媒体和实时多人游戏上。

以下情况使用 UDP 代替 TCP:

  • 你需要低延迟
  • 相对于数据丢失更糟的是数据延迟
  • 你想实现自己的错误校正方法

UDP 特点

  1. 无连接的,即发送数据之前不需要建立连接,因此减少了开销和发送数据之前的时延。
  2. 不保证可靠交付,因此主机不需要为此复杂的连接状态表
  3. 面向报文的,意思是 UDP 对应用层交下来的报文,既不合并,也不拆分,而是保留这些报文的边界,在添加首部后向下交给 IP 层。
  4. 没有阻塞控制,因此网络出现的拥塞不会使发送方的发送速率降低。
  5. 支持一对一、一对多、多对一和多对多的交互通信,也即是提供广播和多播的功能。
  6. 首部开销小,首部只有 8 个字节,分为四部分。

UDP 应用场景

  1. 名字转换(DNS)
  2. 文件传送(TFTP)
  3. 路由选择协议(RIP)
  4. IP 地址配置(BOOTP,DHTP)
  5. 网络管理(SNMP)
  6. 远程文件服务(NFS)
  7. IP 电话
  8. 流式多媒体通信

UDP 报文

UDP 数据报分为数据字段和首部字段。
首部字段只有 8 个字节,由四个字段组成,每个字段的长度是 2 个字节。

首部各字段意义

  1. 源端口:源端口号,在需要对方回信时选用,不需要时可全 0.
  2. 目的端口:目的端口号,在终点交付报文时必须要使用到。
  3. 长度:UDP 用户数据报的长度,在只有首部的情况,其最小值是 8 。
  4. 检验和:检测 UDP 用户数据报在传输中是否有错,有错就丢弃。

网络技术之 WebSocket

Socket

Socket 作为一种抽象层,应用程序通过它来发送和接收数据,使用 Socket 可以将应用程序与处于同一网络中的其他应用程序进行通信交互。简而言之,Socket 提供了应用程序内部与外界通信的端口以及为通信双方提供了数据传输的通道。

Socket 用法

很多编程语言都支持 Socket。这里以 Java 的 Socket 用法为例。

在 Java 的 SDK 中,socket 的共有两个接口:用于监听客户连接的 ServerSocket 和用于通信的 Socket。使用 socket 的步骤如下:

  • 创建 ServerSocket 并监听客户连接
  • 使用 Socket 连接服务端
  • 通过 Socket 获取输入输出流进行通信

Socket 长连接

Socket 长连接,指的是在客户和服务端之间保持一个 socket 连接长时间不断开。

1
socket.setKeepAlive(true);

WebSocket

WebSocket 简介

WebSocket 是什么

WebSocket 是一种网络通信协议。RFC6455 定义了它的通信标准。

WebSocket 是 HTML5 开始提供的一种在单个 TCP 连接上进行全双工通讯的协议。

为什么需要 WebSocket

了解计算机网络协议的人,应该都知道:HTTP 协议是一种无状态的、无连接的、单向的应用层协议。它采用了请求/响应模型。通信请求只能由客户端发起,服务端对请求做出应答处理。

这种通信模型有一个弊端:HTTP 协议无法实现服务器主动向客户端发起消息。

这种单向请求的特点,注定了如果服务器有连续的状态变化,客户端要获知就非常麻烦。大多数 Web 应用程序将通过频繁的异步 JavaScript 和 XML(AJAX)请求实现长轮询。轮询的效率低,非常浪费资源(因为必须不停连接,或者 HTTP 连接始终打开)。

img
因此,工程师们一直在思考,有没有更好的方法。WebSocket 就是这样发明的。WebSocket 连接允许客户端和服务器之间进行全双工通信,以便任一方都可以通过建立的连接将数据推送到另一端。WebSocket 只需要建立一次连接,就可以一直保持连接状态。这相比于轮询方式的不停建立连接显然效率要大大提高。

img

WebSocket 如何工作

Web 浏览器和服务器都必须实现 WebSockets 协议来建立和维护连接。由于 WebSockets 连接长期存在,与典型的 HTTP 连接不同,对服务器有重要的影响。

基于多线程或多进程的服务器无法适用于 WebSockets,因为它旨在打开连接,尽可能快地处理请求,然后关闭连接。任何实际的 WebSockets 服务器端实现都需要一个异步服务器。

WebSocket 使用

WebSocket 客户端

在客户端,没有必要为 WebSockets 使用 JavaScript 库。实现 WebSockets 的 Web 浏览器将通过 WebSockets 对象公开所有必需的客户端功能(主要指支持 Html5 的浏览器)。

以下 API 用于创建 WebSocket 对象。

1
var Socket = new WebSocket(url, [protocol] );

以上代码中的第一个参数 url, 指定连接的 URL。第二个参数 protocol 是可选的,指定了可接受的子协议。

WebSocket 属性

以下是 WebSocket 对象的属性。假定我们使用了以上代码创建了 Socket 对象:

属性 描述
Socket.readyState 只读属性 readyState 表示连接状态,可以是以下值:0 - 表示连接尚未建立。1 - 表示连接已建立,可以进行通信。2 - 表示连接正在进行关闭。3 - 表示连接已经关闭或者连接不能打开。
Socket.bufferedAmount 只读属性 bufferedAmount 已被 send() 放入正在队列中等待传输,但是还没有发出的 UTF-8 文本字节数。
WebSocket 事件

以下是 WebSocket 对象的相关事件。假定我们使用了以上代码创建了 Socket 对象:

事件 事件处理程序 描述
open Socket.onopen 连接建立时触发
message Socket.onmessage 客户端接收服务端数据时触发
error Socket.onerror 通信发生错误时触发
close Socket.onclose 连接关闭时触发
WebSocket 方法

以下是 WebSocket 对象的相关方法。假定我们使用了以上代码创建了 Socket 对象:

方法 描述
Socket.send() 使用连接发送数据
Socket.close() 关闭连接

WebSocket 客户端代码示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 初始化一个 WebSocket 对象
var ws = new WebSocket('ws://localhost:9998/echo')

// 建立 web socket 连接成功触发事件
ws.onopen = function() {
// 使用 send() 方法发送数据
ws.send('发送数据')
alert('数据发送中...')
}

// 接收服务端数据时触发事件
ws.onmessage = function(evt) {
var received_msg = evt.data
alert('数据已接收...')
}

// 断开 web socket 连接成功触发事件
ws.onclose = function() {
alert('连接已关闭...')
}

WebSocket 服务端

WebSocket 在服务端的实现非常丰富。Node.js、Java、C++、Python 等多种语言都有自己的解决方案。

以下,介绍我在学习 WebSocket 过程中接触过的 WebSocket 服务端解决方案。

Node.js

常用的 Node 实现有以下三种。

Java

Java 的 web 一般都依托于 servlet 容器。

我使用过的 servlet 容器有:Tomcat、Jetty、Resin。其中 Tomcat7、Jetty7 及以上版本均开始支持 WebSocket(推荐较新的版本,因为随着版本的更迭,对 WebSocket 的支持可能有变更)。

此外,Spring 框架对 WebSocket 也提供了支持。

虽然,以上应用对于 WebSocket 都有各自的实现。但是,它们都遵循RFC6455 的通信标准,并且 Java API 统一遵循 JSR 356 - JavaTM API for WebSocket 规范。所以,在实际编码中,API 差异不大。

Spring

Spring 对于 WebSocket 的支持基于下面的 jar 包:

1
2
3
4
5
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-websocket</artifactId>
<version>${spring.version}</version>
</dependency>

在 Spring 实现 WebSocket 服务器大概分为以下几步:

创建 WebSocket 处理器

扩展 TextWebSocketHandlerBinaryWebSocketHandler ,你可以覆写指定的方法。Spring 在收到 WebSocket 事件时,会自动调用事件对应的方法。

1
2
3
4
5
6
7
8
9
10
11
12
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.TextMessage;

public class MyHandler extends TextWebSocketHandler {

@Override
public void handleTextMessage(WebSocketSession session, TextMessage message) {
// ...
}

}

WebSocketHandler 源码如下,这意味着你的处理器大概可以处理哪些 WebSocket 事件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public interface WebSocketHandler {

/**
* 建立连接后触发的回调
*/
void afterConnectionEstablished(WebSocketSession session) throws Exception;

/**
* 收到消息时触发的回调
*/
void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception;

/**
* 传输消息出错时触发的回调
*/
void handleTransportError(WebSocketSession session, Throwable exception) throws Exception;

/**
* 断开连接后触发的回调
*/
void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception;

/**
* 是否处理分片消息
*/
boolean supportsPartialMessages();

}

配置 WebSocket

配置有两种方式:注解和 xml 。其作用就是将 WebSocket 处理器添加到注册中心。

  1. 实现 WebSocketConfigurer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;

@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {

@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(myHandler(), "/myHandler");
}

@Bean
public WebSocketHandler myHandler() {
return new MyHandler();
}

}
  1. xml 方式
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:websocket="http://www.springframework.org/schema/websocket"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/websocket
http://www.springframework.org/schema/websocket/spring-websocket.xsd">

<websocket:handlers>
<websocket:mapping path="/myHandler" handler="myHandler"/>
</websocket:handlers>

<bean id="myHandler" class="org.springframework.samples.MyHandler"/>

</beans>

更多配置细节可以参考:Spring WebSocket 文档

javax.websocket

如果不想使用 Spring 框架的 WebSocket API,你也可以选择基本的 javax.websocket。

首先,需要引入 API jar 包。

1
2
3
4
5
6
<!-- To write basic javax.websocket against -->
<dependency>
<groupId>javax.websocket</groupId>
<artifactId>javax.websocket-api</artifactId>
<version>1.0</version>
</dependency>

如果使用嵌入式 jetty,你还需要引入它的实现包:

1
2
3
4
5
6
7
8
9
10
11
12
<!-- To run javax.websocket in embedded server -->
<dependency>
<groupId>org.eclipse.jetty.websocket</groupId>
<artifactId>javax-websocket-server-impl</artifactId>
<version>${jetty-version}</version>
</dependency>
<!-- To run javax.websocket client -->
<dependency>
<groupId>org.eclipse.jetty.websocket</groupId>
<artifactId>javax-websocket-client-impl</artifactId>
<version>${jetty-version}</version>
</dependency>

@ServerEndpoint

这个注解用来标记一个类是 WebSocket 的处理器。

然后,你可以在这个类中使用下面的注解来表明所修饰的方法是触发事件的回调

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 收到消息触发事件
@OnMessage
public void onMessage(String message, Session session) throws IOException, InterruptedException {
...
}

// 打开连接触发事件
@OnOpen
public void onOpen(Session session, EndpointConfig config, @PathParam("id") String id) {
...
}

// 关闭连接触发事件
@OnClose
public void onClose(Session session, CloseReason closeReason) {
...
}

// 传输消息错误触发事件
@OnError
public void onError(Throwable error) {
...
}

ServerEndpointConfig.Configurator

编写完处理器,你需要扩展 ServerEndpointConfig.Configurator 类完成配置:

1
2
3
4
5
6
7
public class WebSocketServerConfigurator extends ServerEndpointConfig.Configurator {
@Override
public void modifyHandshake(ServerEndpointConfig sec, HandshakeRequest request, HandshakeResponse response) {
HttpSession httpSession = (HttpSession) request.getHttpSession();
sec.getUserProperties().put(HttpSession.class.getName(), httpSession);
}
}

然后就没有然后了,就是这么简单。

WebSocket 代理

如果把 WebSocket 的通信看成是电话连接,Nginx 的角色则像是电话接线员,负责将发起电话连接的电话转接到指定的客服。

Nginx 从 1.3 版开始正式支持 WebSocket 代理。如果你的 web 应用使用了代理服务器 Nginx,那么你还需要为 Nginx 做一些配置,使得它开启 WebSocket 代理功能。

以下为参考配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
server {
# this section is specific to the WebSockets proxying
location /socket.io {
proxy_pass http://app_server_wsgiapp/socket.io;
proxy_redirect off;

proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;

proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
proxy_read_timeout 600;
}
}

更多配置细节可以参考:Nginx 官方的 websocket 文档

WebSocket 应用示例

如果需要完整示例代码,可以参考我的 Github 代码:

spring-websocket 和 jetty 9.3 版本似乎存在兼容性问题,Tomcat 则木有问题。

我尝试了好几次,没有找到解决方案,只好使用 Jetty 官方的嵌入式示例在 Jetty 中使用 WebSocket 。

FAQ

Http vs. Socket

Http 通信与 Socket 通信方式有何差异?

  • Http
    • 基于请求/响应模式,采取一问一答方式(客户端请求,服务端才会响应)
  • Socket
    • 客户端和服务端建立双向连接。连接成功后,任意一方都可主动发送消息。
    • 数据丢失率低,使用简单,易于移植。

HTTP 和 WebSocket 有什么关系?

Websocket 其实是一个新协议,跟 HTTP 协议基本没有关系,只是为了兼容现有浏览器的握手规范而已,也就是说它是 HTTP 协议上的一种补充。

Html 和 HTTP 有什么关系?

Html 是超文本标记语言,是一种用于创建网页的标准标记语言。它是一种技术标准。Html5 是它的最新版本。

Http 是一种网络通信协议。其本身和 Html 没有直接关系。

参考资料