Dunwu Blog

大道至简,知易行难

Redis 持久化

Redis 是内存型数据库,为了保证数据在宕机后不会丢失,需要将内存中的数据持久化到硬盘上。

Redis 支持两种持久化方式:RDB 和 AOF。这两种持久化方式既可以同时使用,也可以单独使用。

关键词:RDBAOFSAVEBGSAVEappendfsync

RDB 快照

RDB 简介

RDB 即“快照”,它将某时刻的所有 Redis 数据库中的所有键值对数据保存到一个经过压缩的“二进制文件”(RDB 文件)中

RDB 持久化即可以“手动”执行,也可以定期“自动”执行

RDB 文件的“载入”工作是在服务器“启动”时“自动”执行的

对于不同类型的键值对, RDB 文件会使用不同的方式来保存它们。

创建 RDB 后,用户可以对 RDB 进行备份,可以将 RDB 复制到其他服务器从而创建具有相同数据的服务器副本,还可以在重启服务器时使用。一句话来说:RDB 适用于作为“冷备”

RDB 的优点和缺点

RDB 的优点

  • RDB 文件非常紧凑,适合作为“冷备”。比如你可以在每个小时报保存一下过去 24 小时内的数据,同时每天保存过去 30 天的数据,这样即使出了问题你也可以根据需求恢复到不同版本的数据集。
  • 快照在保存 RDB 文件时父进程唯一需要做的就是 fork 出一个子进程,接下来的工作全部由子进程来做,父进程不需要再做其他 IO 操作,所以快照持久化方式可以最大化 Redis 的性能。
  • 恢复大数据集时,RDB 比 AOF 更快

RDB 的缺点

  • 如果系统发生故障,将会丢失最后一次创建快照之后的数据。如果你希望在 Redis 意外停止工作(例如电源中断)的情况下丢失的数据最少的话,那么 快照不适合你。虽然你可以配置不同的 save 时间点(例如每隔 5 分钟并且对数据集有 100 个写的操作),是 Redis 要完整的保存整个数据集是一个比较繁重的工作,你通常会每隔 5 分钟或者更久做一次完整的保存,万一在 Redis 意外宕机,你可能会丢失几分钟的数据。
  • 如果数据量很大,保存快照的时间会很长。快照需要经常 fork 子进程来保存数据集到硬盘上。当数据集比较大的时候,fork 的过程是非常耗时的,可能会导致 Redis 在一些毫秒级内不能响应客户端的请求。如果数据集巨大并且 CPU 性能不是很好的情况下,这种情况会持续 1 秒。AOF 也需要 fork,但是你可以调节重写日志文件的频率来提高数据集的耐久度。

RDB 的创建

有两个 Redis 命令可以用于生成 RDB 文件:SAVEBGSAVE

SAVE 命令由服务器进程直接执行保存操作,直到 RDB 创建完成为止。所以该命令“会阻塞”服务器,在阻塞期间,服务器不能响应任何命令请求。

1
2
>SAVE
"OK"

BGSAVE 命令会“派生”(fork)一个子进程,由子进程负责创建 RDB 文件,服务器进程继续处理命令请求,所以该命令“不会阻塞”服务器

1
2
>BGSAVE
"Background saving started"

🔔 【注意】

BGSAVE 命令的实现采用的是写时复制技术(Copy-On-Write,缩写为 CoW)。

BGSAVE 命令执行期间,SAVEBGSAVEBGREWRITEAOF 三个命令会被拒绝,以免与当前的 BGSAVE 操作产生竞态条件,降低性能。

创建 RDB 的工作由 rdb.c/rdbSave 函数完成。

RDB 的载入

RDB 文件的“载入”工作是在服务器“启动”时“自动”执行的。Redis 并没有专门用于载入 RDB 文件的命令。

服务器载入 RDB 文件期间,会一直处于阻塞状态,直到载入完成为止。

载入 RDB 的工作由 rdb.c/rdbLoad 函数完成。

🔔 【注意】

因为 AOF 的更新频率通常比 RDB 的更新频率高,所以:

  • 如果服务器开了 AOF,则服务器会优先使用 AOF 来还原数据。
  • 只有在 AOF 处于关闭时,服务器才会使用 RDB 来还原数据。

自动间隔保存

Redis 支持通过在 redis.conf 文件中配置 save 选项,让服务器每隔一段时间自动执行一次 BGSAVE 命令。save 选项可以设置多个保存条件,只要其中任意一个条件被满足,服务器就会执行 BGSAVE 命令。

【示例】redis.conf 中自动保存配置

1
2
3
4
5
6
# 900 秒内,至少对数据库进行了 1 次修改
save 900 1
# 300 秒内,至少对数据库进行了 10 次修改
save 300 10
# 60 秒内,至少对数据库进行了 10000 次修改
save 60 10000

只要满足以上任意条件,Redis 服务就会执行 BGSAVE 命令。

自动间隔的保存条件定义在 redis.h/redisServer 中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
struct redisServer {
// 记录了保存条件的数组
struct saveparam *saveparams;

// 自从上次 SAVE 执行以来,数据库被修改的次数
long long dirty;

// 上一次完成 SAVE 的时间
time_t lastsave;
}

// 服务器的保存条件(BGSAVE 自动执行的条件)
struct saveparam {

// 多少秒之内
time_t seconds;

// 发生多少次修改
int changes;

};

redisServer 中的 saveparams 数组维护了多个自动间隔保存条件。

服务每次成功执行一个修改命令后,dirty 计数器就会加 1;而 lastsave 则记录了上一次完成 SAVE 的时间。Redis 会通过一个 serverCron 函数周期性检查 save 选项所设条件是否满足,如果满足,则执行 BGSVAE 命令。

RDB 的文件结构

RDB 文件是一个经过压缩的“二进制文件”,由多个部分组成。

对于不同类型(STRING、HASH、LIST、SET、SORTED SET)的键值对,RDB 文件会使用不同的方式来保存它们。

Redis 本身提供了一个 RDB 文件检查工具 redis-check-dump

RDB 的配置

Redis RDB 默认配置如下:

1
2
3
4
5
6
7
8
save 900 1
save 300 10
save 60 10000
stop-writes-on-bgsave-error yes
rdbcompression yes
rdbchecksum yes
dbfilename dump.rdb
dir ./

Redis 的配置文件 redis.conf 中与 RDB 有关的选项:

  • save - Redis 会根据 save 选项,让服务器每隔一段时间自动执行一次 BGSAVE 命令

  • stop-writes-on-bgsave-error - 当 BGSAVE 命令出现错误时停止写 RDB 文件

  • rdbcompression - RDB 文件开启压缩功能

  • rdbchecksum - 对 RDB 文件进行校验

  • dbfilename - RDB 文件名

  • dir - RDB 文件和 AOF 文件的存储路径

AOF 日志

AOF 简介

AOF(Append Only File) 是将所有写命令追加写入“日志文件”,以此来记录数据的变化。当服务器重启时,会重新载入和执行 AOF 文件中的命令,就可以恢复原始的数据。AOF 适合作为“热备”

AOF 可以通过 appendonly yes 配置选项来开启。

AOF 的优点和缺点

AOF 的优点

  • 如果系统发生故障,AOF 丢失数据比 RDB 少。你可以使用不同的 fsync 策略:无 fsync;每秒 fsync;每次写的时候 fsync。使用默认的每秒 fsync 策略,Redis 的性能依然很好(fsync 是由后台线程进行处理的,主线程会尽力处理客户端请求),一旦出现故障,你最多丢失 1 秒的数据。
  • AOF 文件可修复 - AOF 文件是一个只进行追加的日志文件,所以不需要写入 seek,即使由于某些原因(磁盘空间已满,写的过程中宕机等等)未执行完整的写入命令,你也也可使用 redis-check-aof 工具修复这些问题。
  • AOF 文件可压缩。Redis 可以在 AOF 文件体积变得过大时,自动地在后台对 AOF 进行重写:重写后的新 AOF 文件包含了恢复当前数据集所需的最小命令集合。整个重写操作是绝对安全的,因为 Redis 在创建新 AOF 文件的过程中,会继续将命令追加到现有的 AOF 文件里面,即使重写过程中发生停机,现有的 AOF 文件也不会丢失。而一旦新 AOF 文件创建完毕,Redis 就会从旧 AOF 文件切换到新 AOF 文件,并开始对新 AOF 文件进行追加操作。
  • AOF 文件可读 - AOF 文件有序地保存了对数据库执行的所有写入操作,这些写入操作以 Redis 命令的格式保存。因此 AOF 文件的内容非常容易被人读懂,对文件进行分析(parse)也很轻松。 导出(export) AOF 文件也非常简单。举个例子,如果你不小心执行了 FLUSHALL 命令,但只要 AOF 文件未被重写,那么只要停止服务器,移除 AOF 文件末尾的 FLUSHALL 命令,并重启 Redis ,就可以将数据集恢复到 FLUSHALL 执行之前的状态。

AOF 的缺点

  • AOF 文件体积一般比 RDB 大 - 对于相同的数据集来说,AOF 文件的体积通常要大于 RDB 文件的体积。
  • 恢复大数据集时,AOF 比 RDB 慢。 - 根据所使用的 fsync 策略,AOF 的速度可能会慢于快照。在一般情况下,每秒 fsync 的性能依然非常高,而关闭 fsync 可以让 AOF 的速度和快照一样快,即使在高负荷之下也是如此。不过在处理巨大的写入载入时,快照可以提供更有保证的最大延迟时间(latency)。

AOF 的创建

Redis 命令请求会先保存到 AOF 缓冲区,再定期写入并同步到 AOF 文件

AOF 的实现可以分为命令追加(append)、文件写入、文件同步(sync)三个步骤。

  • 命令追加 - 当 Redis 服务器开启 AOF 功能时,服务器在执行完一个写命令后,会以 Redis 命令协议格式将被执行的写命令追加到 AOF 缓冲区的末尾。
  • 文件写入文件同步
    • Redis 的服务器进程就是一个事件循环,这个循环中的文件事件负责接收客户端的命令请求,以及向客户端发送命令回复。而时间事件则负责执行想 serverCron 这样的定时运行的函数。
    • 因为服务器在处理文件事件时可能会执行写命令,这些写命令会被追加到 AOF 缓冲区,服务器每次结束事件循环前,都会根据 appendfsync 选项来判断 AOF 缓冲区内容是否需要写入和同步到 AOF 文件中。

appendfsync 不同选项决定了不同的持久化行为:

  • always - 将 AOF 缓冲区中所有内容写入并同步到 AOF 文件。这种方式是最数据最安全的,但也是性能最差的。
  • no - 将 AOF 缓冲区所有内容写入到 AOF 文件,但并不对 AOF 文件进行同步,何时同步由操作系统决定。这种方式是数据最不安全的,一旦出现故障,未来得及同步的所有数据都会丢失。
  • everysec - appendfsync 默认选项。将 AOF 缓冲区所有内容写入到 AOF 文件,如果上次同步 AOF 文件的时间距离现在超过一秒钟,那么再次对 AOF 文件进行同步,这个同步操作是有一个线程专门负责执行的。这张方式是前面两种的这种方案——性能足够好,且即使出现故障,仅丢失一秒钟内的数据。

appendfsync 选项的不同值对 AOF 持久化功能的安全性、以及 Redis 服务器的性能有很大的影响。

AOF 的载入

因为 AOF 文件中包含了重建数据库所需的所有写命令,所以服务器只要载入并执行一遍 AOF 文件中保存的写命令,就可以还原服务器关闭前的数据库状态。

AOF 载入过程如下:

  1. 服务器启动载入程序。
  2. 创建一个伪客户端。因为 Redis 命令只能在客户端上下文中执行,所以需要创建一个伪客户端来载入、执行 AOF 文件中记录的命令。
  3. 从 AOF 文件中分析并读取一条写命令。
  4. 使用伪客户端执行写命令。
  5. 循环执行步骤 3、4,直到所有写命令都被处理完毕为止。
  6. 载入完毕。

AOF 的重写

随着 Redis 不断运行,AOF 的体积也会不断增长,这将导致两个问题:

  • AOF 耗尽磁盘可用空间。
  • Redis 重启后需要执行 AOF 文件记录的所有写命令来还原数据集,如果 AOF 过大,则还原操作执行的时间就会非常长。

为了解决 AOF 体积膨胀问题,Redis 提供了 AOF 重写功能,来对 AOF 文件进行压缩。AOF 重写可以产生一个新的 AOF 文件,这个新的 AOF 文件和原来的 AOF 文件所保存的数据库状态一致,但体积更小

AOF 重写并非读取和分析现有 AOF 文件的内容,而是直接从数据库中读取当前的数据库状态。即从数据库中读取键的当前值,然后用一条命令去记录该键值对,以此代替之前可能存在冗余的命令。

AOF 后台重写

作为一种辅助性功能,显然 Redis 并不想在 AOF 重写时阻塞 Redis 服务接收其他命令。因此,Redis 决定通过 BGREWRITEAOF 命令创建一个子进程,然后由子进程负责对 AOF 文件进行重写,这与 BGSAVE 原理类似。

  • 在执行 BGREWRITEAOF 命令时,Redis 服务器会维护一个 AOF 重写缓冲区。当 AOF 重写子进程开始工作后,Redis 每执行完一个写命令,会同时将这个命令发送给 AOF 缓冲区和 AOF 重写缓冲区。
  • 由于彼此不是在同一个进程中工作,AOF 重写不影响 AOF 写入和同步。当子进程完成创建新 AOF 文件的工作之后,服务器会将重写缓冲区中的所有内容追加到新 AOF 文件的末尾,使得新旧两个 AOF 文件所保存的数据库状态一致。
  • 最后,服务器用新的 AOF 文件替换就的 AOF 文件,以此来完成 AOF 重写操作。

BGREWRITEAOF 命令的实现采用的是写时复制技术(Copy-On-Write,缩写为 CoW)。

可以通过设置 auto-aof-rewrite-percentageauto-aof-rewrite-min-size,使得 Redis 在满足条件时,自动执行 BGREWRITEAOF

假设配置如下:

1
2
auto-aof-rewrite-percentage 100
auto-aof-rewrite-min-size 64mb

表明,当 AOF 大于 64MB,且 AOF 体积比上一次重写后的体积大了至少 100% 时,执行 BGREWRITEAOF

AOF 的配置

AOF 的默认配置:

1
2
3
4
5
appendonly no
appendfsync everysec
no-appendfsync-on-rewrite no
auto-aof-rewrite-percentage 100
auto-aof-rewrite-min-size 64mb

AOF 持久化通过在 redis.conf 中的 appendonly yes 配置选项来开启。

  • appendonly - 开启 AOF 功能。
  • appendfilename - AOF 文件名。
  • appendfsync - 用于设置同步频率,它有以下可选项:
    • always - 每个 Redis 写命令都要同步写入硬盘。这样做会严重降低 Redis 的速度。
    • everysec - 每秒执行一次同步,显示地将多个写命令同步到硬盘。为了兼顾数据安全和写入性能,推荐使用 appendfsync everysec 选项。Redis 每秒同步一次 AOF 文件时的性能和不使用任何持久化特性时的性能相差无几。
    • no - 让操作系统来决定应该何时进行同步。
  • no-appendfsync-on-rewrite - AOF 重写时不支持追加命令。
  • auto-aof-rewrite-percentage - AOF 重写百分比。
  • auto-aof-rewrite-min-size - AOF 重写文件的最小大小。
  • dir - RDB 文件和 AOF 文件的存储路径。

RDB 和 AOF

当 Redis 启动时, 如果 RDB 和 AOF 功能都开启了,那么程序会优先使用 AOF 文件来恢复数据集,因为 AOF 文件所保存的数据通常是最完整的。

如何选择持久化

  • 如果不关心数据丢失,可以不持久化。
  • 如果可以承受数分钟以内的数据丢失,可以只使用 RDB。
  • 如果不能承受数分钟以内的数据丢失,可以同时使用 RDB 和 AOF。

有很多用户都只使用 AOF 持久化, 但并不推荐这种方式: 因为定时生成 RDB 快照(snapshot)非常便于进行数据库备份,并且快照恢复数据集的速度也要比 AOF 恢复的速度要快,除此之外,使用快照还可以避免之前提到的 AOF 程序的 bug 。

RDB 切换为 AOF

在 Redis 2.2 或以上版本,可以在不重启的情况下,从 RDB 切换为 AOF :

  • 为最新的 dump.rdb 文件创建一个备份。
  • 将备份放到一个安全的地方。
  • 执行以下两条命令:
  • redis-cli config set appendonly yes
  • redis-cli config set save
  • 确保写命令会被正确地追加到 AOF 文件的末尾。
  • 执行的第一条命令开启了 AOF 功能: Redis 会阻塞直到初始 AOF 文件创建完成为止, 之后 Redis 会继续处理命令请求, 并开始将写入命令追加到 AOF 文件末尾。

执行的第二条命令用于关闭快照功能。 这一步是可选的, 如果你愿意的话, 也可以同时使用快照和 AOF 这两种持久化功能。

🔔 重要:别忘了在 redis.conf 中打开 AOF 功能!否则的话,服务器重启之后,之前通过 CONFIG SET 设置的配置就会被遗忘,程序会按原来的配置来启动服务器。

AOF 和 RDB 的相互作用

BGSAVEBGREWRITEAOF 命令不可以同时执行。这是为了避免两个 Redis 后台进程同时对磁盘进行大量的 I/O 操作。

如果 BGSAVE 正在执行,并且用户显示地调用 BGREWRITEAOF 命令,那么服务器将向用户回复一个 OK 状态,并告知用户,BGREWRITEAOF 已经被预定执行。一旦 BGSAVE 执行完毕, BGREWRITEAOF 就会正式开始。

混合持久化

RDB 优点是数据恢复速度快,但是快照的频率不好把握。频率太低,丢失的数据就会比较多,频率太高,就会影响性能。AOF 优点是丢失数据少,但是数据恢复不快。

为了集成了两者的优点,Redis 4.0 提出了混合使用 AOF 日志和内存快照,也叫混合持久化,既保证了 Redis 重启速度,又降低数据丢失风险。

混合持久化工作在 AOF 日志重写过程,当开启了混合持久化时,在 AOF 重写日志时,fork 出来的重写子进程会先将与主线程共享的内存数据以 RDB 方式写入到 AOF 文件,然后主线程处理的操作命令会被记录在重写缓冲区里,重写缓冲区里的增量命令会以 AOF 方式写入到 AOF 文件,写入完成后通知主进程将新的含有 RDB 格式和 AOF 格式的 AOF 文件替换旧的的 AOF 文件。

也就是说,使用了混合持久化,AOF 文件的前半部分是 RDB 格式的全量数据,后半部分是 AOF 格式的增量数据

这样的好处在于,重启 Redis 加载数据的时候,由于前半部分是 RDB 内容,这样加载的时候速度会很快

加载完 RDB 的内容后,才会加载后半部分的 AOF 内容,这里的内容是 Redis 后台子进程重写 AOF 期间,主线程处理的操作命令,可以使得数据更少的丢失

混合持久化优点

  • 混合持久化结合了 RDB 和 AOF 持久化的优点,开头为 RDB 的格式,使得 Redis 可以更快的启动,同时结合 AOF 的优点,有减低了大量数据丢失的风险。

混合持久化缺点

  • AOF 文件中添加了 RDB 格式的内容,使得 AOF 文件的可读性变得很差;
  • 兼容性差,如果开启混合持久化,那么此混合持久化 AOF 文件,就不能用在 Redis 4.0 之前版本了。

Redis 备份

应该确保 Redis 数据有完整的备份。

备份 Redis 数据建议采用 RDB。

备份过程

  1. 创建一个定期任务(cron job),每小时将一个 RDB 文件备份到一个文件夹,并且每天将一个 RDB 文件备份到另一个文件夹。
  2. 确保快照的备份都带有相应的日期和时间信息,每次执行定期任务脚本时,使用 find 命令来删除过期的快照:比如说,你可以保留最近 48 小时内的每小时快照,还可以保留最近一两个月的每日快照。
  3. 至少每天一次,将 RDB 备份到你的数据中心之外,或者至少是备份到你运行 Redis 服务器的物理机器之外。

容灾备份

Redis 的容灾备份基本上就是对数据进行备份,并将这些备份传送到多个不同的外部数据中心。

容灾备份可以在 Redis 运行并产生快照的主数据中心发生严重的问题时,仍然让数据处于安全状态。

参考资料

Redis 基本数据类型

关键词:StringHashListSetZset

Redis 提供了多种数据类型,每种数据类型有丰富的命令支持。

Redis 支持的基本数据类型:STRING、HASH、LIST、SET、ZSET

Redis 支持的高级数据类型:BitMap、HyperLogLog、GEO、Stream

使用 Redis ,不仅要了解其数据类型的特性,还需要根据业务场景,灵活的、高效的使用其数据类型来建模。

String

String 简介

String 类型是键值对结构。

String 类型是二进制安全的。二进制安全是指,String 类型不仅可以保存文本数据,还可以保存任意格式的二进制数据,如:图片、音频、视频、压缩文件等。

默认情况下,String 类型的值最大可为 512 MB

String 实现

String 类型的底层的数据结构实现主要是 int 和 SDS(简单动态字符串)。

SDS 和我们认识的 C 字符串不太一样,之所以没有使用 C 语言的字符串表示,因为 SDS 相比于 C 的原生字符串:

  • SDS 不仅可以保存文本数据,还可以保存二进制数据。因为 SDS 使用 len 属性的值而不是空字符来判断字符串是否结束,并且 SDS 的所有 API 都会以处理二进制的方式来处理 SDS 存放在 buf[] 数组里的数据。所以 SDS 不光能存放文本数据,而且能保存图片、音频、视频、压缩文件这样的二进制数据。
  • **SDS 获取字符串长度的时间复杂度是 O(1)**。因为 C 语言的字符串并不记录自身长度,所以获取长度的复杂度为 O(n);而 SDS 结构里用 len 属性记录了字符串长度,所以复杂度为 O(1)
  • Redis 的 SDS API 是安全的,拼接字符串不会造成缓冲区溢出。因为 SDS 在拼接字符串之前会检查 SDS 空间是否满足要求,如果空间不够会自动扩容,所以不会导致缓冲区溢出的问题。

字符串对象的编码可以是 intraw 或者 embstr

字符串对象保存各类型值的编码方式:

编码
可以用 long 类型保存的整数。 int
可以用 long double 类型保存的浮点数。 embstr 或者 raw
字符串值, 或者因为长度太大而没办法用 long 类型表示的整数, 又或者因为长度太大而没办法用 long double 类型表示的浮点数。 embstr 或者 raw

如果一个字符串对象保存的是整数值, 并且这个整数值可以用 long 类型来表示, 那么字符串对象会将整数值保存在字符串对象结构的 ptr 属性里面(将 void* 转换成 long ), 并将字符串对象的编码设置为 int

【示例】set 整数值

1
2
3
4
5
> SET number 10086
OK

> OBJECT ENCODING number
"int"

如果字符串对象保存的是一个字符串值, 并且这个字符串值的长度大于 39 字节, 那么字符串对象将使用一个简单动态字符串(SDS)来保存这个字符串值, 并将对象的编码设置为 raw

1
2
3
4
5
6
7
8
> SET story "Long, long, long ago there lived a king ..."
OK

> STRLEN story
(integer) 43

> OBJECT ENCODING story
"raw"

如果字符串对象保存的是一个字符串值, 并且这个字符串值的长度小于等于 39 字节, 那么字符串对象将使用 embstr 编码的方式来保存这个字符串值。embstr 编码是专门用于保存短字符串的一种优化编码方式。

【示例】set 字符串值

1
2
3
4
5
> SET msg "hello"
OK

> OBJECT ENCODING msg
"embstr"

String 命令

命令 说明
SET 存储一个字符串值
SETNX 仅当键不存在时,才存储字符串值
GET 获取指定 key 的值
MGET 获取一个或多个指定 key 的值
INCRBY 将 key 中储存的数字加上指定的增量值
DECRBY 将 key 中储存的数字减去指定的减量值

更多命令请参考:Redis String 类型官方命令文档

【示例】SET、GET、DEL 操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# 将 key(name) 的 value 保存为 dunwu
> set name dunwu
OK
# 获取 key(name) 的 value
> get name
"dunwu"
# 将 key(name) 的 value 保存为 unknown(覆盖原 value)
> set name unknown
OK
> get name
"unknown"
# 检查 key(name) 是否存在
> exists name
(integer) 1
# 删除 key(name)
> del name
(integer) 1
> exists name
(integer) 0
> get name
(nil)

【示例】SETNX 操作

1
2
3
4
5
6
7
8
9
10
11
12
# 检查 key(lock) 是否存在
> exists lock
(integer) 0
# 将 key(lock) 的 value 保存为 1,保存成功
> setnx lock 1
(integer) 1
# 将 key(lock) 的 value 保存为 2,由于 key 已存在,保存失败
> setnx lock 2
(integer) 0
# 获取 key(lock) 的 value
> get lock
"1"

【示例】MSET、MGET 操作

1
2
3
4
5
6
7
8
# 批量设置 one、two、three 这 3 个 key
> mset one 1 tow 2 three 3
OK
# 批量获取 one、two、three 3 个 key 的 value
> mget one tow three
1) "1"
2) "2"
3) "3"

【示例】INCR、DECR、INCRBY、DECRBY 操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 将 key(counter) 的 value 保存为 0
> set counter 0
OK
# 将 key(counter) 的 value 加 1
> incr counter
(integer) 1
# 将 key(counter) 的 value 加 9
> incrby counter 9
(integer) 10
# 将 key(counter) 的 value 减 1
> decr counter
(integer) 9
# 将 key(counter) 的 value 减 9
> decrby counter 9
(integer) 0

String 应用

适用场景:缓存、计数器、共享 Session

缓存对象

使用 String 来缓存对象有两种方式:

(1)缓存对象的 JSON 值

1
> set user:1 {"name":"dunwu","sex":"man"}

(2)将 key 分离为 user:ID: 属性的形式,采用 MSET 存储,用 MGET 获取各属性值

1
2
3
4
5
> mset user:1:name dunwu user:1:sex man
OK
> mget user:1:name user:1:sex
1) "dunwu"
2) "man"

计数器

【需求场景】

统计网站某内容的点击量、收藏量、点赞数等等。

【解决方案】

使用 Redis 的 String 类型存储一个计数器。

维护计数器的常见操作如下:

  • 增加统计值 - 使用 INCRDECR 命令
  • 减少统计值 - 使用 INCRBYDECRBY 操作

【示例代码】

1
2
3
4
5
6
7
8
9
10
11
12
# 初始化 ID 为 1024 的博文访问量为 0
> set blog:view:1024 0
OK
# ID 为 1024 的博文访问量加 1
> incr blog:view:1024
(integer) 1
# ID 为 1024 的博文访问量加 1
> incr blog:view:1024
(integer) 2
# 查看 ID 为 1024 的博文访问量
> get blog:view:1024
"2"

分布式锁

(1)申请锁

SET 命令有个 NX 参数可以实现“key 不存在才插入”,可以用它来实现分布式锁:

  • 如果 key 不存在,则显示插入成功,可以用来表示加锁成功;
  • 如果 key 存在,则会显示插入失败,可以用来表示加锁失败。

一般而言,还会对分布式锁加上过期时间,分布式锁的命令如下:

1
SET key value NX PX 30000
  • key - 就是分布式锁的关键字;
  • value - 是客户端生成的唯一的标识;
  • NX - 表示只有 key 不存在的时候才会设置成功。(如果此时 redis 中存在这个 key,那么设置失败,返回 nil
  • PX 30000 - 表示:30s 后,key 会被删除(这意味着锁被释放了)。设置过期时间,是为了防止出现各种意外,导致锁始终无法释放的情况。

(2)释放锁

释放锁就是删除 key ,但是一般可以用 lua 脚本删除,判断 value 一样才删除,这是为了保证释放锁操作和申请所操作是同一个客户端。由于涉及两个操作,为了保证原子性,可以使用 lua 脚本来实现,因为 Redis 执行 Lua 脚本时,是以原子性方式执行的。

1
2
3
4
5
6
-- 删除锁的时候,找到 key 对应的 value,跟自己传过去的 value 做比较,如果是一样的才删除。
if redis.call("get",KEYS[1]) == ARGV[1] then
return redis.call("del",KEYS[1])
else
return 0
end

共享 Session 信息

在分布式场景下,一个用户的 Session 如果只存储在一个服务器上,那么当负载均衡器把用户的下一个请求转发到另一个服务器上,该服务器没有用户的 Session,就可能导致用户需要重新进行登录等操作。

分布式 Session 的几种实现策略:

  1. 粘性 session
  2. 应用服务器间的 session 复制共享
  3. 基于缓存的 session 共享 ✅

基于缓存的 session 共享实现

使用一个单独的存储服务器存储 Session 数据,可以存在 MySQL 数据库上,也可以存在 Redis 或者 Memcached 这种内存型数据库。

缺点:需要去实现存取 Session 的代码。

Hash

Hash 简介

Hash 是一个键值对(key - value)集合,其中 value 的形式如: value=[{field1,value1},...{fieldN,valueN}]。Hash 特别适合用于存储对象。

Hash 实现

哈希对象的编码可以是 ziplist 或者 hashtable

ziplist 编码的哈希对象使用压缩列表作为底层实现,每当有新的键值对要加入到哈希对象时, 程序会先将保存了键的压缩列表节点推入到压缩列表表尾, 然后再将保存了值的压缩列表节点推入到压缩列表表尾。

hashtable 编码的哈希对象使用字典作为底层实现, 哈希对象中的每个键值对都使用一个字典键值对来保存。

当哈希对象同时满足以下两个条件时, 使用 ziplist 编码;否则,使用 hashtable 编码。

  1. 哈希对象保存的所有键值对的键和值的字符串长度都小于 64 字节(可由 hash-max-ziplist-value 配置);
  2. 哈希对象保存的键值对数量小于 512 个(可由 hash-max-ziplist-entries 配置);

注意:这两个条件的上限值是可以修改的, 具体请看配置文件中关于 hash-max-ziplist-value 选项和 hash-max-ziplist-entries 选项的说明。

Hash 命令

命令 行为
HSET 将指定字段的值设为 value
HGET 获取指定字段的值
HGETALL 获取所有键值对
HMSET 设置多个键值对
HMGET 获取所有指定字段的值
HDEL 删除指定字段
HINCRBY 为指定字段的整数值加上增量
HKEYS 获取所有字段

更多命令请参考:Redis Hash 类型官方命令文档

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 存储一个哈希表 key 的键值
HSET key field value
# 获取哈希表 key 对应的 field 键值
HGET key field

# 在一个哈希表 key 中存储多个键值对
HMSET key field value [field value...]
# 批量获取哈希表 key 中多个 field 键值
HMGET key field [field ...]
# 删除哈希表 key 中的 field 键值
HDEL key field [field ...]

# 返回哈希表 key 中 field 的数量
HLEN key
# 返回哈希表 key 中所有的键值
HGETALL key

# 为哈希表 key 中 field 键的值加上增量 n
HINCRBY key field n

Hash 应用

Hash 类型适用于存储结构化数据

缓存对象

Hash 类型的(key,field,value)的结构与对象的(对象 id,属性,值)的结构相似,也可以用来存储对象。

我们以用户信息为例,它在关系型数据库中的结构是这样的:

我们可以使用如下命令,将用户对象的信息存储到 Hash 类型:

1
2
3
4
5
6
7
8
9
10
11
12
# 存储一个哈希表 uid:1 的键值
> HMSET uid:1 name Tom age 15
2
# 存储一个哈希表 uid:2 的键值
> HMSET uid:2 name Jerry age 13
2
# 获取哈希表用户 id 为 1 中所有的键值
> HGETALL uid:1
1) "name"
2) "Tom"
3) "age"
4) "15"

Redis Hash 存储其结构如下图:

在介绍 String 类型的应用场景时有所介绍,String + Json 也是存储对象的一种方式,那么存储对象时,到底用 String + json 还是用 Hash 呢?

一般对象用 String + Json 存储,对象中某些频繁变化的属性可以考虑抽出来用 Hash 类型存储。

购物车

【需求场景】

用户浏览电商平台,添加商品到购物车,并支持查看购物车。需要考虑未登录的情况。

【解决方案】

可以使用 HASH 类型来实现购物车功能。

以用户 session 为 key,存储了商品 ID 和商品数量的映射。其中,商品 id 为 field,商品数量为 value。

为什么不使用用户 ID?

因为很多场景下需要支持用户在免登陆的情况下使用购物车的,因为未登录,所以无法知道用户的用户 ID,这种情况下使用用户 session 更合适。并且由于绑定的是 session,可以在清空 session 时,顺便清空购物车缓存,更加方便。

维护购物车的常见操作如下:

  • 添加商品 - HSET cart:{session} {商品 id} 1
  • 添加数量 - HINCRBY cart:{session} {商品 id} 1
  • 商品总数 - HLEN cart:{session}
  • 删除商品 - HDEL cart:{session} {商品 id}
  • 获取购物车所有商品 - HGETALL cart:{session}

当前仅仅是将商品 ID 存储到了 Redis 中,在回显商品具体信息的时候,还需要拿着商品 id 查询一次数据库,获取完整的商品的信息。

List

Redis 中的 List 类型就是有序列表。

List 简介

List 列表是简单的字符串列表,按照插入顺序排序,可以从头部或尾部向 List 列表添加元素。

列表的最大长度为 2^32 - 1,也即每个列表支持超过 40 亿个元素。

List 实现

列表对象的编码可以是 ziplist 或者 linkedlist

ziplist 编码的列表对象使用压缩列表作为底层实现, 每个压缩列表节点(entry)保存了一个列表元素。

inkedlist 编码的列表对象使用双链表作为底层实现。

当列表对象可以同时满足以下两个条件时, 列表对象使用 ziplist 编码;否则,使用 linkedlist 编码

  1. 列表对象保存的所有字符串元素的长度都小于 64 字节;
  2. 列表对象保存的元素数量小于 512 个;

注意

以上两个条件的上限值是可以修改的, 具体请看配置文件中关于 list-max-ziplist-value 选项和 list-max-ziplist-entries 选项的说明。

List 命令

命令 行为
LPUSH 将给定值推入列表的右端。
RPUSH 将给定值推入列表的右端。
LPOP 从列表的左端弹出一个值,并返回被弹出的值。
RPOP 从列表的右端弹出一个值,并返回被弹出的值。
LRANGE 获取列表在给定范围上的所有值。
LINDEX 获取列表在给定位置上的单个元素。
LREM 从列表的左端弹出一个值,并返回被弹出的值。
LTRIM 只保留指定区间内的元素,删除其他元素。

更多命令请参考:Redis List 类型官方命令文档

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 将一个或多个值 value 插入到 key 列表的表头(最左边),最后的值在最前面
LPUSH key value [value ...]
# 将一个或多个值 value 插入到 key 列表的表尾(最右边)
RPUSH key value [value ...]
# 移除并返回 key 列表的头元素
LPOP key
# 移除并返回 key 列表的尾元素
RPOP key

# 返回列表 key 中指定区间内的元素,区间以偏移量 start 和 stop 指定,从 0 开始
LRANGE key start stop

# 从 key 列表表头弹出一个元素,没有就阻塞 timeout 秒,如果 timeout=0 则一直阻塞
BLPOP key [key ...] timeout
# 从 key 列表表尾弹出一个元素,没有就阻塞 timeout 秒,如果 timeout=0 则一直阻塞
BRPOP key [key ...] timeout

List 应用

消息队列

消息队列在存取消息时,必须要满足三个需求,分别是消息保序、处理重复的消息和保证消息可靠性

Redis 的 List 和 Stream 两种数据类型,就可以满足消息队列的这三个需求。我们先来了解下基于 List 的消息队列实现方法,后面在介绍 Stream 数据类型时候,在详细说说 Stream。

1、如何满足消息保序需求?

List 本身就是按先进先出的顺序对数据进行存取的,所以,如果使用 List 作为消息队列保存消息的话,就已经能满足消息保序的需求了。

List 可以使用 LPUSH + RPOP(或者反过来,RPUSH+LPOP)命令实现消息队列。

  • 生产者使用 LPUSH key value[value...] 将消息插入到队列的头部,如果 key 不存在则会创建一个空的队列再插入消息。

  • 消费者使用 RPOP key 依次读取队列的消息,先进先出。

不过,在消费者读取数据时,有一个潜在的性能风险点。

在生产者往 List 中写入数据时,List 并不会主动地通知消费者有新消息写入,如果消费者想要及时处理消息,就需要在程序中不停地调用 RPOP 命令(比如使用一个 while(1) 循环)。如果有新消息写入,RPOP 命令就会返回结果,否则,RPOP 命令返回空值,再继续循环。

所以,即使没有新消息写入 List,消费者也要不停地调用 RPOP 命令,这就会导致消费者程序的 CPU 一直消耗在执行 RPOP 命令上,带来不必要的性能损失。

为了解决这个问题,Redis 提供了 BRPOP 命令。BRPOP 命令也称为阻塞式读取,客户端在没有读到队列数据时,自动阻塞,直到有新的数据写入队列,再开始读取新数据。和消费者程序自己不停地调用 RPOP 命令相比,这种方式能节省 CPU 开销。

2、如何处理重复的消息?

消费者要实现重复消息的判断,需要 2 个方面的要求:

  • 每个消息都有一个全局的 ID。
  • 消费者要记录已经处理过的消息的 ID。当收到一条消息后,消费者程序就可以对比收到的消息 ID 和记录的已处理过的消息 ID,来判断当前收到的消息有没有经过处理。如果已经处理过,那么,消费者程序就不再进行处理了。

但是 List 并不会为每个消息生成 ID 号,所以我们需要自行为每个消息生成一个全局唯一 ID,生成之后,我们在用 LPUSH 命令把消息插入 List 时,需要在消息中包含这个全局唯一 ID。

例如,我们执行以下命令,就把一条全局 ID 为 111000102、库存量为 99 的消息插入了消息队列:

1
2
> LPUSH mq "111000102:stock:99"
(integer) 1

3、如何保证消息可靠性?

当消费者程序从 List 中读取一条消息后,List 就不会再留存这条消息了。所以,如果消费者程序在处理消息的过程出现了故障或宕机,就会导致消息没有处理完成,那么,消费者程序再次启动后,就没法再次从 List 中读取消息了。

为了留存消息,List 类型提供了 BRPOPLPUSH 命令,这个命令的作用是让消费者程序从一个 List 中读取消息,同时,Redis 会把这个消息再插入到另一个 List(可以叫作备份 List)留存

这样一来,如果消费者程序读了消息但没能正常处理,等它重启后,就可以从备份 List 中重新读取消息并进行处理了。

好了,到这里可以知道基于 List 类型的消息队列,满足消息队列的三大需求(消息保序、处理重复的消息和保证消息可靠性)。

  • 消息保序:使用 LPUSH + RPOP;
  • 阻塞读取:使用 BRPOP;
  • 重复消息处理:生产者自行实现全局唯一 ID;
  • 消息的可靠性:使用 BRPOPLPUSH

List 作为消息队列有什么缺陷?

List 不支持多个消费者消费同一条消息,因为一旦消费者拉取一条消息后,这条消息就从 List 中删除了,无法被其它消费者再次消费。

要实现一条消息可以被多个消费者消费,那么就要将多个消费者组成一个消费组,使得多个消费者可以消费同一条消息,但是 List 类型并不支持消费组的实现

这就要说起 Redis 从 5.0 版本开始提供的 Stream 数据类型了,Stream 同样能够满足消息队列的三大需求,而且它还支持“消费组”形式的消息读取。

输入自动补全

【需求场景】

根据用户输入,自动补全信息,如:联系人、商品名等。

  • 典型场景一 - 社交网站后台记录用户最近联系过的 100 个好友,当用户查找好友时,根据输入的关键字自动补全姓名。
  • 典型场景二 - 电商网站后台记录用户最近浏览过的 10 件商品,当用户查找商品是,根据输入的关键字自动补全商品名称。

【解决方案】

使用 Redis 的 List 类型存储一个最近信息列表,然后在需要自动补全信息时展示相应数量的数据。

维护最近信息列表的常见操作如下:

  • 如果指定信息已经存在于最近信息列表里,那么从列表里移除。使用 LREM 命令。
  • 将指定信息添加到最近信息列表的头部。使用 LPUSH 命令。
  • 添加操作完成后,如果最近信息列表中的数量超过上限 N,进行裁剪操作。使用 LTRIM 命令。

Set

Redis 中的 Set 类型就是无序且去重的集合。

Set 简介

Set 类型是一个无序并唯一的键值集合,它的存储顺序不会按照插入的先后顺序进行存储。

一个集合最多可以存储 2^32-1 个元素。概念和数学中个的集合基本类似,可以交集,并集,差集等等,所以 Set 类型除了支持集合内的增删改查,同时还支持多个集合取交集、并集、差集。

Set 类型和 List 类型的区别如下:

  • List 可以存储重复元素,Set 只能存储非重复元素;
  • List 是按照元素的先后顺序存储元素的,而 Set 则是无序方式存储元素的。

Set 实现

集合对象的编码可以是 intset 或者 hashtable

intset 编码的集合对象使用整数集合作为底层实现, 集合对象包含的所有元素都被保存在整数集合里面。

hashtable 编码的集合对象使用字典作为底层实现, 字典的每个键都是一个字符串对象, 每个字符串对象包含了一个集合元素, 而字典的值则全部被设置为 NULL

当集合对象可以同时满足以下两个条件时,集合对象使用 intset 编码;否则,使用 hashtable 编码:

  1. 集合对象保存的所有元素都是整数值;
  2. 集合对象保存的元素数量不超过 512 个;

注意:第二个条件的上限值是可以修改的, 具体请看配置文件中关于 set-max-intset-entries 选项的说明。

Set 命令

命令 行为
SADD 将给定元素添加到集合。
SMEMBERS 返回集合包含的所有元素。
SISMEMBER 检查给定元素是否存在于集合中。
SREM 如果给定的元素存在于集合中,那么移除这个元素。

更多命令请参考:Redis Set 类型官方命令文档

Set 常用操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 往集合 key 中存入元素,元素存在则忽略,若 key 不存在则新建
SADD key member [member ...]
# 从集合 key 中删除元素
SREM key member [member ...]
# 获取集合 key 中所有元素
SMEMBERS key
# 获取集合 key 中的元素个数
SCARD key

# 判断 member 元素是否存在于集合 key 中
SISMEMBER key member

# 从集合 key 中随机选出 count 个元素,元素不从 key 中删除
SRANDMEMBER key [count]
# 从集合 key 中随机选出 count 个元素,元素从 key 中删除
SPOP key [count]

Set 运算操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 交集运算
SINTER key [key ...]
# 将交集结果存入新集合 destination 中
SINTERSTORE destination key [key ...]

# 并集运算
SUNION key [key ...]
# 将并集结果存入新集合 destination 中
SUNIONSTORE destination key [key ...]

# 差集运算
SDIFF key [key ...]
# 将差集结果存入新集合 destination 中
SDIFFSTORE destination key [key ...]

Set 应用

集合的主要几个特性,无序、不可重复、支持并交差等操作。

因此 Set 类型比较适合用来数据去重和保障数据的唯一性,还可以用来统计多个集合的交集、错集和并集等,当我们存储的数据是无序并且需要去重的情况下,比较适合使用集合类型进行存储。

但是要提醒你一下,这里有一个潜在的风险。Set 的差集、并集和交集的计算复杂度较高,在数据量较大的情况下,如果直接执行这些计算,会导致 Redis 实例阻塞

在主从集群中,为了避免主库因为 Set 做聚合计算(交集、差集、并集)时导致主库被阻塞,我们可以选择一个从库完成聚合统计,或者把数据返回给客户端,由客户端来完成聚合统计。

点赞

Set 类型可以保证一个用户只能点一个赞,这里举例子一个场景,key 是文章 id,value 是用户 id。

uid:1uid:2uid:3 三个用户分别对 article:1 文章点赞了。

1
2
3
4
5
6
7
8
9
# uid:1 用户对文章 article:1 点赞
> SADD article:1 uid:1
(integer) 1
# uid:2 用户对文章 article:1 点赞
> SADD article:1 uid:2
(integer) 1
# uid:3 用户对文章 article:1 点赞
> SADD article:1 uid:3
(integer) 1

uid:1 取消了对 article:1 文章点赞。

1
2
> SREM article:1 uid:1
(integer) 1

获取 article:1 文章所有点赞用户 :

1
2
3
> SMEMBERS article:1
1) "uid:3"
2) "uid:2"

获取 article:1 文章的点赞用户数量:

1
2
> SCARD article:1
(integer) 2

判断用户 uid:1 是否对文章 article:1 点赞了:

1
2
> SISMEMBER article:1 uid:1
(integer) 0 # 返回 0 说明没点赞,返回 1 则说明点赞了

共同关注

Set 类型支持交集运算,所以可以用来计算共同关注的好友、公众号等。

key 可以是用户 id,value 则是已关注的公众号的 id。

uid:1 用户关注公众号 id 为 5、6、7、8、9,uid:2 用户关注公众号 id 为 7、8、9、10、11。

1
2
3
4
5
6
# uid:1 用户关注公众号 id 为 5、6、7、8、9
> SADD uid:1 5 6 7 8 9
(integer) 5
# uid:2 用户关注公众号 id 为 7、8、9、10、11
> SADD uid:2 7 8 9 10 11
(integer) 5

uid:1uid:2 共同关注的公众号:

1
2
3
4
5
# 获取共同关注
> SINTER uid:1 uid:2
1) "7"
2) "8"
3) "9"

uid:2 推荐 uid:1 关注的公众号:

1
2
3
> SDIFF uid:1 uid:2
1) "5"
2) "6"

验证某个公众号是否同时被 uid:1uid:2 关注:

1
2
3
4
> SISMEMBER uid:1 5
(integer) 1 # 返回 1,说明关注了
> SISMEMBER uid:2 5
(integer) 0 # 返回 0,说明没关注

抽奖活动

存储某活动中中奖的用户名,Set 类型因为有去重功能,可以保证同一个用户不会中奖两次。

key 为抽奖活动名,value 为员工名称,把所有员工名称放入抽奖箱:

1
2
> SADD lucky Tom Jerry John Sean Marry Lindy Sary Mark
(integer) 5

如果允许重复中奖,可以使用 SRANDMEMBER 命令。

1
2
3
4
5
6
7
8
9
10
11
12
# 抽取 1 个一等奖:
> SRANDMEMBER lucky 1
1) "Tom"
# 抽取 2 个二等奖:
> SRANDMEMBER lucky 2
1) "Mark"
2) "Jerry"
# 抽取 3 个三等奖:
> SRANDMEMBER lucky 3
1) "Sary"
2) "Tom"
3) "Jerry"

如果不允许重复中奖,可以使用 SPOP 命令。

1
2
3
4
5
6
7
8
9
10
11
12
# 抽取一等奖 1 个
> SPOP lucky 1
1) "Sary"
# 抽取二等奖 2 个
> SPOP lucky 2
1) "Jerry"
2) "Mark"
# 抽取三等奖 3 个
> SPOP lucky 3
1) "John"
2) "Sean"
3) "Lindy"

Zset

Redis 中的 Zset 类型就是有序且去重的集合。

Zset 简介

Zset 类型(有序集合类型)相比于 Set 类型多了一个排序属性 score(分值),对于有序集合 ZSet 来说,每个存储元素相当于有两个值组成的,一个是有序结合的元素值,一个是排序值。

有序集合保留了集合不能有重复成员的特性(分值可以重复),但不同的是,有序集合中的元素可以排序。

Zset 实现

有序集合的编码可以是 ziplist 或者 skiplist

ziplist 编码的有序集合对象使用压缩列表作为底层实现, 每个集合元素使用两个紧挨在一起的压缩列表节点来保存, 第一个节点保存元素的成员(member), 而第二个元素则保存元素的分值(score)。压缩列表内的集合元素按分值从小到大进行排序, 分值较小的元素被放置在靠近表头的方向, 而分值较大的元素则被放置在靠近表尾的方向。

skiplist 编码的有序集合对象使用 zset 结构作为底层实现, 一个 zset 结构同时包含一个字典和一个跳跃表

1
2
3
4
5
6
7
typedef struct zset {

zskiplist *zsl;

dict *dict;

} zset;

zset 结构中的 zsl 跳跃表按分值从小到大保存了所有集合元素, 每个跳跃表节点都保存了一个集合元素: 跳跃表节点的 object 属性保存了元素的成员, 而跳跃表节点的 score 属性则保存了元素的分值。 通过这个跳跃表, 程序可以对有序集合进行范围型操作, 比如 ZRANK 、 ZRANGE 等命令就是基于跳跃表 API 来实现的。

除此之外, zset 结构中的 dict 字典为有序集合创建了一个从成员到分值的映射, 字典中的每个键值对都保存了一个集合元素: 字典的键保存了元素的成员, 而字典的值则保存了元素的分值。 通过这个字典, 程序可以用 O(1) 复杂度查找给定成员的分值, ZSCORE 命令就是根据这一特性实现的, 而很多其他有序集合命令都在实现的内部用到了这一特性。

有序集合每个元素的成员都是一个字符串对象, 而每个元素的分值都是一个 double 类型的浮点数。 值得一提的是, 虽然 zset 结构同时使用跳跃表和字典来保存有序集合元素, 但这两种数据结构都会通过指针来共享相同元素的成员和分值, 所以同时使用跳跃表和字典来保存集合元素不会产生任何重复成员或者分值, 也不会因此而浪费额外的内存。

当有序集合对象可以同时满足以下两个条件时,有序集合对象使用 ziplist 编码;否则,使用 skiplist 编码。

  • 有序集合保存的元素数量小于 128 个;
  • 有序集合保存的所有元素成员的长度都小于 64 字节;

注意:以上两个条件的上限值是可以修改的, 具体请看配置文件中关于 zset-max-ziplist-entries 选项和 zset-max-ziplist-value 选项的说明。

在 Redis 7.0 中,压缩列表数据结构已经废弃了,交由 listpack 数据结构来实现了。

Zset 命令

命令 行为
ZADD 将一个带有给定分值的成员添加到有序集合里面
ZRANGE 顺序排序,并返回指定排名区间的成员
ZREVRANGE 反序排序,并返回指定排名区间的成员
ZRANGEBYSCORE 顺序排序,并返回指定排名区间的成员及其分值
ZREVRANGEBYSCORE 反序排序,并返回指定排名区间的成员及其分值
ZREM 移除指定的成员
ZSCORE 返回指定成员的分值
ZCARD 返回所有成员数

更多命令请参考:Redis ZSet 类型官方命令文档

Zset 常用操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# 往有序集合 key 中加入带分值元素
ZADD key score member [[score member]...]
# 往有序集合 key 中删除元素
ZREM key member [member...]
# 返回有序集合 key 中元素 member 的分值
ZSCORE key member
# 返回有序集合 key 中元素个数
ZCARD key

# 为有序集合 key 中元素 member 的分值加上 increment
ZINCRBY key increment member

# 正序获取有序集合 key 从 start 下标到 stop 下标的元素
ZRANGE key start stop [WITHSCORES]
# 倒序获取有序集合 key 从 start 下标到 stop 下标的元素
ZREVRANGE key start stop [WITHSCORES]

# 返回有序集合中指定分数区间内的成员,分数由低到高排序。
ZRANGEBYSCORE key min max [WITHSCORES] [LIMIT offset count]

# 返回指定成员区间内的成员,按字典正序排列,分数必须相同。
ZRANGEBYLEX key min max [LIMIT offset count]
# 返回指定成员区间内的成员,按字典倒序排列,分数必须相同
ZREVRANGEBYLEX key max min [LIMIT offset count]

Zset 运算操作(相比于 Set 类型,ZSet 类型没有支持差集运算):

1
2
3
4
# 并集计算(相同元素分值相加),numberkeys 一共多少个 key,WEIGHTS 每个 key 对应的分值乘积
ZUNIONSTORE destkey numberkeys key [key...]
# 交集计算(相同元素分值相加),numberkeys 一共多少个 key,WEIGHTS 每个 key 对应的分值乘积
ZINTERSTORE destkey numberkeys key [key...]

Zset 应用

Zset 类型(Sorted Set,有序集合)可以根据元素的权重来排序,我们可以自己来决定每个元素的权重值。比如说,我们可以根据元素插入 Sorted Set 的时间确定权重值,先插入的元素权重小,后插入的元素权重大。

在面对需要展示最新列表、排行榜等场景时,如果数据更新频繁或者需要分页显示,可以优先考虑使用 Sorted Set。

排行榜

【需求场景】

各种排行榜,如:内容平台(视频、歌曲、文章)的播放量/收藏量/评分排行榜;电商网站的销售排行榜;

【解决方案】

有序集合比较典型的使用场景就是排行榜。例如学生成绩的排名榜、游戏积分排行榜、视频播放排名、电商系统中商品的销量排名等。

我们以博文点赞排名为例,dunwu 发表了五篇博文,分别获得赞为 200、40、100、50、150。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# article:1 文章获得了 200 个赞
> ZADD user:dunwu:ranking 200 article:1
(integer) 1
# article:2 文章获得了 40 个赞
> ZADD user:dunwu:ranking 40 article:2
(integer) 1
# article:3 文章获得了 100 个赞
> ZADD user:dunwu:ranking 100 article:3
(integer) 1
# article:4 文章获得了 50 个赞
> ZADD user:dunwu:ranking 50 article:4
(integer) 1
# article:5 文章获得了 150 个赞
> ZADD user:dunwu:ranking 150 article:5
(integer) 1

文章 article:4 新增一个赞,可以使用 ZINCRBY 命令(为有序集合 key 中元素 member 的分值加上 increment):

1
2
> ZINCRBY user:dunwu:ranking 1 article:4
"51"

查看某篇文章的赞数,可以使用 ZSCORE 命令(返回有序集合 key 中元素个数):

1
2
> ZSCORE user:dunwu:ranking article:4
"50"

获取 dunwu 文章赞数最多的 3 篇文章,可以使用 ZREVRANGE 命令(倒序获取有序集合 key 从 start 下标到 stop 下标的元素):

1
2
3
4
5
6
7
8
# WITHSCORES 表示把 score 也显示出来
> ZREVRANGE user:dunwu:ranking 0 2 WITHSCORES
1) "article:1"
2) "200"
3) "article:5"
4) "150"
5) "article:3"
6) "100"

获取 dunwu 100 赞到 200 赞的文章,可以使用 ZRANGEBYSCORE 命令(返回有序集合中指定分数区间内的成员,分数由低到高排序):

1
2
3
4
5
6
7
> ZRANGEBYSCORE user:dunwu:ranking 100 200 WITHSCORES
1) "article:3"
2) "100"
3) "article:5"
4) "150"
5) "article:1"
6) "200"

前缀排序

使用有序集合的 ZRANGEBYLEXZREVRANGEBYLEX 可以帮助我们实现电话号码或姓名的排序,我们以 ZRANGEBYLEX (返回指定成员区间内的成员,按 key 正序排列,分数必须相同)为例。

注意:不要在分数不一致的 SortSet 集合中去使用 ZRANGEBYLEX 和 ZREVRANGEBYLEX 指令,因为获取的结果会不准确。

1、电话排序

我们可以将电话号码存储到 SortSet 中,然后根据需要来获取号段:

1
2
3
4
5
6
> ZADD phone 0 13100111100 0 13110114300 0 13132110901
(integer) 3
> ZADD phone 0 13200111100 0 13210414300 0 13252110901
(integer) 3
> ZADD phone 0 13300111100 0 13310414300 0 13352110901
(integer) 3

获取所有号码:

1
2
3
4
5
6
7
8
9
10
> ZRANGEBYLEX phone - +
1) "13100111100"
2) "13110114300"
3) "13132110901"
4) "13200111100"
5) "13210414300"
6) "13252110901"
7) "13300111100"
8) "13310414300"
9) "13352110901"

获取 132 号段的号码:

1
2
3
4
> ZRANGEBYLEX phone [132 (133
1) "13200111100"
2) "13210414300"
3) "13252110901"

获取 132、133 号段的号码:

1
2
3
4
5
6
7
> ZRANGEBYLEX phone [132 (134
1) "13200111100"
2) "13210414300"
3) "13252110901"
4) "13300111100"
5) "13310414300"
6) "13352110901"

2、姓名排序

1
2
> zadd names 0 Toumas 0 Jake 0 Bluetuo 0 Gaodeng 0 Aimini 0 Aidehua
(integer) 6

获取所有人的名字:

1
2
3
4
5
6
7
> ZRANGEBYLEX names - +
1) "Aidehua"
2) "Aimini"
3) "Bluetuo"
4) "Gaodeng"
5) "Jake"
6) "Toumas"

获取名字中大写字母 A 开头的所有人:

1
2
3
> ZRANGEBYLEX names [A (B
1) "Aidehua"
2) "Aimini"

获取名字中大写字母 C 到 Z 的所有人:

1
2
3
4
> ZRANGEBYLEX names [C [Z
1) "Gaodeng"
2) "Jake"
3) "Toumas"

总结

Redis 常见的五种数据类型:String(字符串),Hash(哈希),List(列表),Set(集合)及 Zset(sorted set:有序集合)

这五种数据类型都由多种数据结构实现的,主要是出于时间和空间的考虑,当数据量小的时候使用更简单的数据结构,有利于节省内存,提高性能。

可以看到,Redis 数据类型的底层数据结构随着版本的更新也有所不同,比如:

  • 在 Redis 3.0 版本中 List 对象的底层数据结构由“双向链表”或“压缩表列表”实现,但是在 3.2 版本之后,List 数据类型底层数据结构是由 quicklist 实现的;
  • 在最新的 Redis 代码中,压缩列表数据结构已经废弃了,交由 listpack 数据结构来实现了。

Redis 五种数据类型的应用场景:

  • String 类型的应用场景:缓存对象、分布式锁、共享 session、计数器、限流、分布式 ID 等。
  • List 类型的应用场景:消息队列(有两个问题:1. 生产者需要自行实现全局唯一 ID;2. 不能以消费组形式消费数据)、输入自动补全等。
  • Hash 类型:缓存对象、购物车等。
  • Set 类型:聚合计算(并集、交集、差集)场景,比如点赞、共同关注、抽奖活动等。
  • Zset 类型:排序场景,比如排行榜、电话和姓名排序等。

Redis 后续版本又支持四种数据类型,它们的应用场景如下:

  • BitMap(2.2 版新增):二值状态统计的场景,比如签到、判断用户登陆状态、连续签到用户总数等;
  • HyperLogLog(2.8 版新增):海量数据基数统计的场景,比如百万级网页 UV 计数等;
  • GEO(3.2 版新增):存储地理位置信息的场景,比如滴滴叫车;
  • Stream(5.0 版新增):消息队列,相比于基于 List 类型实现的消息队列,有这两个特有的特性:自动生成全局唯一消息 ID,支持以消费组形式消费数据。

针对 Redis 是否适合做消息队列,关键看你的业务场景:

  • 如果你的业务场景足够简单,对于数据丢失不敏感,而且消息积压概率比较小的情况下,把 Redis 当作队列是完全可以的。
  • 如果你的业务有海量消息,消息积压的概率比较大,并且不能接受数据丢失,那么还是用专业的消息队列中间件吧。

参考资料

Redis 高级数据类型

关键词:BitMapHyperLogLogGeoStream

Redis 支持的高级数据类型:BitMap、HyperLogLog、GEO、Stream

使用 Redis ,不仅要了解其数据类型的特性,还需要根据业务场景,灵活的、高效的使用其数据类型来建模。

BitMap

BitMap 简介

Bitmap,即位图,是一串连续的二进制数组(0 和 1),可以通过偏移量(offset)定位元素。由于 bit 是计算机中最小的单位,使用它进行储存将非常节省空间,特别适合一些数据量大且使用二值统计的场景。例如在一个系统中,不同的用户使用单调递增的用户 ID 表示。40 亿($$2^{32}$$ = $$410241024*1024$$ ≈ 40 亿)用户只需要 512M 内存就能记住某种状态,例如用户是否已登录。

BitMap 实现

实际上,BitMap 不是真实的数据结构,而是基于 String 实现的一组位操作

由于 STRING 是二进制安全的,并且其最大长度是 512 MB,所以 BitMap 能最大设置 $$2^{32}$$ 个不同的 bit。

BitMap 命令

命令 行为
SETBIT 对 key 所储存的字符串值,设置或清除指定偏移量上的位(bit)
GETBIT 对 key 所储存的字符串值,获取指定偏移量上的位(bit)
BITOP 对一个或多个字符串执行位运算

【示例】SETBIT、GETBIT 操作

假设有 1000 个传感器,标记为 0-999。现在,想要快速确定某传感器是否在一小时内对服务器执行了 ping 操作。

1
2
3
4
5
6
7
8
9
# 传感器 123 在 2024 年 1 月 1 日 00:00 内对服务器执行 ping 操作
> SETBIT pings:2024-01-01-00:00 123 1
(integer) 0
# 传感器 123 是否在 2024 年 1 月 1 日 00:00 内对服务器执行 ping 操作
> GETBIT pings:2024-01-01-00:00 123
1
What about sensor 456?
> GETBIT pings:2024-01-01-00:00 456
0

【示例】BITOP 操作

1
2
3
4
5
6
7
8
9
10
11
12
13
# BitMap间的运算
# operations 位移操作符,枚举值
AND 与运算 &
OR 或运算 |
XOR 异或 ^
NOT 取反 ~
# result 计算的结果,会存储在该key中
# key1 … keyn 参与运算的key,可以有多个,空格分割,not运算只能一个key
# 当 BITOP 处理不同长度的字符串时,较短的那个字符串所缺少的部分会被看作 0。返回值是保存到 destkey 的字符串的长度(以字节byte为单位),和输入 key 中最长的字符串长度相等。
BITOP [operations] [result] [key1] [keyn…]

# 返回指定key中第一次出现指定value(0/1)的位置
BITPOS [key] [value]

BitMap 应用

Bitmap 类型非常适合二值状态统计的场景,这里的二值状态就是指集合元素的取值就只有 0 和 1 两种,在记录海量数据时,Bitmap 能够有效地节省内存空间。

签到统计

在签到打卡的场景中,我们只用记录签到(1)或未签到(0),所以它就是非常典型的二值状态。

签到统计时,每个用户一天的签到用 1 个 bit 位就能表示,一个月(假设是 31 天)的签到情况用 31 个 bit 位就可以,而一年的签到也只需要用 365 个 bit 位,根本不用太复杂的集合类型。

假设我们要统计 ID 100 的用户在 2022 年 6 月份的签到情况,就可以按照下面的步骤进行操作。

第一步,执行下面的命令,记录该用户 6 月 3 号已签到。

1
SETBIT uid:sign:100:202206 2 1

第二步,检查该用户 6 月 3 日是否签到。

1
GETBIT uid:sign:100:202206 2

第三步,统计该用户在 6 月份的签到次数。

1
BITCOUNT uid:sign:100:202206

这样,我们就知道该用户在 6 月份的签到情况了。

如何统计这个月首次打卡时间呢?

Redis 提供了 BITPOS key bitValue [start] [end]指令,返回数据表示 Bitmap 中第一个值为 bitValue 的 offset 位置。

在默认情况下,命令将检测整个位图,用户可以通过可选的 start 参数和 end 参数指定要检测的范围。所以我们可以通过执行这条命令来获取 userID = 100 在 2022 年 6 月份首次打卡日期:

1
BITPOS uid:sign:100:202206 1

需要注意的是,因为 offset 从 0 开始的,所以我们需要将返回的 value + 1。

判断用户是否登录

Bitmap 提供了 GETBIT、SETBIT 操作,通过一个偏移值 offset 对 bit 数组的 offset 位置的 bit 位进行读写操作,需要注意的是 offset 从 0 开始。

只需要一个 key = login_status 表示存储用户登陆状态集合数据,将用户 ID 作为 offset,在线就设置为 1,下线设置 0。通过 GETBIT判断对应的用户是否在线。50000 万 用户只需要 6 MB 的空间。

假如我们要判断 ID = 10086 的用户的登陆情况:

第一步,执行以下指令,表示用户已登录。

1
SETBIT login_status 10086 1

第二步,检查该用户是否登陆,返回值 1 表示已登录。

1
GETBIT login_status 10086

第三步,登出,将 offset 对应的 value 设置成 0。

1
SETBIT login_status 10086 0

连续签到用户总数

如何统计出这连续 7 天连续打卡用户总数呢?

我们把每天的日期作为 Bitmap 的 key,userId 作为 offset,若是打卡则将 offset 位置的 bit 设置成 1。

key 对应的集合的每个 bit 位的数据则是一个用户在该日期的打卡记录。

一共有 7 个这样的 Bitmap,如果我们能对这 7 个 Bitmap 的对应的 bit 位做 “与”运算。同样的 UserID offset 都是一样的,当一个 userID 在 7 个 Bitmap 对应对应的 offset 位置的 bit = 1 就说明该用户 7 天连续打卡。

结果保存到一个新 Bitmap 中,我们再通过 BITCOUNT 统计 bit = 1 的个数便得到了连续打卡 7 天的用户总数了。

Redis 提供了 BITOP operation destkey key [key ...]这个指令用于对一个或者多个 key 的 Bitmap 进行位元操作。

  • operation 可以是 andORNOTXOR。当 BITOP 处理不同长度的字符串时,较短的那个字符串所缺少的部分会被看作 0 。空的 key 也被看作是包含 0 的字符串序列。

假设要统计 3 天连续打卡的用户数,则是将三个 bitmap 进行 AND 操作,并将结果保存到 destmap 中,接着对 destmap 执行 BITCOUNT 统计,如下命令:

1
2
3
4
# 与操作
BITOP AND destmap bitmap:01 bitmap:02 bitmap:03
# 统计 bit 位 = 1 的个数
BITCOUNT destmap

即使一天产生一个亿的数据,Bitmap 占用的内存也不大,大约占 12 MB 的内存(10^8/8/1024/1024),7 天的 Bitmap 的内存开销约为 84 MB。同时我们最好给 Bitmap 设置过期时间,让 Redis 删除过期的打卡数据,节省内存。

HyperLogLog

HyperLogLog 简介

Redis HyperLogLog 是 Redis 2.8.9 版本新增的数据类型,是一种用于“统计基数”的数据集合类型,基数统计就是指统计一个集合中不重复的元素个数。但要注意,**HyperLogLog 是统计规则是基于概率完成的,不是非常准确,标准误算率是 0.81%**。

所以,简单来说 HyperLogLog 提供不精确的去重计数

HyperLogLog 的优点是,在输入元素的数量或者体积非常非常大时,计算基数所需的内存空间总是固定的、并且是很小的。

在 Redis 里面,每个 HyperLogLog 键只需要花费 12 KB 内存,就可以计算接近 2^64 个不同元素的基数,和元素越多就越耗费内存的 Set 和 Hash 类型相比,HyperLogLog 就非常节省空间。

这什么概念?举个例子给大家对比一下。

用 Java 语言来说,一般 long 类型占用 8 字节,而 1 字节有 8 位,即:1 byte = 8 bit,即 long 数据类型最大可以表示的数是:2^63-1。对应上面的2^64个数,假设此时有2^63-1这么多个数,从 0 ~ 2^63-1,按照long以及1k = 1024 字节的规则来计算内存总数,就是:((2^63-1) * 8/1024)K,这是很庞大的一个数,存储空间远远超过12K,而 HyperLogLog 却可以用 12K 就能统计完。

HyperLogLog 实现

HyperLogLog 的实现涉及到很多数学问题,太费脑子了,我也没有搞懂,如果你想了解一下,课下可以看看这个:HyperLogLog

HyperLogLog 命令

HyperLogLog 命令很少,就三个。

1
2
3
4
5
6
7
8
# 添加指定元素到 HyperLogLog 中
PFADD key element [element ...]

# 返回给定 HyperLogLog 的基数估算值。
PFCOUNT key [key ...]

# 将多个 HyperLogLog 合并为一个 HyperLogLog
PFMERGE destkey sourcekey [sourcekey ...]

HyperLogLog 应用

百万级网页 UV 计数

Redis HyperLogLog 优势在于只需要花费 12 KB 内存,就可以计算接近 2^64 个元素的基数,和元素越多就越耗费内存的 Set 和 Hash 类型相比,HyperLogLog 就非常节省空间。

所以,非常适合统计百万级以上的网页 UV 的场景。

在统计 UV 时,你可以用 PFADD 命令(用于向 HyperLogLog 中添加新元素)把访问页面的每个用户都添加到 HyperLogLog 中。

1
PFADD page1:uv user1 user2 user3 user4 user5

接下来,就可以用 PFCOUNT 命令直接获得 page1 的 UV 值了,这个命令的作用就是返回 HyperLogLog 的统计结果。

1
PFCOUNT page1:uv

不过,有一点需要你注意一下,HyperLogLog 的统计规则是基于概率完成的,所以它给出的统计结果是有一定误差的,标准误算率是 0.81%。

这也就意味着,你使用 HyperLogLog 统计的 UV 是 100 万,但实际的 UV 可能是 101 万。虽然误差率不算大,但是,如果你需要精确统计结果的话,最好还是继续用 Set 或 Hash 类型。

GEO

GEO 简介

Redis GEO 是 Redis 3.2 版本新增的数据类型,主要用于存储地理位置信息,并对存储的信息进行操作。

在日常生活中,我们越来越依赖搜索“附近的餐馆”、在打车软件上叫车,这些都离不开基于位置信息服务(Location-Based Service,LBS)的应用。LBS 应用访问的数据是和人或物关联的一组经纬度信息,而且要能查询相邻的经纬度范围,GEO 就非常适合应用在 LBS 服务的场景中。

GEO 实现

GEO 本身并没有设计新的底层数据结构,而是直接使用了 Zset 类型。

GEO 类型使用 GeoHash 编码方法实现了经纬度到 Sorted Set 中元素权重分数的转换,这其中的两个关键机制就是“对二维地图做区间划分”和“对区间进行编码”。一组经纬度落在某个区间后,就用区间的编码值来表示,并把编码值作为 Sorted Set 元素的权重分数。

这样一来,我们就可以把经纬度保存到 Sorted Set 中,利用 Sorted Set 提供的“按权重进行有序范围查找”的特性,实现 LBS 服务中频繁使用的“搜索附近”的需求。

GEO 命令

1
2
3
4
5
6
7
8
9
10
11
# 存储指定的地理空间位置,可以将一个或多个经度(longitude)、纬度(latitude)、位置名称(member)添加到指定的 key 中。
GEOADD key longitude latitude member [longitude latitude member ...]

# 从给定的 key 里返回所有指定名称(member)的位置(经度和纬度),不存在的返回 nil。
GEOPOS key member [member ...]

# 返回两个给定位置之间的距离。
GEODIST key member1 member2 [m|km|ft|mi]

# 根据用户给定的经纬度坐标来获取指定范围内的地理位置集合。
GEORADIUS key longitude latitude radius m|km|ft|mi [WITHCOORD] [WITHDIST] [WITHHASH] [COUNT count] [ASC|DESC] [STORE key] [STOREDIST key]

GEO 应用

滴滴叫车

这里以滴滴叫车的场景为例,介绍下具体如何使用 GEO 命令:GEOADD 和 GEORADIUS 这两个命令。

假设车辆 ID 是 33,经纬度位置是(116.034579,39.030452),我们可以用一个 GEO 集合保存所有车辆的经纬度,集合 key 是 cars:locations。

执行下面的这个命令,就可以把 ID 号为 33 的车辆的当前经纬度位置存入 GEO 集合中:

1
GEOADD cars:locations 116.034579 39.030452 33

当用户想要寻找自己附近的网约车时,LBS 应用就可以使用 GEORADIUS 命令。

例如,LBS 应用执行下面的命令时,Redis 会根据输入的用户的经纬度信息(116.054579,39.030452),查找以这个经纬度为中心的 5 公里内的车辆信息,并返回给 LBS 应用。

1
GEORADIUS cars:locations 116.054579 39.030452 5 km ASC COUNT 10

Stream

Stream 简介

Redis Stream 是 Redis 5.0 版本新增加的数据类型,Redis 专门为消息队列设计的数据类型。

在 Redis 5.0 Stream 没出来之前,消息队列的实现方式都有着各自的缺陷,例如:

  • 发布订阅模式,不能持久化也就无法可靠的保存消息,并且对于离线重连的客户端不能读取历史消息的缺陷;
  • List 实现消息队列的方式不能重复消费,一个消息消费完就会被删除,而且生产者需要自行实现全局唯一 ID。

基于以上问题,Redis 5.0 便推出了 Stream 类型也是此版本最重要的功能,用于完美地实现消息队列,它支持消息的持久化、支持自动生成全局唯一 ID、支持 ack 确认消息的模式、支持消费组模式等,让消息队列更加的稳定和可靠。

Stream 命令

Stream 消息队列操作命令:

  • XADD:插入消息,保证有序,可以自动生成全局唯一 ID;
  • XLEN:查询消息长度;
  • XREAD:用于读取消息,可以按 ID 读取数据;
  • XDEL:根据消息 ID 删除消息;
  • DEL:删除整个 Stream;
  • XRANGE:读取区间消息
  • XREADGROUP:按消费组形式读取消息;
  • XPENDING 和 XACK:
    • XPENDING 命令可以用来查询每个消费组内所有消费者“已读取、但尚未确认”的消息;
    • XACK 命令用于向消息队列确认消息处理已完成;

Stream 应用

消息队列

生产者通过 XADD 命令插入一条消息:

1
2
3
4
# * 表示让 Redis 为插入的数据自动生成一个全局唯一的 ID
# 往名称为 mymq 的消息队列中插入一条消息,消息的键是 name,值是 xiaolin
> XADD mymq * name xiaolin
"1654254953808-0"

插入成功后会返回全局唯一的 ID:”1654254953808-0”。消息的全局唯一 ID 由两部分组成:

  • 第一部分“1654254953808”是数据插入时,以毫秒为单位计算的当前服务器时间;
  • 第二部分表示插入消息在当前毫秒内的消息序号,这是从 0 开始编号的。例如,“1654254953808-0”就表示在“1654254953808”毫秒内的第 1 条消息。

消费者通过 XREAD 命令从消息队列中读取消息时,可以指定一个消息 ID,并从这个消息 ID 的下一条消息开始进行读取(注意是输入消息 ID 的下一条信息开始读取,不是查询输入 ID 的消息)。

1
2
3
4
5
6
# 从 ID 号为 1654254953807-0 的消息开始,读取后续的所有消息(示例中一共 1 条)。
> XREAD STREAMS mymq 1654254953807-0
1) 1) "mymq"
2) 1) 1) "1654254953808-0"
2) 1) "name"
2) "xiaolin"

如果想要实现阻塞读(当没有数据时,阻塞住),可以调用 XRAED 时设定 BLOCK 配置项,实现类似于 BRPOP 的阻塞读取操作。

比如,下面这命令,设置了 BLOCK 10000 的配置项,10000 的单位是毫秒,表明 XREAD 在读取最新消息时,如果没有消息到来,XREAD 将阻塞 10000 毫秒(即 10 秒),然后再返回。

1
2
3
4
# 命令最后的“$”符号表示读取最新的消息
> XREAD BLOCK 10000 STREAMS mymq $
(nil)
(10.00s)

Stream 的基础方法,使用 xadd 存入消息和 xread 循环阻塞读取消息的方式可以实现简易版的消息队列,交互流程如下图所示:

前面介绍的这些操作 List 也支持的,接下来看看 Stream 特有的功能。

Stream 可以以使用 XGROUP 创建消费组,创建消费组之后,Stream 可以使用 XREADGROUP 命令让消费组内的消费者读取消息。

创建两个消费组,这两个消费组消费的消息队列是 mymq,都指定从第一条消息开始读取:

1
2
3
4
5
6
# 创建一个名为 group1 的消费组,0-0 表示从第一条消息开始读取。
> XGROUP CREATE mymq group1 0-0
OK
# 创建一个名为 group2 的消费组,0-0 表示从第一条消息开始读取。
> XGROUP CREATE mymq group2 0-0
OK

消费组 group1 内的消费者 consumer1 从 mymq 消息队列中读取所有消息的命令如下:

1
2
3
4
5
6
# 命令最后的参数“>”,表示从第一条尚未被消费的消息开始读取。
> XREADGROUP GROUP group1 consumer1 STREAMS mymq >
1) 1) "mymq"
2) 1) 1) "1654254953808-0"
2) 1) "name"
2) "xiaolin"

消息队列中的消息一旦被消费组里的一个消费者读取了,就不能再被该消费组内的其他消费者读取了,即同一个消费组里的消费者不能消费同一条消息

比如说,我们执行完刚才的 XREADGROUP 命令后,再执行一次同样的命令,此时读到的就是空值了:

1
2
> XREADGROUP GROUP group1 consumer1 STREAMS mymq >
(nil)

但是,不同消费组的消费者可以消费同一条消息(但是有前提条件,创建消息组的时候,不同消费组指定了相同位置开始读取消息)

比如说,刚才 group1 消费组里的 consumer1 消费者消费了一条 id 为 1654254953808-0 的消息,现在用 group2 消费组里的 consumer1 消费者消费消息:

1
2
3
4
5
> XREADGROUP GROUP group2 consumer1 STREAMS mymq >
1) 1) "mymq"
2) 1) 1) "1654254953808-0"
2) 1) "name"
2) "xiaolin"

因为我创建两组的消费组都是从第一条消息开始读取,所以可以看到第二组的消费者依然可以消费 id 为 1654254953808-0 的这一条消息。因此,不同的消费组的消费者可以消费同一条消息。

使用消费组的目的是让组内的多个消费者共同分担读取消息,所以,我们通常会让每个消费者读取部分消息,从而实现消息读取负载在多个消费者间是均衡分布的。

例如,我们执行下列命令,让 group2 中的 consumer1、2、3 各自读取一条消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 让 group2 中的 consumer1 从 mymq 消息队列中消费一条消息
> XREADGROUP GROUP group2 consumer1 COUNT 1 STREAMS mymq >
1) 1) "mymq"
2) 1) 1) "1654254953808-0"
2) 1) "name"
2) "xiaolin"
# 让 group2 中的 consumer2 从 mymq 消息队列中消费一条消息
> XREADGROUP GROUP group2 consumer2 COUNT 1 STREAMS mymq >
1) 1) "mymq"
2) 1) 1) "1654256265584-0"
2) 1) "name"
2) "xiaolincoding"
# 让 group2 中的 consumer3 从 mymq 消息队列中消费一条消息
> XREADGROUP GROUP group2 consumer3 COUNT 1 STREAMS mymq >
1) 1) "mymq"
2) 1) 1) "1654256271337-0"
2) 1) "name"
2) "Tom"

基于 Stream 实现的消息队列,如何保证消费者在发生故障或宕机再次重启后,仍然可以读取未处理完的消息?

Streams 会自动使用内部队列(也称为 PENDING List)留存消费组里每个消费者读取的消息,直到消费者使用 XACK 命令通知 Streams“消息已经处理完成”。

消费确认增加了消息的可靠性,一般在业务处理完成之后,需要执行 XACK 命令确认消息已经被消费完成,整个流程的执行如下图所示:

如果消费者没有成功处理消息,它就不会给 Streams 发送 XACK 命令,消息仍然会留存。此时,消费者可以在重启后,用 XPENDING 命令查看已读取、但尚未确认处理完成的消息

例如,我们来查看一下 group2 中各个消费者已读取、但尚未确认的消息个数,命令如下:

1
2
3
4
5
6
7
8
9
10
127.0.0.1:6379> XPENDING mymq group2
1) (integer) 3
2) "1654254953808-0" # 表示 group2 中所有消费者读取的消息最小 ID
3) "1654256271337-0" # 表示 group2 中所有消费者读取的消息最大 ID
4) 1) 1) "consumer1"
2) "1"
2) 1) "consumer2"
2) "1"
3) 1) "consumer3"
2) "1"

如果想查看某个消费者具体读取了哪些数据,可以执行下面的命令:

1
2
3
4
5
6
# 查看 group2 里 consumer2 已从 mymq 消息队列中读取了哪些消息
> XPENDING mymq group2 - + 10 consumer2
1) 1) "1654256265584-0"
2) "consumer2"
3) (integer) 410700
4) (integer) 1

可以看到,consumer2 已读取的消息的 ID 是 1654256265584-0。

一旦消息 1654256265584-0 被 consumer2 处理了,consumer2 就可以使用 XACK 命令通知 Streams,然后这条消息就会被删除

1
2
> XACK mymq group2 1654256265584-0
(integer) 1

当我们再使用 XPENDING 命令查看时,就可以看到,consumer2 已经没有已读取、但尚未确认处理的消息了。

1
2
> XPENDING mymq group2 - + 10 consumer2
(empty array)

好了,基于 Stream 实现的消息队列就说到这里了,小结一下:

  • 消息保序:XADD/XREAD
  • 阻塞读取:XREAD block
  • 重复消息处理:Stream 在使用 XADD 命令,会自动生成全局唯一 ID;
  • 消息可靠性:内部使用 PENDING List 自动保存消息,使用 XPENDING 命令查看消费组已经读取但是未被确认的消息,消费者使用 XACK 确认消息;
  • 支持消费组形式消费数据

Redis 基于 Stream 消息队列与专业的消息队列有哪些差距?

一个专业的消息队列,必须要做到两大块:

  • 消息不丢。
  • 消息可堆积。

1、Redis Stream 消息会丢失吗?

使用一个消息队列,其实就分为三大块:生产者、队列中间件、消费者,所以要保证消息就是保证三个环节都不能丢失数据。

Redis Stream 消息队列能不能保证三个环节都不丢失数据?

  • Redis 生产者会不会丢消息?生产者会不会丢消息,取决于生产者对于异常情况的处理是否合理。从消息被生产出来,然后提交给 MQ 的过程中,只要能正常收到(MQ 中间件)的 ack 确认响应,就表示发送成功,所以只要处理好返回值和异常,如果返回异常则进行消息重发,那么这个阶段是不会出现消息丢失的。
  • Redis 消费者会不会丢消息?不会,因为 Stream(MQ 中间件)会自动使用内部队列(也称为 PENDING List)留存消费组里每个消费者读取的消息,但是未被确认的消息。消费者可以在重启后,用 XPENDING 命令查看已读取、但尚未确认处理完成的消息。等到消费者执行完业务逻辑后,再发送消费确认 XACK 命令,也能保证消息的不丢失。
  • Redis 消息中间件会不会丢消息?,Redis 在以下 2 个场景下,都会导致数据丢失:

可以看到,Redis 在队列中间件环节无法保证消息不丢。像 RabbitMQ 或 Kafka 这类专业的队列中间件,在使用时是部署一个集群,生产者在发布消息时,队列中间件通常会写“多个节点”,也就是有多个副本,这样一来,即便其中一个节点挂了,也能保证集群的数据不丢失。

2、Redis Stream 消息可堆积吗?

Redis 的数据都存储在内存中,这就意味着一旦发生消息积压,则会导致 Redis 的内存持续增长,如果超过机器内存上限,就会面临被 OOM 的风险。

所以 Redis 的 Stream 提供了可以指定队列最大长度的功能,就是为了避免这种情况发生。

当指定队列最大长度时,队列长度超过上限后,旧消息会被删除,只保留固定长度的新消息。这么来看,Stream 在消息积压时,如果指定了最大长度,还是有可能丢失消息的。

但 Kafka、RabbitMQ 专业的消息队列它们的数据都是存储在磁盘上,当消息积压时,无非就是多占用一些磁盘空间。

因此,把 Redis 当作队列来使用时,会面临的 2 个问题:

  • Redis 本身可能会丢数据;
  • 面对消息挤压,内存资源会紧张;

所以,能不能将 Redis 作为消息队列来使用,关键看你的业务场景:

  • 如果你的业务场景足够简单,对于数据丢失不敏感,而且消息积压概率比较小的情况下,把 Redis 当作队列是完全可以的。
  • 如果你的业务有海量消息,消息积压的概率比较大,并且不能接受数据丢失,那么还是用专业的消息队列中间件吧。

补充:Redis 发布/订阅机制为什么不可以作为消息队列?

发布订阅机制存在以下缺点,都是跟丢失数据有关:

  1. 发布/订阅机制没有基于任何数据类型实现,所以不具备“数据持久化”的能力,也就是发布/订阅机制的相关操作,不会写入到 RDB 和 AOF 中,当 Redis 宕机重启,发布/订阅机制的数据也会全部丢失。
  2. 发布订阅模式是“发后既忘”的工作模式,如果有订阅者离线重连之后不能消费之前的历史消息。
  3. 当消费端有一定的消息积压时,也就是生产者发送的消息,消费者消费不过来时,如果超过 32M 或者是 60s 内持续保持在 8M 以上,消费端会被强行断开,这个参数是在配置文件中设置的,默认值是 client-output-buffer-limit pubsub 32mb 8mb 60

所以,发布/订阅机制只适合即时通讯的场景,比如构建哨兵集群的场景采用了发布/订阅机制。

总结

Redis 后续版本又支持四种数据类型,它们的应用场景如下:

  • BitMap(2.2 版新增):二值状态统计的场景,比如签到、判断用户登陆状态、连续签到用户总数等;
  • HyperLogLog(2.8 版新增):海量数据基数统计的场景,比如百万级网页 UV 计数等;
  • GEO(3.2 版新增):存储地理位置信息的场景,比如滴滴叫车;
  • Stream(5.0 版新增):消息队列,相比于基于 List 类型实现的消息队列,有这两个特有的特性:自动生成全局唯一消息 ID,支持以消费组形式消费数据。

针对 Redis 是否适合做消息队列,关键看你的业务场景:

  • 如果你的业务场景足够简单,对于数据丢失不敏感,而且消息积压概率比较小的情况下,把 Redis 当作队列是完全可以的。
  • 如果你的业务有海量消息,消息积压的概率比较大,并且不能接受数据丢失,那么还是用专业的消息队列中间件吧。

参考资料

Redis 集群

Redis 集群(Redis Cluster) 是 Redis 官方提供的“分布式数据库”方案

Redis Cluster 既然被设计分布式系统,自然需要具备分布式系统的基本特性:伸缩性、高可用、一致性。

  • 伸缩性 - Redis Cluster 通过划分虚拟 hash 槽来进行“分区”,以实现集群的伸缩性。
  • 高可用 - Redis Cluster 采用主从架构,支持“复制”和“自动故障转移”,以保证 Redis Cluster 的高可用。
  • 一致性 - 根据 CAP 理论,Consistency、Availability、Partition tolerance 三者不可兼得。而 Redis Cluster 的选择是 AP,即不保证“强一致性”,尽力达到“最终一致性”。

Redis Cluster 应用了 Raft 协议 协议和 Gossip 协议。

关键词:高可用监控选主故障转移分区RaftGossip

Redis Cluster 分区

集群节点

Redis Cluster 由多个节点组成,节点刚启动时,彼此是相互独立的。节点通过握手( CLUSTER MEET 命令)来将其他节点添加到自己所处的集群中

向一个节点发送 CLUSTER MEET 命令,可以让当前节点与指定 IP、PORT 的节点进行三次握手,握手成功时,当前节点会将指定节点加入所在集群。

集群节点保存键值对以及过期时间的方式与单机 Redis 服务完全相同

Redis Cluster 节点分为主节点(master)和从节点(slave):

  • 主节点用于处理槽。
  • 从节点用于复制主节点, 并在主节点下线时, 代替主节点继续处理命令请求。

分配 Hash 槽

分布式存储需要解决的首要问题是把整个数据集按照“分区规则” 到多个节点,即每个节点负责整体数据的一个 子集

Redis Cluster 将整个数据库规划为 “16384” 个虚拟的哈希槽,数据库中的每个键都属于其中一个槽。每个节点都会记录哪些槽指派给了自己, 而哪些槽又被指派给了其他节点

如果数据库中有任何一个槽没有得到分配,那么集群处于“下线”状态

通过向节点发送 CLUSTER ADDSLOTS 命令,可以将一个或多个槽指派给节点负责。

1
2
> CLUSTER ADDSLOTS 1 2 3
OK

集群中的每个节点负责一部分哈希槽,比如集群中有3个节点,则:

  • 节点A存储的哈希槽范围是:0 – 5500
  • 节点B存储的哈希槽范围是:5501 – 11000
  • 节点C存储的哈希槽范围是:11001 – 16384

路由

当客户端向节点发送与数据库键有关的命令时,接受命令的节点会计算出命令要处理的数据库属于哪个槽,并检查这个槽是否指派给了自己

  • 如果键所在的槽正好指派给了当前节点,那么当前节点直接执行命令。
  • 如果键所在的槽没有指派给当前节点,那么节点会向客户端返回一个 MOVED 错误,指引客户端重定向至正确的节点。

计算键属于哪个槽

决定一个 key 应该分配到那个槽的算法是:计算该 key 的 CRC16 结果再模 16834

1
HASH_SLOT = CRC16(KEY) mod 16384

当节点计算出 key 所属的槽为 i 之后,节点会根据以下条件判断槽是否由自己负责:

1
clusterState.slots[i] == clusterState.myself

MOVED 错误

节点在接到一个命令请求时,会先检查这个命令请求要处理的键所在的槽是否由自己负责, 如果不是的话, 节点将向客户端返回一个 MOVED 错误, MOVED 错误携带的信息可以指引客户端转向至正在负责相关槽的节点。

MOVED 错误的格式为:

1
MOVED <slot> <ip>:<port>

提示:MOVED 命令的作用有点类似 HTTP 协议中的重定向。

重新分区

对 Redis Cluster 的重新分片工作是由客户端(redis-trib)执行的, 重新分片的关键是将属于某个槽的所有键值对从一个节点转移至另一个节点

重新分区操作可以“在线”进行,在重新分区的过程中,集群不需要下线,并且源节点和目标节点都可以继续处理命令请求。

重新分区的实现原理如下图所示:

img

ASK 错误

如果节点 A 正在迁移槽 i 至节点 B , 那么当节点 A 没能在自己的数据库中找到命令指定的数据库键时, 节点 A 会向客户端返回一个 ASK 错误, 指引客户端到节点 B 继续查找指定的数据库键。

ASK 错误与 MOVED 的区别在于:

  • MOVED 错误表示槽的负责权已经从一个节点转移到了另一个节点;
  • ASK 错误只是两个节点在迁移槽的过程中使用的一种临时措施。

判断 ASK 错误的过程如下图所示:

img

Redis Cluster 复制

Redis Cluster 中的节点分为主节点和从节点,其中主节点用于处理槽,而从节点则用于复制某个主节点,并在被复制的主节点下线时,代替下线主节点继续处理命令请求。

向一个节点发送命令 CLUSTER REPLICATE <node_id> 可以让接收命令的节点成为 node_id 所指定节点的从节点,并开始对主节点进行复制。

Redis Cluster 节点间的复制是“异步”的。

Redis Cluster 故障转移

故障检测

集群中每个节点都会定期向集群中的其他节点发送 PING 消息,以此来检测对方是否在线

节点的状态信息可以分为:

  • 在线状态;
  • 疑似下线状态(PFAIL) - 即在规定的时间内,没有应答 PING 消息
  • 已下线状态(FAIL) - 半数以上负责处理槽的主节点都将某个主节点视为“疑似下线”,则这个主节点将被标记为“已下线”

故障转移

  1. 下线主节点的所有从节点中,会有一个从节点被选中。
  2. 被选中的从节点会执行 SLAVEOF no one 命令,成为新的主节点。
  3. 新的主节点会撤销所有对已下线主节点的槽指派,并将这些槽全部指派给自己。
  4. 新的主节点向集群广播一条 PONG 消息,告知其他节点这个从节点已变成主节点。
  5. 新的主节点开始接收和自己负责处理的槽有关的命令请求,故障转移完成。

选主

Redis Sentinel 和 Redis Cluster 的选主流程非常相似,二者都基于Raft 协议 实现。

  1. 从节点发现自己的主节点状态为 FAIL
  2. 从节点将自己记录的纪元(epoch)加 1,并广播消息,要求所有收到消息且有投票权的主节点都为自己投票。——这里的纪元(epoch),相当于 Raft 协议中的选期(term)。因个人习惯,后面统一将纪元描述为选期。
  3. 如果某主节点具有投票权(它正在负责处理槽),并且这个主节点尚未投票,那么主节点就返回一条确认消息,表示支持该从节点成为新的主节点。
  4. 每个参与选举的从节点都会根据收到的确认消息,统计自己所得的选票。
  5. 假设集群中存在 N 个具有投票权的主节点,那么当某从节点得到“半数以上”(N / 2 + 1)的选票,则该从节点当选为新的主节点
  6. 由于每个选期中,任意具有投票权的主节点“只能投一票”,所以获得“半数以上”选票的从节点只能有一个。
  7. 如果在一个选期中,没有从节点能获得“半数以上”投票,则本次选期作废,开始进入下一个选期,直到选出新的主节点为止。

Redis Cluster 通信

集群中的节点通过发送和接收消息来进行通信。Redis Cluster 各实例之间的通信方式采用 Gossip 协议来实现。

Redis Cluster 采用 Gossip 协议基于两个主要目标:去中心化以及失败检测

Redis Cluster 中,每个节点之间都会同步信息,但是每个节点的信息不保证实时的,即无法保证数据强一致性,但是保证“数据最终一致性”——当集群中发生节点增减、故障、主从关系变化、槽信息变更等事件时,通过不断的通信,在经过一段时间后,所有的节点都会同步集群全部节点的最新状态。

Redis Cluster 节点发送的消息主要有以下五种:

  • MEET - 请求接收方加入发送方所在的集群。
  • PING - 集群中每个节点每隔一段时间(默认为一秒)从已知节点列表中随机选出五个节点,然后对这五个节点中最久没联系的节点发送 PING 消息,以此检测被选中的节点是否在线。
  • PONG - 当接收方收到发送方发来的 MEET 消息或 PING 消息时,会返回一条 PONG 消息作为应答。
  • FAIL - 当一个主节点 A 判断另一个主节点 B 已经进入 FAIL 状态时,节点 A 会向集群广播一条关于节点 B 的 FAIL 消息,所有收到这条消息的节点都会立即将节点 B 标记为已下线。
  • PUBLISH - 当节点收到一个 PUBLISH 命令时,节点会执行这个命令,并向集群广播一条 PUBLISH 消息,所有接受到这条消息的节点都会执行相同的 PUBLISH 命令。

Redis Cluster 应用

集群功能限制

Redis Cluster 相对 单机,存在一些功能限制,需要 开发人员 提前了解,在使用时做好规避。

  • key 批量操作 支持有限:类似 msetmget 操作,目前只支持对具有相同 slot 值的 key 执行 批量操作。对于 映射为不同 slot 值的 key 由于执行 mgetmget 等操作可能存在于多个节点上,因此不被支持。
  • key 事务操作 支持有限:只支持 key同一节点上事务操作,当多个 key 分布在 不同 的节点上时 无法 使用事务功能。
  • key 作为 数据分区 的最小粒度,不能将一个 大的键值 对象如 hashlist 等映射到 不同的节点
  • 不支持 多数据库空间单机 下的 Redis 可以支持 16 个数据库(db0 ~ db15),集群模式 下只能使用 一个 数据库空间,即 db0
  • 复制结构 只支持一层:从节点 只能复制 主节点,不支持 嵌套树状复制 结构。

集群规模限制

Redis Cluster 的优点是易于使用。分区、主从复制、弹性扩容这些功能都可以做到自动化,通过简单的部署就可以获得一个大容量、高可靠、高可用的 Redis 集群,并且对于应用来说,近乎于是透明的。

所以,Redis Cluster 非常适合构建中小规模 Redis 集群,这里的中小规模指的是,大概几个到几十个节点这样规模的 Redis 集群。

但是 Redis Cluster 不太适合构建超大规模集群,主要原因是,它采用了去中心化的设计。

Redis 的每个节点上,都保存了所有槽和节点的映射关系表,客户端可以访问任意一个节点,再通过重定向命令,找到数据所在的那个节点。那么,这个映射关系表是如何更新的呢?Redis Cluster 采用了一种去中心化的流言 (Gossip) 协议来传播集群配置的变化。

Gossip 协议的优点是去中心化;缺点是传播速度慢,并且是集群规模越大,传播的越慢。

集群配置

我们后面会部署一个 Redis Cluster 作为例子,在那之前,先介绍一下集群在 redis.conf 中的参数。

  • cluster-enabled <yes/no> - 如果配置”yes”则开启集群功能,此 redis 实例作为集群的一个节点,否则,它是一个普通的单一的 redis 实例。
  • cluster-config-file <filename> - 注意:虽然此配置的名字叫“集群配置文件”,但是此配置文件不能人工编辑,它是集群节点自动维护的文件,主要用于记录集群中有哪些节点、他们的状态以及一些持久化参数等,方便在重启时恢复这些状态。通常是在收到请求之后这个文件就会被更新。
  • cluster-node-timeout <milliseconds> - 这是集群中的节点能够失联的最大时间,超过这个时间,该节点就会被认为故障。如果主节点超过这个时间还是不可达,则用它的从节点将启动故障迁移,升级成主节点。注意,任何一个节点在这个时间之内如果还是没有连上大部分的主节点,则此节点将停止接收任何请求。
  • cluster-slave-validity-factor <factor> - 如果设置成0,则无论从节点与主节点失联多久,从节点都会尝试升级成主节点。如果设置成正数,则 cluster-node-timeout 乘以 cluster-slave-validity-factor 得到的时间,是从节点与主节点失联后,此从节点数据有效的最长时间,超过这个时间,从节点不会启动故障迁移。假设 cluster-node-timeout=5,cluster-slave-validity-factor=10,则如果从节点跟主节点失联超过 50 秒,此从节点不能成为主节点。注意,如果此参数配置为非 0,将可能出现由于某主节点失联却没有从节点能顶上的情况,从而导致集群不能正常工作,在这种情况下,只有等到原来的主节点重新回归到集群,集群才恢复运作。
  • cluster-migration-barrier <count> - 主节点需要的最小从节点数,只有达到这个数,主节点失败时,它从节点才会进行迁移。更详细介绍可以看本教程后面关于副本迁移到部分。
  • cluster-require-full-coverage <yes/no> - 在部分 key 所在的节点不可用时,如果此参数设置为”yes”(默认值), 则整个集群停止接受操作;如果此参数设置为”no”,则集群依然为可达节点上的 key 提供读操作。

其他 Redis 集群方案

Redis Cluster 不太适合用于大规模集群,所以,如果要构建超大 Redis 集群,需要选择替代方案。一般有三种方案类型:

  • 客户端分区方案
  • 代理分区方案
  • 查询路由方案

客户端分区方案

客户端 就已经决定数据会被 存储 到哪个 Redis 节点或者从哪个 Redis 节点 读取数据。其主要思想是采用 哈希算法 将 Redis 数据的 key 进行散列,通过 hash 函数,特定的 key映射 到特定的 Redis 节点上。

客户端分区方案 的代表为 Redis Sharding,Redis Sharding 是 Redis Cluster 出来之前,业界普遍使用的 Redis 多实例集群 方法。Java 的 Redis 客户端驱动库 Jedis,支持 Redis Sharding 功能,即 ShardedJedis 以及 结合缓存池 的 ShardedJedisPool。

  • 优点:不使用 第三方中间件分区逻辑 可控,配置 简单,节点之间无关联,容易 线性扩展,灵活性强。
  • 缺点客户端 无法 动态增删 服务节点,客户端需要自行维护 分发逻辑,客户端之间 无连接共享,会造成 连接浪费

代理分区方案

客户端 发送请求到一个 代理组件代理 解析 客户端 的数据,并将请求转发至正确的节点,最后将结果回复给客户端。

  • 优点:简化 客户端 的分布式逻辑,客户端 透明接入,切换成本低,代理的 转发存储 分离。
  • 缺点:多了一层 代理层,加重了 架构部署复杂度性能损耗

代理分区 主流实现的有方案有 TwemproxyCodis

Twemproxy

Twemproxy 也叫 nutcraker,是 Twitter 开源的一个 Redis 和 Memcache 的 中间代理服务器 程序。

Twemproxy 作为 代理,可接受来自多个程序的访问,按照 路由规则,转发给后台的各个 Redis 服务器,再原路返回。**Twemproxy** 存在 单点故障 问题,需要结合 Lvs 和 Keepalived 做 高可用方案

  • 优点:应用范围广,稳定性较高,中间代理层 高可用
  • 缺点:无法平滑地 水平扩容/缩容,无 可视化管理界面,运维不友好,出现故障,不能 自动转移

Codis

Codis 是一个 分布式 Redis 解决方案,对于上层应用来说,连接 Codis-Proxy 和直接连接 原生的 Redis-Server 没有的区别。Codis 底层会 处理请求的转发,不停机的进行 数据迁移 等工作。Codis 采用了无状态的 代理层,对于 客户端 来说,一切都是透明的。

  • 优点:实现了上层 Proxy 和底层 Redis 的 高可用数据分区自动平衡,提供 命令行接口 和 RESTful API,提供 监控管理 界面,可以动态 添加删除 Redis 节点。
  • 缺点部署架构配置 复杂,不支持 跨机房多租户,不支持 鉴权管理

查询路由方案

客户端随机地 请求任意一个 Redis 实例,然后由 Redis 将请求 转发正确 的 Redis 节点。Redis Cluster 实现了一种 混合形式查询路由,但并不是 直接 将请求从一个 Redis 节点 转发 到另一个 Redis 节点,而是在 客户端 的帮助下直接 重定向redirected)到正确的 Redis 节点。

  • 优点去中心化,数据按照 存储分布在多个 Redis 实例上,可以平滑的进行节点 扩容/缩容,支持 高可用自动故障转移,运维成本低。
  • 缺点:重度依赖 Redis-trib 工具,缺乏 监控管理,需要依赖 Smart Client (维护连接缓存路由表MultiOpPipeline 支持)。Failover 节点的 检测过慢,不如有 中心节点 的集群及时(如 ZooKeeper)。Gossip 消息采用广播方式,集群规模越大,开销越大。无法根据统计区分 冷热数据

参考资料

Redis 运维

Redis 是一个高性能的 key-value 数据库。

SET 操作每秒钟 110000 次;GET 操作每秒钟 81000 次。

Redis 安装

Window 下安装

下载地址:https://github.com/MSOpenTech/redis/releases

Redis 支持 32 位和 64 位。这个需要根据你系统平台的实际情况选择,这里我们下载 Redis-x64-xxx.zip压缩包到 C 盘,解压后,将文件夹重新命名为 redis

打开一个 cmd 窗口 使用 cd 命令切换目录到 C:\redis 运行 redis-server.exe redis.windows.conf

如果想方便的话,可以把 redis 的路径加到系统的环境变量里,这样就省得再输路径了,后面的那个 redis.windows.conf 可以省略,如果省略,会启用默认的。

这时候另启一个 cmd 窗口,原来的不要关闭,不然就无法访问服务端了。

切换到 redis 目录下运行 redis-cli.exe -h 127.0.0.1 -p 6379

Linux 下安装

下载地址: http://redis.io/download,下载最新文档版本。

下载、解压、编译 Redis

1
2
3
4
wget http://download.redis.io/releases/redis-5.0.4.tar.gz
tar xzf redis-5.0.4.tar.gz
cd redis-5.0.4
make

为了编译 Redis 源码,你需要 gcc-c++和 tcl。如果你的系统是 CentOS,可以直接执行命令:yum install -y gcc-c++ tcl 来安装。

进入到解压后的 src 目录,通过如下命令启动 Redis:

1
src/redis-server

您可以使用内置的客户端与 Redis 进行交互:

1
2
3
4
5
$ src/redis-cli
redis> set foo bar
OK
redis> get foo
"bar"

Ubuntu 下安装

在 Ubuntu 系统安装 Redis 可以使用以下命令:

1
2
sudo apt-get update
sudo apt-get install redis-server

开机启动

  • 开机启动配置:echo "/usr/local/bin/redis-server /etc/redis.conf" >> /etc/rc.local

开放防火墙端口

  • 添加规则:iptables -I INPUT -p tcp -m tcp --dport 6379 -j ACCEPT
  • 保存规则:service iptables save
  • 重启 iptables:service iptables restart

Redis 安装脚本

CentOS7 环境安装脚本:软件运维配置脚本集合

安装说明

  • 采用编译方式安装 Redis, 并将其注册为 systemd 服务
  • 安装路径为:/usr/local/redis
  • 默认下载安装 5.0.4 版本,端口号为:6379,密码为空

使用方法

  • 默认安装 - 执行以下任意命令即可:
1
2
curl -o- https://gitee.com/turnon/linux-tutorial/raw/master/codes/linux/soft/redis-install.sh | bash
wget -qO- https://gitee.com/turnon/linux-tutorial/raw/master/codes/linux/soft/redis-install.sh | bash
  • 自定义安装 - 下载脚本到本地,并按照以下格式执行:
1
sh redis-install.sh [version] [port] [password]

参数说明:

  • version - redis 版本号
  • port - redis 服务端口号
  • password - 访问密码

Redis 单机使用和配置

启动 Redis

启动 redis 服务

1
2
cd /usr/local/redis/src
./redis-server

启动 redis 客户端

1
2
cd /usr/local/redis/src
./redis-cli

查看 redis 是否启动

1
redis-cli

以上命令将打开以下终端:

1
redis 127.0.0.1:6379>

127.0.0.1 是本机 IP ,6379 是 redis 服务端口。现在我们输入 PING 命令。

1
2
redis 127.0.0.1:6379> ping
PONG

以上说明我们已经成功启动了 redis。

Redis 常见配置

Redis 默认的配置文件是根目录下的 redis.conf 文件。

如果需要指定特定文件作为配置文件,需要使用命令: ./redis-server -c xxx.conf

每次修改配置后,需要重启才能生效。

Redis 官方默认配置:

自 Redis2.6 起就可以直接通过命令行传递 Redis 配置参数。这种方法可以用于测试。自 Redis2.6 起就可以直接通过命令行传递 Redis 配置参数。这种方法可以用于测试。

设为守护进程

Redis 默认以非守护进程方式启动,而通常我们会将 Redis 设为守护进程启动方式,配置:daemonize yes

远程访问

Redis 默认绑定 127.0.0.1,这样就只能本机才能访问,若要 Redis 允许远程访问,需要配置:bind 0.0.0.0

设置密码

Redis 默认访问不需要密码,如果需要设置密码,需要如下配置:

  • protected-mode yes
  • requirepass <密码>

配置参数表

配置项 说明
daemonize no Redis 默认不是以守护进程的方式运行,可以通过该配置项修改,使用 yes 启用守护进程(Windows 不支持守护线程的配置为 no )
pidfile /var/run/redis.pid 当 Redis 以守护进程方式运行时,Redis 默认会把 pid 写入 /var/run/redis.pid 文件,可以通过 pidfile 指定
port 6379 指定 Redis 监听端口,默认端口为 6379,作者在自己的一篇博文中解释了为什么选用 6379 作为默认端口,因为 6379 在手机按键上 MERZ 对应的号码,而 MERZ 取自意大利歌女 Alessia Merz 的名字
bind 127.0.0.1 绑定的主机地址
timeout 300 当客户端闲置多长时间后关闭连接,如果指定为 0,表示关闭该功能
loglevel notice 指定日志记录级别,Redis 总共支持四个级别:debug、verbose、notice、warning,默认为 notice
logfile stdout 日志记录方式,默认为标准输出,如果配置 Redis 为守护进程方式运行,而这里又配置为日志记录方式为标准输出,则日志将会发送给 /dev/null
databases 16 设置数据库的数量,默认数据库为 0,可以使用 SELECT 命令在连接上指定数据库 id
save <seconds> <changes> Redis 默认配置文件中提供了三个条件:save 900 1save 300 10save 60 10000 分别表示 900 秒(15 分钟)内有 1 个更改,300 秒(5 分钟)内有 10 个更改以及 60 秒内有 10000 个更改。 指定在多长时间内,有多少次更新操作,就将数据同步到数据文件,可以多个条件配合
rdbcompression yes 指定存储至本地数据库时是否压缩数据,默认为 yes,Redis 采用 LZF 压缩,如果为了节省 CPU 时间,可以关闭该选项,但会导致数据库文件变的巨大
dbfilename dump.rdb 指定本地数据库文件名,默认值为 dump.rdb
dir ./ 指定本地数据库存放目录
slaveof <masterip> <masterport> 设置当本机为 slav 服务时,设置 master 服务的 IP 地址及端口,在 Redis 启动时,它会自动从 master 进行数据同步
masterauth <master-password> 当 master 服务设置了密码保护时,slav 服务连接 master 的密码
requirepass foobared 设置 Redis 连接密码,如果配置了连接密码,客户端在连接 Redis 时需要通过 AUTH <password> 命令提供密码,默认关闭
maxclients 128 设置同一时间最大客户端连接数,默认无限制,Redis 可以同时打开的客户端连接数为 Redis 进程可以打开的最大文件描述符数,如果设置 maxclients 0,表示不作限制。当客户端连接数到达限制时,Redis 会关闭新的连接并向客户端返回 max number of clients reached 错误信息
maxmemory <bytes> 指定 Redis 最大内存限制,Redis 在启动时会把数据加载到内存中,达到最大内存后,Redis 会先尝试清除已到期或即将到期的 Key,当此方法处理 后,仍然到达最大内存设置,将无法再进行写入操作,但仍然可以进行读取操作。Redis 新的 vm 机制,会把 Key 存放内存,Value 会存放在 swap 区
appendonly no 指定是否在每次更新操作后进行日志记录,Redis 在默认情况下是异步的把数据写入磁盘,如果不开启,可能会在断电时导致一段时间内的数据丢失。因为 redis 本身同步数据文件是按上面 save 条件来同步的,所以有的数据会在一段时间内只存在于内存中。默认为 no
appendfilename appendonly.aof 指定更新日志文件名,默认为 appendonly.aof
appendfsync everysec 指定更新日志条件,共有 3 个可选值:no:表示等操作系统进行数据缓存同步到磁盘(快)always:表示每次更新操作后手动调用 fsync() 将数据写到磁盘(慢,安全)everysec:表示每秒同步一次(折中,默认值)
vm-enabled no 指定是否启用虚拟内存机制,默认值为 no,简单的介绍一下,VM 机制将数据分页存放,由 Redis 将访问量较少的页即冷数据 swap 到磁盘上,访问多的页面由磁盘自动换出到内存中(在后面的文章我会仔细分析 Redis 的 VM 机制)
vm-swap-file /tmp/redis.swap 虚拟内存文件路径,默认值为 /tmp/redis.swap,不可多个 Redis 实例共享
vm-max-memory 0 将所有大于 vm-max-memory 的数据存入虚拟内存,无论 vm-max-memory 设置多小,所有索引数据都是内存存储的(Redis 的索引数据 就是 keys),也就是说,当 vm-max-memory 设置为 0 的时候,其实是所有 value 都存在于磁盘。默认值为 0
vm-page-size 32 Redis swap 文件分成了很多的 page,一个对象可以保存在多个 page 上面,但一个 page 上不能被多个对象共享,vm-page-size 是要根据存储的 数据大小来设定的,作者建议如果存储很多小对象,page 大小最好设置为 32 或者 64bytes;如果存储很大大对象,则可以使用更大的 page,如果不确定,就使用默认值
vm-pages 134217728 设置 swap 文件中的 page 数量,由于页表(一种表示页面空闲或使用的 bitmap)是在放在内存中的,,在磁盘上每 8 个 pages 将消耗 1byte 的内存。
vm-max-threads 4 设置访问 swap 文件的线程数,最好不要超过机器的核数,如果设置为 0,那么所有对 swap 文件的操作都是串行的,可能会造成比较长时间的延迟。默认值为 4
glueoutputbuf yes 设置在向客户端应答时,是否把较小的包合并为一个包发送,默认为开启
hash-max-zipmap-entries 64 hash-max-zipmap-value 512 指定在超过一定的数量或者最大的元素超过某一临界值时,采用一种特殊的哈希算法
activerehashing yes 指定是否激活重置哈希,默认为开启(后面在介绍 Redis 的哈希算法时具体介绍)
include /path/to/local.conf 指定包含其它的配置文件,可以在同一主机上多个 Redis 实例之间使用同一份配置文件,而同时各个实例又拥有自己的特定配置文件

压力测试

参考官方文档:How fast is Redis?

Redis 自带了一个性能测试工具:redis-benchmark

(1)基本测试

1
redis-benchmark -q -n 100000
  • -q 表示静默(quiet)执行
  • -n 100000 请求 10 万次

(2)测试指定读写指令

1
2
3
$ redis-benchmark -t set,lpush -n 100000 -q
SET: 74239.05 requests per second
LPUSH: 79239.30 requests per second

(3)测试 pipeline 模式下指定读写指令

1
2
3
redis-benchmark -n 1000000 -t set,get -P 16 -q
SET: 403063.28 requests per second
GET: 508388.41 requests per second

Redis 集群使用和配置

Redis 3.0 后支持集群模式。

集群规划

Redis 集群一般由 多个节点 组成,节点数量至少为 6 个,才能保证组成 完整高可用 的集群。

理想情况当然是所有节点各自在不同的机器上,首先于资源,本人在部署 Redis 集群时,只得到 3 台服务器。所以,我计划每台服务器部署 2 个 Redis 节点。

【示例】最简高可用 Redis 集群规划

机器配置:16G 内存 + 8 核 CPU + 1T 磁盘

Redis 进程分配 10 G 内存。一般线上生产环境,Redis 的内存尽量不要超过 10g,超过 10g 可能会有问题。

集群拓扑:三主三从;三哨兵,每个哨兵监听所有主节点。

估算性能:

  • 容量:三主,占用 30 G 内存,所以最大存储容量为 30 G。假设每条数据记录平均 大小为 10 K,则最大能存储 300 万条数据。
  • 吞吐量:单机一般 TPS/QPS 为 五万到八万左右。假设为五万,那么三主三从架构理论上能达到 TPS 15 万,QPS 30 万。

部署集群

Redis 集群节点的安装与单节点服务相同,差异仅在于部署方式。

注意:为了演示方便,本示例将所有 Redis 集群节点都部署在一台机器上,实际生产环境中,基本都会将节点部署在不同机器上。要求更高的,可能还要考虑多机房部署。

(1)创建节点目录

我个人偏好将软件放在 /opt 目录下,在我的机器中,Redis 都安装在 /usr/local/redis 目录下。所以,下面的命令和配置都假设 Redis 安装目录为 /usr/local/redis

确保机器上已经安装了 Redis 后,执行以下命令,创建 Redis 集群节点实例目录:

1
2
3
4
5
6
sudo mkdir -p /usr/local/redis/conf/7001
sudo mkdir -p /usr/local/redis/conf/7002
sudo mkdir -p /usr/local/redis/conf/7003
sudo mkdir -p /usr/local/redis/conf/7004
sudo mkdir -p /usr/local/redis/conf/7005
sudo mkdir -p /usr/local/redis/conf/7006

(2)配置集群节点

每个实例目录下,新建 redis.conf 配置文件。

实例配置模板以 7001 节点为例(其他节点,完全替换配置中的端口号 7001 即可),如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# 端口号
port 7001
# 绑定的主机端口(0.0.0.0 表示允许远程访问)
bind 0.0.0.0
# 以守护进程方式启动
daemonize yes

# 开启集群模式
cluster-enabled yes
# 集群的配置,配置文件首次启动自动生成
cluster-config-file /usr/local/redis/conf/7001/7001.conf
# 请求超时时间,设置 10 秒
cluster-node-timeout 10000

# 开启 AOF 持久化
appendonly yes
# 数据存放目录
dir /usr/local/redis/conf/7001
# 进程文件
pidfile /usr/local/redis/conf/7001/7001.pid
# 日志文件
logfile /usr/local/redis/conf/7001/7001.log

(3)批量启动 Redis 节点

Redis 的 utils/create-cluster 目录下自带了一个名为 create-cluster 的脚本工具,可以利用它来新建、启动、停止、重启 Redis 节点。

脚本中有几个关键参数:

  • PORT=30000 - 初始端口号
  • TIMEOUT=2000 - 超时时间
  • NODES=6 - 节点数
  • REPLICAS=1 - 备份数

脚本中的每个命令项会根据初始端口号,以及设置的节点数,遍历的去执行操作。

由于前面的规划中,节点端口是从 7001 ~ 7006,所以需要将 PORT 变量设为 7000。

脚本中启动每个 Redis 节点是通过指定命令行参数来配置属性。所以,我们需要改一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
PORT=7000
TIMEOUT=2000
NODES=6
ENDPORT=$((PORT+NODES))

# ...

if [ "$1" == "start" ]
then
while [ $((PORT < ENDPORT)) != "0" ]; do
PORT=$((PORT+1))
echo "Starting $PORT"
/usr/local/redis/src/redis-server /usr/local/redis/conf/${PORT}/redis.conf
done
exit 0
fi

好了,在每台服务器上,都执行 ./create-cluster start 来启动节点。

然后,通过 ps 命令来确认 Redis 进程是否已经工作:

1
2
3
4
5
6
7
8
# root @ dbClusterDev01 in /usr/local/redis/conf [11:07:55]
$ ps -ef | grep redis
root 4604 1 0 11:07 ? 00:00:00 /opt/redis/src/redis-server 0.0.0.0:7001 [cluster]
root 4609 1 0 11:07 ? 00:00:00 /opt/redis/src/redis-server 0.0.0.0:7002 [cluster]
root 4614 1 0 11:07 ? 00:00:00 /opt/redis/src/redis-server 0.0.0.0:7003 [cluster]
root 4619 1 0 11:07 ? 00:00:00 /opt/redis/src/redis-server 0.0.0.0:7004 [cluster]
root 4624 1 0 11:07 ? 00:00:00 /opt/redis/src/redis-server 0.0.0.0:7005 [cluster]
root 4629 1 0 11:07 ? 00:00:00 /opt/redis/src/redis-server 0.0.0.0:7006 [cluster]

(4)启动集群

通过 redis-cli --cluster create 命令可以自动配置集群,如下:

1
./redis-cli --cluster create 127.0.0.1:7001 127.0.0.1:7002 127.0.0.2:7003 127.0.0.2:7004 127.0.0.3:7005 127.0.0.3:7006 --cluster-replicas 1

redis-cluster 会根据设置的节点数和副本数自动分片(分配 Hash 虚拟槽 slot),如果满意,输入 yes ,直接开始分片。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
>>> Performing hash slots allocation on 6 nodes...
Master[0] -> Slots 0 - 5460
Master[1] -> Slots 5461 - 10922
Master[2] -> Slots 10923 - 16383
Adding replica 127.0.0.2:7004 to 127.0.0.1:7001
Adding replica 127.0.0.3:7006 to 127.0.0.2:7003
Adding replica 127.0.0.1:7002 to 127.0.0.3:7005
M: b721235997deb6b9a7a2be690b5b9663db8057c6 127.0.0.1:7001
slots:[0-5460] (5461 slots) master
S: bda9b7036df0bbefe601bda4ce45d3787a2e9bd9 127.0.0.1:7002
replicates 3623fff69b5243ed18c02a2fbb6f53069b0f1505
M: 91523c0391a044da6cc9f53bb965aabe89502187 127.0.0.2:7003
slots:[5461-10922] (5462 slots) master
S: 9d899cbe49dead7b8c4f769920cdb75714a441ae 127.0.0.2:7004
replicates b721235997deb6b9a7a2be690b5b9663db8057c6
M: 3623fff69b5243ed18c02a2fbb6f53069b0f1505 127.0.0.3:7005
slots:[10923-16383] (5461 slots) master
S: a2869dc153ea4977ca790b76483574a5d56cb40e 127.0.0.3:7006
replicates 91523c0391a044da6cc9f53bb965aabe89502187
Can I set the above configuration? (type 'yes' to accept): yes
>>> Nodes configuration updated
>>> Assign a different config epoch to each node
>>> Sending CLUSTER MEET messages to join the cluster
Waiting for the cluster to join
....
>>> Performing Cluster Check (using node 127.0.0.1:7001)
M: b721235997deb6b9a7a2be690b5b9663db8057c6 127.0.0.1:7001
slots:[0-5460] (5461 slots) master
1 additional replica(s)
S: a2869dc153ea4977ca790b76483574a5d56cb40e 127.0.0.1:7006
slots: (0 slots) slave
replicates 91523c0391a044da6cc9f53bb965aabe89502187
M: 91523c0391a044da6cc9f53bb965aabe89502187 127.0.0.1:7003
slots:[5461-10922] (5462 slots) master
1 additional replica(s)
M: 3623fff69b5243ed18c02a2fbb6f53069b0f1505 127.0.0.1:7005
slots:[10923-16383] (5461 slots) master
1 additional replica(s)
S: 9d899cbe49dead7b8c4f769920cdb75714a441ae 127.0.0.1:7004
slots: (0 slots) slave
replicates b721235997deb6b9a7a2be690b5b9663db8057c6
S: bda9b7036df0bbefe601bda4ce45d3787a2e9bd9 127.0.0.1:7002
slots: (0 slots) slave
replicates 3623fff69b5243ed18c02a2fbb6f53069b0f1505
[OK] All nodes agree about slots configuration.
>>> Check for open slots...
>>> Check slots coverage...
[OK] All 16384 slots covered.

(5)日常维护操作

  • 关闭集群 - ./create-cluster stop
  • 检查集群是否健康(指定任意节点即可):./redis-cli --cluster check <ip:port>
  • 尝试修复集群节点:./redis-cli --cluster fix <ip:port>

部署哨兵

redis-cluster 实现了 Redis 的分片、复制。

但 redis-cluster 没有解决故障转移问题,一旦任意分片的 Master 节点宕机、网络不通,就会导致 redis-cluster 的集群不能工作。为了解决高可用的问题,Redis 提供了 Redis 哨兵来监控 Redis 节点状态,并且会在 Master 宕机时,发起选举,将这个 Master 的一个 Slave 节点选举为 Master。

(1)创建节点目录

我个人偏好将软件放在 /opt 目录下,在我的机器中,Redis 都安装在 /usr/local/redis 目录下。所以,下面的命令和配置都假设 Redis 安装目录为 /usr/local/redis

确保机器上已经安装了 Redis 后,执行以下命令,创建 Redis 集群节点实例目录:

1
2
3
sudo mkdir -p /usr/local/redis/conf/27001
sudo mkdir -p /usr/local/redis/conf/27002
sudo mkdir -p /usr/local/redis/conf/27003

(2)配置集群节点

每个实例目录下,新建 redis.conf 配置文件。

实例配置模板以 7001 节点为例(其他节点,完全替换配置中的端口号 7001 即可),如下:

1
2
3
4
5
6
7
8
port 27001
daemonize yes
sentinel monitor redis-master 172.22.6.3 7001 2
sentinel down-after-milliseconds redis-master 5000
sentinel failover-timeout redis-master 900000
sentinel parallel-syncs redis-master 1
#sentinel auth-pass redis-master 123456
logfile /usr/local/redis/conf/27001/27001.log

(3)批量启动哨兵节点

1
2
3
/opt/redis/src/redis-sentinel /usr/local/redis/conf/27001/sentinel.conf
/opt/redis/src/redis-sentinel /usr/local/redis/conf/27002/sentinel.conf
/opt/redis/src/redis-sentinel /usr/local/redis/conf/27003/sentinel.conf

扩容

(1)查看信息

进入任意节点

1
./redis-cli -h 172.22.6.3 -p 7001

cluster info 查看集群节点状态

1
2
3
4
5
6
7
172.22.6.3:7001> cluster nodes
f158bf70bb2767cac271ce4efcfc14ba0b7ca98b 172.22.6.3:7006@17006 slave e7aa182e756b76ec85b471797db9b66e4b2da725 0 1594528179000 6 connected
f348e67648460c7a800120d69b4977bf2e4524cb 172.22.6.3:7001@17001 myself,master - 0 1594528179000 1 connected 0-5460
52601e2d4af0e64b83f4cc6d20e8316d0ac38b99 172.22.6.3:7004@17004 slave 4802fafe897160c46392c6e569d6f5e466cca696 0 1594528178000 4 connected
c6c6a68674ae8aac3c6ec792c8af4dc1228c6c31 172.22.6.3:7005@17005 slave f348e67648460c7a800120d69b4977bf2e4524cb 0 1594528179852 5 connected
e7aa182e756b76ec85b471797db9b66e4b2da725 172.22.6.3:7002@17002 master - 0 1594528178000 2 connected 5461-10922
4802fafe897160c46392c6e569d6f5e466cca696 172.22.6.3:7003@17003 master - 0 1594528178000 3 connected 10923-16383

cluster info 查看集群信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
172.22.6.3:7001> cluster info
cluster_state:ok
cluster_slots_assigned:16384
cluster_slots_ok:16384
cluster_slots_pfail:0
cluster_slots_fail:0
cluster_known_nodes:6
cluster_size:3
cluster_current_epoch:6
cluster_my_epoch:1
cluster_stats_messages_ping_sent:3406
cluster_stats_messages_pong_sent:3569
cluster_stats_messages_publish_sent:5035
cluster_stats_messages_sent:12010
cluster_stats_messages_ping_received:3564
cluster_stats_messages_pong_received:3406
cluster_stats_messages_meet_received:5
cluster_stats_messages_publish_received:5033
cluster_stats_messages_received:12008

(2)添加节点到集群

将已启动的节点实例添加到集群中

1
redis-cli --cluster add-node 127.0.0.1:7007 127.0.0.1:7008

添加主节点

添加一组主节点

1
2
3
./redis-cli --cluster add-node 172.22.6.3:7007 172.22.6.3:7001
./redis-cli --cluster add-node 172.22.6.3:7008 172.22.6.3:7001
./redis-cli --cluster add-node 172.22.6.3:7009 172.22.6.3:7001

查看节点状态

1
2
3
4
5
6
7
8
9
10
172.22.6.3:7001> cluster nodes
f158bf70bb2767cac271ce4efcfc14ba0b7ca98b 172.22.6.3:7006@17006 slave e7aa182e756b76ec85b471797db9b66e4b2da725 0 1594529342575 6 connected
f348e67648460c7a800120d69b4977bf2e4524cb 172.22.6.3:7001@17001 myself,master - 0 1594529340000 1 connected 0-5460
55cacf121662833a4a19dbeb4a5df712cfedf77f 172.22.6.3:7009@17009 master - 0 1594529342000 0 connected
c6c6a68674ae8aac3c6ec792c8af4dc1228c6c31 172.22.6.3:7005@17005 slave f348e67648460c7a800120d69b4977bf2e4524cb 0 1594529341573 5 connected
4802fafe897160c46392c6e569d6f5e466cca696 172.22.6.3:7003@17003 master - 0 1594529343577 3 connected 10923-16383
e7aa182e756b76ec85b471797db9b66e4b2da725 172.22.6.3:7002@17002 master - 0 1594529342000 2 connected 5461-10922
e5ba78fe629115977a74fbbe1478caf8868d6d55 172.22.6.3:7007@17007 master - 0 1594529341000 0 connected
52601e2d4af0e64b83f4cc6d20e8316d0ac38b99 172.22.6.3:7004@17004 slave 4802fafe897160c46392c6e569d6f5e466cca696 0 1594529340000 4 connected
79d4fffc2cec210556c3b4c44e63ab506e87eda3 172.22.6.3:7008@17008 master - 0 1594529340000 7 connected

可以发现,新加入的三个主节点,还没有分配哈希槽,所以,暂时还无法访问。

添加从节点

–slave:设置该参数,则新节点以 slave 的角色加入集群
–master-id:这个参数需要设置了–slave 才能生效,–master-id 用来指定新节点的 master 节点。如果不设置该参数,则会随机为节点选择 master 节点。

语法

1
2
redis-cli --cluster add-node  新节点IP地址:端口    存在节点IP:端口 --cluster-slave (从节点) --cluster-master-id (master节点的ID)
redis-cli --cluster add-node 10.42.141.119:6379 10.42.166.105:6379 --cluster-slave --cluster-master-id dfa238fff8a7a49230cff7eb74f573f5645c8ec5

示例

1
2
3
./redis-cli --cluster add-node 172.22.6.3:7010 172.22.6.3:7007 --cluster-slave
./redis-cli --cluster add-node 172.22.6.3:7011 172.22.6.3:7008 --cluster-slave
./redis-cli --cluster add-node 172.22.6.3:7012 172.22.6.3:7009 --cluster-slave

查看状态

1
2
3
4
5
6
7
8
9
10
11
12
13
172.22.6.3:7001> cluster nodes
ef5c1b9ce4cc795dc12b2c1e8736a572647b4c3e 172.22.6.3:7011@17011 slave 79d4fffc2cec210556c3b4c44e63ab506e87eda3 0 1594529492043 7 connected
f158bf70bb2767cac271ce4efcfc14ba0b7ca98b 172.22.6.3:7006@17006 slave e7aa182e756b76ec85b471797db9b66e4b2da725 0 1594529491943 6 connected
f348e67648460c7a800120d69b4977bf2e4524cb 172.22.6.3:7001@17001 myself,master - 0 1594529488000 1 connected 0-5460
5140d1129ed850df59c51cf818c4eb74545d9959 172.22.6.3:7010@17010 slave e5ba78fe629115977a74fbbe1478caf8868d6d55 0 1594529488000 0 connected
55cacf121662833a4a19dbeb4a5df712cfedf77f 172.22.6.3:7009@17009 master - 0 1594529488000 8 connected
c6c6a68674ae8aac3c6ec792c8af4dc1228c6c31 172.22.6.3:7005@17005 slave f348e67648460c7a800120d69b4977bf2e4524cb 0 1594529490000 5 connected
4802fafe897160c46392c6e569d6f5e466cca696 172.22.6.3:7003@17003 master - 0 1594529489939 3 connected 10923-16383
e7aa182e756b76ec85b471797db9b66e4b2da725 172.22.6.3:7002@17002 master - 0 1594529491000 2 connected 5461-10922
e5ba78fe629115977a74fbbe1478caf8868d6d55 172.22.6.3:7007@17007 master - 0 1594529490942 0 connected
52601e2d4af0e64b83f4cc6d20e8316d0ac38b99 172.22.6.3:7004@17004 slave 4802fafe897160c46392c6e569d6f5e466cca696 0 1594529491000 4 connected
02e9f57b5b45c350dc57acf1c8efa8db136db7b7 172.22.6.3:7012@17012 master - 0 1594529489000 0 connected
79d4fffc2cec210556c3b4c44e63ab506e87eda3 172.22.6.3:7008@17008 master - 0 1594529489000 7 connected

分配哈希槽

执行 ./redis-cli --cluster rebalance 172.22.6.3:7001 --cluster-threshold 1 --cluster-use-empty-masters

参数说明:

rebalance:表明让 Redis 自动根据节点数进行均衡哈希槽分配。

–cluster-use-empty-masters:表明

img

执行结束后,查看状态:

img

Redis 命令

通用命令

命令详细用法,请参考 Redis 命令官方文档

搬迁两张 cheat sheet 图,原址:https://www.cheatography.com/tasjaevan/cheat-sheets/redis/

集群命令

  • 集群
    • cluster info - 打印集群的信息
    • cluster nodes - 列出集群当前已知的所有节点( node),以及这些节点的相关信息。
  • 节点
    • cluster meet <ip> <port> - 将 ip 和 port 所指定的节点添加到集群当中,让它成为集群的一份子。
    • cluster forget <node_id> - 从集群中移除 node_id 指定的节点。
    • cluster replicate <node_id> - 将当前节点设置为 node_id 指定的节点的从节点。
    • cluster saveconfig - 将节点的配置文件保存到硬盘里面。
  • 槽(slot)
    • cluster addslots <slot> [slot ...] - 将一个或多个槽( slot)指派( assign)给当前节点。
    • cluster delslots <slot> [slot ...] - 移除一个或多个槽对当前节点的指派。
    • cluster flushslots - 移除指派给当前节点的所有槽,让当前节点变成一个没有指派任何槽的节点。
    • cluster setslot <slot> node <node_id> - 将槽 slot 指派给 node_id 指定的节点,如果槽已经指派给另一个节点,那么先让另一个节点删除该槽>,然后再进行指派。
    • cluster setslot <slot> migrating <node_id> - 将本节点的槽 slot 迁移到 node_id 指定的节点中。
    • cluster setslot <slot> importing <node_id> - 从 node_id 指定的节点中导入槽 slot 到本节点。
    • cluster setslot <slot> stable - 取消对槽 slot 的导入( import)或者迁移( migrate)。
    • cluster keyslot <key> - 计算键 key 应该被放置在哪个槽上。
    • cluster countkeysinslot <slot> - 返回槽 slot 目前包含的键值对数量。
    • cluster getkeysinslot <slot> <count> - 返回 count 个 slot 槽中的键。

重新分片

添加节点:./redis-cli –cluster add-node 192.168.1.136:7007 192.168.1.136:7001 –cluster-slave

redis-cli –cluster reshard 172.22.6.3 7001

客户端

推荐使用 RedisDesktopManager

参考资料

Flink 入门

Apache Flink 是一个框架和分布式处理引擎,用于在无边界和有边界数据流上进行有状态的计算。Flink 能在所有常见集群环境中运行,并能以内存速度和任意规模进行计算。

img

概念

处理无界和有界数据

任何类型的数据都可以形成一种事件流。

数据可以被作为 无界 或者 有界 流来处理。

  • 无界流 有定义流的开始,但没有定义流的结束。它们会无休止地产生数据。无界流的数据必须持续处理,即数据被摄取后需要立刻处理。我们不能等到所有数据都到达再处理,因为输入是无限的,在任何时候输入都不会完成。处理无界数据通常要求以特定顺序摄取事件,例如事件发生的顺序,以便能够推断结果的完整性。
  • 有界流 有定义流的开始,也有定义流的结束。有界流可以在摄取所有数据后再进行计算。有界流所有数据可以被排序,所以并不需要有序摄取。有界流处理通常被称为批处理。

img

核心概念

显而易见,(数据)流是流处理的基本要素。然而,流也拥有着多种特征。这些特征决定了流如何以及何时被处理。Flink 是一个能够处理任何类型数据流的强大处理框架。

  • 有界无界 的数据流:流可以是无界的;也可以是有界的,例如固定大小的数据集。Flink 在无界的数据流处理上拥有诸多功能强大的特性,同时也针对有界的数据流开发了专用的高效算子。
  • 实时历史记录 的数据流:所有的数据都是以流的方式产生,但用户通常会使用两种截然不同的方法处理数据。或是在数据生成时进行实时的处理;亦或是先将数据流持久化到存储系统中——例如文件系统或对象存储,然后再进行批处理。Flink 的应用能够同时支持处理实时以及历史记录数据流。

状态

只有在每一个单独的事件上进行转换操作的应用才不需要状态,换言之,每一个具有一定复杂度的流处理应用都是有状态的。任何运行基本业务逻辑的流处理应用都需要在一定时间内存储所接收的事件或中间结果,以供后续的某个时间点(例如收到下一个事件或者经过一段特定时间)进行访问并进行后续处理。

img

应用状态是 Flink 中的一等公民,Flink 提供了许多状态管理相关的特性支持,其中包括:

  • 多种状态基础类型:Flink 为多种不同的数据结构提供了相对应的状态基础类型,例如原子值(value),列表(list)以及映射(map)。开发者可以基于处理函数对状态的访问方式,选择最高效、最适合的状态基础类型。
  • 插件化的 State Backend:State Backend 负责管理应用程序状态,并在需要的时候进行 checkpoint。Flink 支持多种 state backend,可以将状态存在内存或者 RocksDB。RocksDB 是一种高效的嵌入式、持久化键值存储引擎。Flink 也支持插件式的自定义 state backend 进行状态存储。
  • 精确一次语义:Flink 的 checkpoint 和故障恢复算法保证了故障发生后应用状态的一致性。因此,Flink 能够在应用程序发生故障时,对应用程序透明,不造成正确性的影响。
  • 超大数据量状态:Flink 能够利用其异步以及增量式的 checkpoint 算法,存储数 TB 级别的应用状态。
  • 可弹性伸缩的应用:Flink 能够通过在更多或更少的工作节点上对状态进行重新分布,支持有状态应用的分布式的横向伸缩。

时间

时间是流处理应用另一个重要的组成部分。因为事件总是在特定时间点发生,所以大多数的事件流都拥有事件本身所固有的时间语义。进一步而言,许多常见的流计算都基于时间语义,例如窗口聚合、会话计算、模式检测和基于时间的 join。流处理的一个重要方面是应用程序如何衡量时间,即区分事件时间(event-time)和处理时间(processing-time)。

Flink 提供了丰富的时间语义支持。

  • 事件时间模式:使用事件时间语义的流处理应用根据事件本身自带的时间戳进行结果的计算。因此,无论处理的是历史记录的事件还是实时的事件,事件时间模式的处理总能保证结果的准确性和一致性。
  • Watermark 支持:Flink 引入了 watermark 的概念,用以衡量事件时间进展。Watermark 也是一种平衡处理延时和完整性的灵活机制。
  • 迟到数据处理:当以带有 watermark 的事件时间模式处理数据流时,在计算完成之后仍会有相关数据到达。这样的事件被称为迟到事件。Flink 提供了多种处理迟到数据的选项,例如将这些数据重定向到旁路输出(side output)或者更新之前完成计算的结果。
  • 处理时间模式:除了事件时间模式,Flink 还支持处理时间语义。处理时间模式根据处理引擎的机器时钟触发计算,一般适用于有着严格的低延迟需求,并且能够容忍近似结果的流处理应用。
  • Flink 是基于事件驱动 (Event-driven) 的应用,能够同时支持流处理和批处理
  • 基于内存计算,能够保证高吞吐和低延迟,具有优越的性能表现;
  • 支持精确一次 (Exactly-once) 语意,能够完美地保证一致性和正确性;
  • 分层 API ,能够满足各个层次的开发需求;
  • 支持高可用配置,支持保存点机制,能够提供安全性和稳定性上的保证;
  • 多样化的部署方式,支持本地,远端,云端等多种部署方案;
  • 具有横向扩展架构,能够按照用户的需求进行动态扩容;
  • 活跃度极高的社区和完善的生态圈的支持。

分层架构

Flink 采用分层的架构设计,从而保证各层在功能和职责上的清晰。

自上而下,分别是 API & Libraries 层、Runtime 核心层以及物理部署层:

API & Libraries 层

这一层主要提供了编程 API 和 顶层类库:

  • 编程 API : 用于进行流处理的 DataStream API 和用于进行批处理的 DataSet API;
    • SQL & Table API - SQL & Table API 同时适用于批处理和流处理,这意味着你可以对有界数据流和无界数据流以相同的语义进行查询,并产生相同的结果。除了基本查询外, 它还支持自定义的标量函数,聚合函数以及表值函数,可以满足多样化的查询需求。
    • DataStream & DataSet API - DataStream & DataSet API 是 Flink 数据处理的核心 API,支持使用 Java 语言或 Scala 语言进行调用,提供了数据读取,数据转换和数据输出等一系列常用操作的封装。
    • Stateful Stream Processing - Stateful Stream Processing 是最低级别的抽象,它通过 Process Function 函数内嵌到 DataStream API 中。 Process Function 是 Flink 提供的最底层 API,具有最大的灵活性,允许开发者对于时间和状态进行细粒度的控制。
  • 顶层类库
    • 用于复杂事件处理的 CEP 库;
    • 用于结构化数据查询的 SQL & Table 库;
    • 基于批处理的机器学习库 FlinkML
    • 图形处理库 Gelly。

Runtime 核心层

这一层是 Flink 分布式计算框架的核心实现层,包括作业转换,任务调度,资源分配,任务执行等功能,基于这一层的实现,可以在流式引擎下同时运行流处理程序和批处理程序。

物理部署层

Flink 的物理部署层,用于支持在不同平台上部署运行 Flink 应用。

集群架构

核心组件

按照上面的介绍,Flink 核心架构的第二层是 Runtime 层, 该层采用标准的 Master - Slave 结构, 其中,Master 部分又包含了三个核心组件:Dispatcher、ResourceManager 和 JobManager,而 Slave 则主要是 TaskManager 进程。它们的功能分别如下:

  • JobManagers (也称为 masters) :JobManagers 接收由 Dispatcher 传递过来的执行程序,该执行程序包含了作业图 (JobGraph),逻辑数据流图 (logical dataflow graph) 及其所有的 classes 文件以及第三方类库 (libraries) 等等 。紧接着 JobManagers 会将 JobGraph 转换为执行图 (ExecutionGraph),然后向 ResourceManager 申请资源来执行该任务,一旦申请到资源,就将执行图分发给对应的 TaskManagers 。因此每个作业 (Job) 至少有一个 JobManager;高可用部署下可以有多个 JobManagers,其中一个作为 _leader_,其余的则处于 standby 状态。
  • TaskManagers (也称为 workers) : TaskManagers **负责实际的子任务 (subtasks) 的执行,每个 TaskManagers 都拥有一定数量的 slots。Slot 是一组固定大小的资源的合集 (如计算能力,存储空间)**。TaskManagers 启动后,会将其所拥有的 slots 注册到 ResourceManager 上,由 ResourceManager 进行统一管理。
  • Dispatcher:负责接收客户端提交的执行程序,并传递给 JobManager 。除此之外,它还提供了一个 WEB UI 界面,用于监控作业的执行情况。
  • ResourceManager :负责管理 slots 并协调集群资源。ResourceManager 接收来自 JobManager 的资源请求,并将存在空闲 slots 的 TaskManagers 分配给 JobManager 执行任务。Flink 基于不同的部署平台,如 YARN , Mesos,K8s 等提供了不同的资源管理器,当 TaskManagers 没有足够的 slots 来执行任务时,它会向第三方平台发起会话来请求额外的资源。

img

Task & SubTask

上面我们提到:TaskManagers 实际执行的是 SubTask,而不是 Task,这里解释一下两者的区别:

在执行分布式计算时,Flink 将可以链接的操作 (operators) 链接到一起,这就是 Task。之所以这样做, 是为了减少线程间切换和缓冲而导致的开销,在降低延迟的同时可以提高整体的吞吐量。 但不是所有的 operator 都可以被链接,如下 keyBy 等操作会导致网络 shuffle 和重分区,因此其就不能被链接,只能被单独作为一个 Task。 简单来说,一个 Task 就是一个可以链接的最小的操作链 (Operator Chains) 。如下图,source 和 map 算子被链接到一块,因此整个作业就只有三个 Task:

img

解释完 Task ,我们在解释一下什么是 SubTask,其准确的翻译是: _A subtask is one parallel slice of a task_,即一个 Task 可以按照其并行度拆分为多个 SubTask。如上图,source & map 具有两个并行度,KeyBy 具有两个并行度,Sink 具有一个并行度,因此整个虽然只有 3 个 Task,但是却有 5 个 SubTask。Jobmanager 负责定义和拆分这些 SubTask,并将其交给 Taskmanagers 来执行,每个 SubTask 都是一个单独的线程。

4.3 资源管理

理解了 SubTasks ,我们再来看看其与 Slots 的对应情况。一种可能的分配情况如下:

img

这时每个 SubTask 线程运行在一个独立的 TaskSlot, 它们共享所属的 TaskManager 进程的 TCP 连接(通过多路复用技术)和心跳信息 (heartbeat messages),从而可以降低整体的性能开销。此时看似是最好的情况,但是每个操作需要的资源都是不尽相同的,这里假设该作业 keyBy 操作所需资源的数量比 Sink 多很多 ,那么此时 Sink 所在 Slot 的资源就没有得到有效的利用。

基于这个原因,Flink 允许多个 subtasks 共享 slots,即使它们是不同 tasks 的 subtasks,但只要它们来自同一个 Job 就可以。假设上面 souce & map 和 keyBy 的并行度调整为 6,而 Slot 的数量不变,此时情况如下:

img

可以看到一个 Task Slot 中运行了多个 SubTask 子任务,此时每个子任务仍然在一个独立的线程中执行,只不过共享一组 Sot 资源而已。那么 Flink 到底如何确定一个 Job 至少需要多少个 Slot 呢?Flink 对于这个问题的处理很简单,默认情况一个 Job 所需要的 Slot 的数量就等于其 Operation 操作的最高并行度。如下, A,B,D 操作的并行度为 4,而 C,E 操作的并行度为 2,那么此时整个 Job 就需要至少四个 Slots 来完成。通过这个机制,Flink 就可以不必去关心一个 Job 到底会被拆分为多少个 Tasks 和 SubTasks。

img

4.4 组件通讯

Flink 的所有组件都基于 Actor System 来进行通讯。Actor system 是多种角色的 actor 的容器,它提供调度,配置,日志记录等多种服务,并包含一个可以启动所有 actor 的线程池,如果 actor 是本地的,则消息通过共享内存进行共享,但如果 actor 是远程的,则通过 RPC 的调用来传递消息。

按照统计维度的不同,Flink 中的窗口可以分为 时间窗口 (Time Windows) 和 计数窗口 (Count Windows) 。

时间窗口

时间窗口 (Time Windows) 用于以时间为维度来进行数据聚合,具体分为以下四类:

Tumbling Windows

滚动窗口 (Tumbling Windows) 是指彼此之间没有重叠的窗口。例如:每隔 1 小时统计过去 1 小时内的商品点击量,那么 1 天就只能分为 24 个窗口,每个窗口彼此之间是不存在重叠的,具体如下:

img

Sliding Windows

滑动窗口(Sliding Windows)用于滚动进行聚合分析,例如:每隔 6 分钟统计一次过去一小时内所有商品的点击量,那么统计窗口彼此之间就是存在重叠的,即 1 天可以分为 240 个窗口。图示如下:

img

可以看到 window 1 - 4 这四个窗口彼此之间都存在着时间相等的重叠部分。想要实现滑动窗口,只需要在使用 timeWindow 方法时额外传递第二个参数作为滚动时间即可,具体如下:

1
2
// 每隔3秒统计一次过去1分钟内的数据
timeWindow(Time.minutes(1),Time.seconds(3))

Session Windows

当用户在进行持续浏览时,可能每时每刻都会有点击数据,例如在活动区间内,用户可能频繁的将某类商品加入和移除购物车,而你只想知道用户本次浏览最终的购物车情况,此时就可以在用户持有的会话结束后再进行统计。想要实现这类统计,可以通过会话窗口(Session Windows) 来进行实现。

img

具体的实现代码如下:

1
2
3
4
// 以处理时间为衡量标准,如果10秒内没有任何数据输入,就认为会话已经关闭,此时触发统计
window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
// 以事件时间为衡量标准
window(EventTimeSessionWindows.withGap(Time.seconds(10)))

Global Windows

最后一个窗口是全局窗口(Global Windows), 全局窗口会将所有 key 相同的元素分配到同一个窗口中,其通常配合触发器 (trigger) 进行使用。如果没有相应触发器,则计算将不会被执行。

img

这里继续以上面词频统计的案例为例,示例代码如下:

1
2
// 当单词累计出现的次数每达到10次时,则触发计算,计算整个窗口内该单词出现的总数
window(GlobalWindows.create()).trigger(CountTrigger.of(10)).sum(1).print();

统计窗口

统计窗口(Count Windows)用于以数量为维度来进行数据聚合,同样也分为滚动窗口和滑动窗口,实现方式也和时间窗口完全一致,只是调用的 API 不同,具体如下:

1
2
3
4
// 滚动计数窗口,每1000次点击则计算一次
countWindow(1000)
// 滑动计数窗口,每10次点击发生后,则计算过去1000次点击的情况
countWindow(1000,10)

实际上计数窗口内部就是调用的我们上一部分介绍的全局窗口来实现的,其源码如下:

1
2
3
4
5
6
7
8
9
10
public WindowedStream<T, KEY, GlobalWindow> countWindow(long size) {
return window(GlobalWindows.create()).trigger(PurgingTrigger.of(CountTrigger.of(size)));
}


public WindowedStream<T, KEY, GlobalWindow> countWindow(long size, long slide) {
return window(GlobalWindows.create())
.evictor(CountEvictor.of(size))
.trigger(CountTrigger.of(slide));
}

Flink 一个比较重要的特性就是其支持有状态计算。即你可以将中间的计算结果进行保存,并提供给后续的计算使用:


img

具体而言,Flink 又将状态 (State) 分为 Keyed State 与 Operator State:

2.1 算子状态

算子状态 (Operator State):顾名思义,状态是和算子进行绑定的,一个算子的状态不能被其他算子所访问到。官方文档上对 Operator State 的解释是:_each operator state is bound to one parallel operator instance_,所以更为确切的说一个算子状态是与一个并发的算子实例所绑定的,即假设算子的并行度是 2,那么其应有两个对应的算子状态:

img

2.2 键控状态

键控状态 (Keyed State) :是一种特殊的算子状态,即状态是根据 key 值进行区分的,Flink 会为每类键值维护一个状态实例。如下图所示,每个颜色代表不同 key 值,对应四个不同的状态实例。需要注意的是键控状态只能在 KeyedStream 上进行使用,我们可以通过 stream.keyBy(...) 来得到 KeyedStream

img

事件驱动型应用

什么是事件驱动型应用?

事件驱动型应用是一类具有状态的应用,它从一个或多个事件流提取数据,并根据到来的事件触发计算、状态更新或其他外部动作。

事件驱动型应用是在计算存储分离的传统应用基础上进化而来。在传统架构中,应用需要读写远程事务型数据库。

相反,事件驱动型应用是基于状态化流处理来完成。在该设计中,数据和计算不会分离,应用只需访问本地(内存或磁盘)即可获取数据。系统容错性的实现依赖于定期向远程持久化存储写入 checkpoint。下图描述了传统应用和事件驱动型应用架构的区别。

img

事件驱动型应用的优势?

事件驱动型应用无须查询远程数据库,本地数据访问使得它具有更高的吞吐和更低的延迟。而由于定期向远程持久化存储的 checkpoint 工作可以异步、增量式完成,因此对于正常事件处理的影响甚微。事件驱动型应用的优势不仅限于本地数据访问。传统分层架构下,通常多个应用会共享同一个数据库,因而任何对数据库自身的更改(例如:由应用更新或服务扩容导致数据布局发生改变)都需要谨慎协调。反观事件驱动型应用,由于只需考虑自身数据,因此在更改数据表示或服务扩容时所需的协调工作将大大减少。

事件驱动型应用会受制于底层流处理系统对时间和状态的把控能力,Flink 诸多优秀特质都是围绕这些方面来设计的。它提供了一系列丰富的状态操作原语,允许以精确一次的一致性语义合并海量规模(TB 级别)的状态数据。此外,Flink 还支持事件时间和自由度极高的定制化窗口逻辑,而且它内置的 ProcessFunction 支持细粒度时间控制,方便实现一些高级业务逻辑。同时,Flink 还拥有一个复杂事件处理(CEP)类库,可以用来检测数据流中的模式。

Flink 中针对事件驱动应用的明星特性当属 savepoint。Savepoint 是一个一致性的状态映像,它可以用来初始化任意状态兼容的应用。在完成一次 savepoint 后,即可放心对应用升级或扩容,还可以启动多个版本的应用来完成 A/B 测试。

典型的事件驱动型应用实例

数据分析应用

什么是数据分析应用?

数据分析任务需要从原始数据中提取有价值的信息和指标。传统的分析方式通常是利用批查询,或将事件记录下来并基于此有限数据集构建应用来完成。为了得到最新数据的分析结果,必须先将它们加入分析数据集并重新执行查询或运行应用,随后将结果写入存储系统或生成报告。

借助一些先进的流处理引擎,还可以实时地进行数据分析。和传统模式下读取有限数据集不同,流式查询或应用会接入实时事件流,并随着事件消费持续产生和更新结果。这些结果数据可能会写入外部数据库系统或以内部状态的形式维护。仪表展示应用可以相应地从外部数据库读取数据或直接查询应用的内部状态。

如下图所示,Apache Flink 同时支持流式及批量分析应用。

img

流式分析应用的优势?

和批量分析相比,由于流式分析省掉了周期性的数据导入和查询过程,因此从事件中获取指标的延迟更低。不仅如此,批量查询必须处理那些由定期导入和输入有界性导致的人工数据边界,而流式查询则无须考虑该问题。

另一方面,流式分析会简化应用抽象。批量查询的流水线通常由多个独立部件组成,需要周期性地调度提取数据和执行查询。如此复杂的流水线操作起来并不容易,一旦某个组件出错将会影响流水线的后续步骤。而流式分析应用整体运行在 Flink 之类的高端流处理系统之上,涵盖了从数据接入到连续结果计算的所有步骤,因此可以依赖底层引擎提供的故障恢复机制。

Flink 为持续流式分析和批量分析都提供了良好的支持。具体而言,它内置了一个符合 ANSI 标准的 SQL 接口,将批、流查询的语义统一起来。无论是在记录事件的静态数据集上还是实时事件流上,相同 SQL 查询都会得到一致的结果。同时 Flink 还支持丰富的用户自定义函数,允许在 SQL 中执行定制化代码。如果还需进一步定制逻辑,可以利用 Flink DataStream API 和 DataSet API 进行更低层次的控制。此外,Flink 的 Gelly 库为基于批量数据集的大规模高性能图分析提供了算法和构建模块支持。

典型的数据分析应用实例

数据管道应用

什么是数据管道?

提取-转换-加载(ETL)是一种在存储系统之间进行数据转换和迁移的常用方法。ETL 作业通常会周期性地触发,将数据从事务型数据库拷贝到分析型数据库或数据仓库。

数据管道和 ETL 作业的用途相似,都可以转换、丰富数据,并将其从某个存储系统移动到另一个。但数据管道是以持续流模式运行,而非周期性触发。因此它支持从一个不断生成数据的源头读取记录,并将它们以低延迟移动到终点。例如:数据管道可以用来监控文件系统目录中的新文件,并将其数据写入事件日志;另一个应用可能会将事件流物化到数据库或增量构建和优化查询索引。

下图描述了周期性 ETL 作业和持续数据管道的差异。

img

数据管道的优势?

和周期性 ETL 作业相比,持续数据管道可以明显降低将数据移动到目的端的延迟。此外,由于它能够持续消费和发送数据,因此用途更广,支持用例更多。

很多常见的数据转换和增强操作可以利用 Flink 的 SQL 接口(或 Table API)及用户自定义函数解决。如果数据管道有更高级的需求,可以选择更通用的 DataStream API 来实现。Flink 为多种数据存储系统(如:Kafka、Kinesis、Elasticsearch、JDBC 数据库系统等)内置了连接器。同时它还提供了文件系统的连续型数据源及数据汇,可用来监控目录变化和以时间分区的方式写入文件。

典型的数据管道应用实例

参考资料

MapReduce

MapReduce 简介

MapReduce 是 Hadoop 项目中的分布式计算框架。它降低了分布式计算的门槛,可以让用户轻松编写程序,让其以可靠、容错的方式运行在大型集群上并行处理海量数据(TB 级)。

MapReduce 的设计思路是:

  • 分而治之,并行计算
  • 移动计算,而非移动数据

MapReduce 作业通过将输入的数据集拆分为独立的块,这些块由 map 任务以并行的方式处理。框架对 map 的输出进行排序,然后将其输入到 reduce 任务中。作业的输入和输出都存储在文件系统中。该框架负责调度任务、监控任务并重新执行失败的任务。

通常,计算节点和存储节点是相同的,即 MapReduce 框架和 HDFS 在同一组节点上运行。此配置允许框架在已存在数据的节点上有效地调度任务,从而在整个集群中实现非常高的聚合带宽。

MapReduce 框架由一个主 ResourceManager、每个集群节点一个工作程序 NodeManager 和每个应用程序的 MRAppMaster (YARN 组件) 组成。

MapReduce 框架仅对 <key、value> 对进行作,也就是说,框架将作业的输入视为一组 <key、value> 对,并生成一组 <key、value> 对作为作业的输出,可以想象是不同的类型。类必须可由框架序列化,因此需要实现 Writable 接口。此外,关键类必须实现 WritableComparable 接口,以便于按框架进行排序。

MapReduce 作业的 Input 和 Output 类型:

1
(input) <k1, v1> -> map -> <k2, v2> -> combine -> <k2, v2> -> reduce -> <k3, v3> (output)

MapReduce 的特点

  • 计算跟着数据走
  • 良好的扩展性:计算能力随着节点数增加,近似线性递增
  • 高容错
  • 状态监控
  • 适合海量数据的离线批处理
  • 降低了分布式编程的门槛

MapReduce 应用场景

适用场景:

  • 数据统计,如:网站的 PV、UV 统计
  • 搜索引擎构建索引
  • 海量数据查询

不适用场景:

  • OLAP - 要求毫秒或秒级返回结果
  • 流计算 - 流计算的输入数据集是动态的,而 MapReduce 是静态的
  • DAG 计算
    • 多个作业存在依赖关系,后一个的输入是前一个的输出,构成有向无环图 DAG
    • 每个 MapReduce 作业的输出结果都会落盘,造成大量磁盘 IO,导致性能非常低下

MapReduce 工作流

MapReduce 编程模型:MapReduce 程序被分为 Map(映射)阶段和 Reduce(化简)阶段。

img

  1. input : 读取文本文件;
  2. splitting : 将文件按照行进行拆分,此时得到的 K1 行数,V1 表示对应行的文本内容;
  3. mapping : 并行将每一行按照空格进行拆分,拆分得到的 List(K2,V2),其中 K2 代表每一个单词,由于是做词频统计,所以 V2 的值为 1,代表出现 1 次;
  4. shuffling:由于 Mapping 操作可能是在不同的机器上并行处理的,所以需要通过 shuffling 将相同 key 值的数据分发到同一个节点上去合并,这样才能统计出最终的结果,此时得到 K2 为每一个单词,List(V2) 为可迭代集合,V2 就是 Mapping 中的 V2;
  5. Reducing : 这里的案例是统计单词出现的总次数,所以 ReducingList(V2) 进行归约求和操作,最终输出。

MapReduce 编程模型中 splittingshuffing 操作都是由框架实现的,需要我们自己编程实现的只有 mappingreducing,这也就是 MapReduce 这个称呼的来源。

MapReduce 组件

MapReduce 有以下核心组件:

  • Job - Job 表示 MapReduce 作业配置。Job 通常用于指定 Mapper、combiner(如果有)、PartitionerReducerInputFormatOutputFormat 实现。
  • Mapper - Mapper 负责将输入键值对映射到一组中间键值对。转换的中间记录不需要与输入记录具有相同的类型。一个给定的输入键值对可能映射到零个或多个输出键值对。
  • Combiner - combinermap 运算后的可选操作,它实际上是一个本地化的 reduce 操作。它执行中间输出的本地聚合,这有助于减少从 Mapper 传输到 Reducer 的数据量。
  • Reducer - Reducer 将共享一个 key 的一组中间值归并为一个小的数值集。Reducer 有 3 个主要子阶段:shuffle,sort 和 reduce。
    • shuffle - Reducer 的输入就是 mapper 的排序输出。在这个阶段,框架通过 HTTP 获取所有 mapper 输出的相关分区。
    • sort - 在这个阶段中,框架将按照 key (因为不同 mapper 的输出中可能会有相同的 key) 对 Reducer 的输入进行分组。shuffle 和 sort 两个阶段是同时发生的。
    • reduce - 对按键分组的数据进行聚合统计。
  • Partitioner - Partitioner 负责控制 map 中间输出结果的键的分区。
    • 键(或者键的子集)用于产生分区,通常通过一个散列函数。
    • 分区总数与作业的 reduce 任务数是一样的。因此,它控制中间输出结果(也就是这条记录)的键发送给 m 个 reduce 任务中的哪一个来进行 reduce 操作。
  • InputFormat - InputFormat 描述 MapReduce 作业的输入规范。MapReduce 框架依赖作业的 InputFormat 来完成以下工作:
    • 确认作业的输入规范。
    • 把输入文件分割成多个逻辑的 InputSplit 实例,然后将每个实例分配给一个单独的 Mapper。InputSplit 表示要由单个 Mapper 处理的数据。
    • 提供 RecordReader 的实现。RecordReaderInputSplit 中读取 <key, value> 对,并提供给 Mapper 实现进行处理。
  • OutputFormat - OutputFormat 描述 MapReduce 作业的输出规范。MapReduce 框架依赖作业的 OutputFormat 来完成以下工作:
    • 确认作业的输出规范,例如检查输出路径是否已经存在。
    • 提供 RecordWriter 实现。RecordWriter 将输出 <key, value> 对到文件系统。

img

参考资料

大数据学习路线

大数据简介

移动计算

传统的软件计算处理模型,都是“输入 -> 计算 -> 输出”模型。

如何解决 PB 级数据进行计算的问题呢?

采用分布式集群的解决方案,用数千台甚至上万台计算机构建一个大数据计算处理集群,利用更多的网络带宽、内存空间、磁盘容量、CPU 核心数去进行计算处理。

大数据计算处理通常针对的是网站的存量数据,网站大数据系统要做的就是将这些统计规律和关联关系计算出来,并由此进一步改善网站的用户体验和运营决策。

将程序分发到数据所在的地方进行计算,也就是所谓的移动计算比移动数据更划算。

大数据存储

大规模数据存储的核心问题:

  • 数据存储容量
  • 数据读写速度
  • 数据可靠性

解决方案:水平伸缩

大数据处理流程

img

1.1 数据采集

大数据处理的第一步是数据的收集。现在的中大型项目通常采用微服务架构进行分布式部署,所以数据的采集需要在多台服务器上进行,且采集过程不能影响正常业务的开展。基于这种需求,就衍生了多种日志收集工具,如 Flume 、Logstash、Kibana 等,它们都能通过简单的配置完成复杂的数据收集和数据聚合。

1.2 数据存储

收集到数据后,下一个问题就是:数据该如何进行存储?通常大家最为熟知是 MySQL、Oracle 等传统的关系型数据库,它们的优点是能够快速存储结构化的数据,并支持随机访问。但大数据的数据结构通常是半结构化(如日志数据)、甚至是非结构化的(如视频、音频数据),为了解决海量半结构化和非结构化数据的存储,衍生了 Hadoop HDFS 、KFS、GFS 等分布式文件系统,它们都能够支持结构化、半结构和非结构化数据的存储,并可以通过增加机器进行横向扩展。

分布式文件系统完美地解决了海量数据存储的问题,但是一个优秀的数据存储系统需要同时考虑数据存储和访问两方面的问题,比如你希望能够对数据进行随机访问,这是传统的关系型数据库所擅长的,但却不是分布式文件系统所擅长的,那么有没有一种存储方案能够同时兼具分布式文件系统和关系型数据库的优点,基于这种需求,就产生了 HBase、MongoDB。

1.3 数据分析

大数据处理最重要的环节就是数据分析,数据分析通常分为两种:批处理和流处理。

  • 批处理:对一段时间内海量的离线数据进行统一的处理,对应的处理框架有 Hadoop MapReduce、Spark、Flink 等;
  • 流处理:对运动中的数据进行处理,即在接收数据的同时就对其进行处理,对应的处理框架有 Storm、Spark Streaming、Flink Streaming 等。

批处理和流处理各有其适用的场景,时间不敏感或者硬件资源有限,可以采用批处理;时间敏感和实时性要求高就可以采用流处理。随着服务器硬件的价格越来越低和大家对及时性的要求越来越高,流处理越来越普遍,如股票价格预测和电商运营数据分析等。

上面的框架都是需要通过编程来进行数据分析,那么如果你不是一个后台工程师,是不是就不能进行数据的分析了?当然不是,大数据是一个非常完善的生态圈,有需求就有解决方案。为了能够让熟悉 SQL 的人员也能够进行数据的分析,查询分析框架应运而生,常用的有 Hive 、Spark SQL 、Flink SQL、 Pig、Phoenix 等。这些框架都能够使用标准的 SQL 或者 类 SQL 语法灵活地进行数据的查询分析。这些 SQL 经过解析优化后转换为对应的作业程序来运行,如 Hive 本质上就是将 SQL 转换为 MapReduce 作业,Spark SQL 将 SQL 转换为一系列的 RDDs 和转换关系(transformations),Phoenix 将 SQL 查询转换为一个或多个 HBase Scan。

1.4 数据应用

数据分析完成后,接下来就是数据应用的范畴,这取决于你实际的业务需求。比如你可以将数据进行可视化展现,或者将数据用于优化你的推荐算法,这种运用现在很普遍,比如短视频个性化推荐、电商商品推荐、头条新闻推荐等。当然你也可以将数据用于训练你的机器学习模型,这些都属于其他领域的范畴,都有着对应的框架和技术栈进行处理,这里就不一一赘述。

1.5 其他框架

上面是一个标准的大数据处理流程所用到的技术框架。但是实际的大数据处理流程比上面复杂很多,针对大数据处理中的各种复杂问题分别衍生了各类框架:

  • 单机的处理能力都是存在瓶颈的,所以大数据框架都是采用集群模式进行部署,为了更方便的进行集群的部署、监控和管理,衍生了 Ambari、Cloudera Manager 等集群管理工具;
  • 想要保证集群高可用,需要用到 ZooKeeper ,ZooKeeper 是最常用的分布式协调服务,它能够解决大多数集群问题,包括首领选举、失败恢复、元数据存储及其一致性保证。同时针对集群资源管理的需求,又衍生了 Hadoop YARN ;
  • 复杂大数据处理的另外一个显著的问题是,如何调度多个复杂的并且彼此之间存在依赖关系的作业?基于这种需求,产生了 Azkaban 和 Oozie 等工作流调度框架;
  • 大数据流处理中使用的比较多的另外一个框架是 Kafka,它可以用于消峰,避免在秒杀等场景下并发数据对流处理程序造成冲击;
  • 另一个常用的框架是 Sqoop ,主要是解决了数据迁移的问题,它能够通过简单的命令将关系型数据库中的数据导入到 HDFS 、Hive 或 HBase 中,或者从 HDFS 、Hive 导出到关系型数据库上。

大数据学习路线

框架分类

日志收集框架:Flume 、Logstash、Kibana

分布式文件存储系统:Hadoop HDFS

数据库系统:Mongodb、HBase

分布式计算框架

  • 批处理框架:Hadoop MapReduce
  • 流处理框架:Storm
  • 混合处理框架:Spark、Flink

查询分析框架:Hive 、Spark SQL 、Flink SQL、 Pig、Phoenix

集群资源管理器:Hadoop YARN

分布式协调服务:Zookeeper

数据迁移工具:Sqoop

任务调度框架:Azkaban、Oozie

集群部署和监控:Ambari、Cloudera Manager

上面列出的都是比较主流的大数据框架,社区都很活跃,学习资源也比较丰富。建议从 Hadoop 开始入门学习,因为它是整个大数据生态圈的基石,其它框架都直接或者间接依赖于 Hadoop 。接着就可以学习计算框架,Spark 和 Flink 都是比较主流的混合处理框架,Spark 出现得较早,所以其应用也比较广泛。 Flink 是当下最火热的新一代的混合处理框架,其凭借众多优异的特性得到了众多公司的青睐。两者可以按照你个人喜好或者实际工作需要进行学习。

img

学习资料

大数据最权威和最全面的学习资料就是官方文档。热门的大数据框架社区都比较活跃、版本更新迭代也比较快,所以其出版物都明显滞后于其实际版本,基于这个原因采用书本学习不是一个最好的方案。比较庆幸的是,大数据框架的官方文档都写的比较好,内容完善,重点突出,同时都采用了大量配图进行辅助讲解。当然也有一些优秀的书籍历经时间的检验,至今依然很经典,这里列出部分个人阅读过的经典书籍:

视频学习资料

上面我推荐的都是书籍学习资料,很少推荐视频学习资料,这里说明一下原因:因为书籍历经时间的考验,能够再版的或者豆瓣等平台评价高的证明都是被大众所认可的,从概率的角度上来说,其必然更加优秀,不容易浪费大家的学习时间和精力,所以我个人更倾向于官方文档或者书本的学习方式,而不是视频。因为视频学习资料,缺少一个公共的评价平台和完善的评价机制,所以其质量良莠不齐。但是视频任然有其不可替代的好处,学习起来更直观、印象也更深刻,所以对于习惯视频学习的小伙伴,这里我各推荐一个免费的和付费的视频学习资源,大家按需选择:

参考资料

Java 虚拟机之类加载

类加载机制

类是在运行期间动态加载的。

类的加载指的是将类的 .class 文件中的二进制数据读入到内存中,将其放在运行时数据区的方法区内,然后在堆区创建一个java.lang.Class对象,用来封装类在方法区内的数据结构。类的加载的最终产品是位于堆区中的Class对象,Class对象封装了类在方法区内的数据结构,并且向 Java 程序员提供了访问方法区内的数据结构的接口。

类加载器并不需要等到某个类被“首次主动使用”时再加载它,JVM 规范允许类加载器在预料某个类将要被使用时就预先加载它,如果在预先加载的过程中遇到了.class 文件缺失或存在错误,类加载器必须在程序首次主动使用该类时才报告错误(LinkageError 错误)如果这个类一直没有被程序主动使用,那么类加载器就不会报告错误。

类的生命周期

Java 类的完整生命周期包括以下几个阶段:

  • 加载(Loading)
  • 链接(Linking)
    • 验证(Verification)
    • 准备(Preparation)
    • 解析(Resolution)
  • 初始化(Initialization)
  • 使用(Using)
  • 卸载(Unloading)

加载、验证、准备、初始化和卸载这 5 个阶段的顺序是确定的,类的加载过程必须按照这种顺序按部就班地开始。而解析过程在某些情况下可以在初始化阶段之后再开始,这是为了支持 Java 的动态绑定

类加载过程是指加载、验证、准备、解析和初始化这 5 个阶段。

(一)加载

加载是类加载的一个阶段,注意不要混淆。

加载,是指查找字节流,并且据此创建类的过程

加载过程完成以下三件事:

  • 通过一个类的全限定名来获取定义此类的二进制字节流。
  • 将这个字节流所代表的静态存储结构转化为方法区的运行时存储结构。
  • 在内存中生成一个代表这个类的 Class 对象,作为方法区这个类的各种数据的访问入口。

其中二进制字节流可以从以下方式中获取:

  • 从 ZIP 包读取,这很常见,最终成为日后 JAR、EAR、WAR 格式的基础。
  • 从网络中获取,这种场景最典型的应用是 Applet。
  • 运行时计算生成,这种场景使用得最多得就是动态代理技术,在 java.lang.reflect.Proxy 中,就是用了 ProxyGenerator.generateProxyClass 的代理类的二进制字节流。
  • 由其他文件生成,典型场景是 JSP 应用,即由 JSP 文件生成对应的 Class 类。
  • 从数据库读取,这种场景相对少见,例如有些中间件服务器(如 SAP Netweaver)可以选择把程序安装到数据库中来完成程序代码在集群间的分发。

更详细内容会在 3. ClassLoader 介绍。

(二)验证

验证是链接阶段的第一步。验证的目标是确保 Class 文件的字节流中包含的信息符合当前虚拟机的要求,并且不会危害虚拟机自身的安全。

验证阶段大致会完成 4 个阶段的检验动作:

  • 文件格式验证 - 验证字节流是否符合 Class 文件格式的规范,并且能被当前版本的虚拟机处理。
  • 元数据验证 - 对字节码描述的信息进行语义分析,以保证其描述的信息符合 Java 语言规范的要求。
  • 字节码验证 - 通过数据流和控制流分析,确保程序语义是合法、符合逻辑的。
  • 符号引用验证 - 发生在虚拟机将符号引用转换为直接引用的时候,对类自身以外(常量池中的各种符号引用)的信息进行匹配性校验。

验证阶段是非常重要的,但不是必须的,它对程序运行期没有影响,如果所引用的类经过反复验证,那么可以考虑采用 -Xverifynone 参数来关闭大部分的类验证措施,以缩短虚拟机类加载的时间。

(三)准备

类变量是被 static 修饰的变量,准备阶段为 static 变量在方法区分配内存并初始化为默认值,使用的是方法区的内存。

实例变量不会在这阶段分配内存,它将会在对象实例化时随着对象一起分配在 Java 堆中。(实例化不是类加载的一个过程,类加载发生在所有实例化操作之前,并且类加载只进行一次,实例化可以进行多次)

初始值一般为 0 值,例如下面的类变量 value 被初始化为 0 而不是 123。

1
public static int value = 123;

如果类变量是常量,那么会按照表达式来进行初始化,而不是赋值为 0。

1
public static final int value = 123;

准备阶段有以下几点需要注意:

  • 这时候进行内存分配的仅包括类变量(static),而不包括实例变量,实例变量会在对象实例化时随着对象一块分配在 Java 堆中。
  • 这里所设置的初始值通常情况下是数据类型默认的零值(如 00Lnullfalse 等),而不是被在 Java 代码中被显式地赋予的值。

假设一个类变量的定义为:public static int value = 3

那么变量 value 在准备阶段过后的初始值为 0,而不是 3,因为这时候尚未开始执行任何 Java 方法,而把 value 赋值为 3 的public static指令是在程序编译后,存放于类构造器()方法之中的,所以把 value 赋值为 3 的动作将在初始化阶段才会执行。

这里还需要注意如下几点:

  • 对基本数据类型来说,对于类变量(static)和全局变量,如果不显式地对其赋值而直接使用,则系统会为其赋予默认的零值,而对于局部变量来说,在使用前必须显式地为其赋值,否则编译时不通过。
  • 对于同时被 static 和 final 修饰的常量,必须在声明的时候就为其显式地赋值,否则编译时不通过;而只被 final 修饰的常量则既可以在声明时显式地为其赋值,也可以在类初始化时显式地为其赋值,总之,在使用前必须为其显式地赋值,系统不会为其赋予默认零值。
  • 对于引用数据类型 reference 来说,如数组引用、对象引用等,如果没有对其进行显式地赋值而直接使用,系统都会为其赋予默认的零值,即 null。
  • 如果在数组初始化时没有对数组中的各元素赋值,那么其中的元素将根据对应的数据类型而被赋予默认的零值。
  • 如果类字段的字段属性表中存在ConstantValue属性,即同时被 final 和 static 修饰,那么在准备阶段变量 value 就会被初始化为 ConstValue 属性所指定的值。

假设上面的类变量 value 被定义为: public static final int value = 3

编译时 Javac 将会为 value 生成 ConstantValue 属性,在准备阶段虚拟机就会根据ConstantValue的设置将 value 赋值为 3。我们可以理解为 static final 常量在编译期就将其结果放入了调用它的类的常量池中

(四)解析

在 class 文件被加载至 Java 虚拟机之前,这个类无法知道其他类及其方法、字段所对应的具体地址,甚至不知道自己方法、字段的地址。因此,每当需要引用这些成员时,Java 编译器会生成一个符号引用。在运行阶段,这个符号引用一般都能够无歧义地定位到具体目标上。

举例来说,对于一个方法调用,编译器会生成一个包含目标方法所在类的名字、目标方法的名字、接收参数类型以及返回值类型的符号引用,来指代所要调用的方法。

解析阶段目标是将常量池的符号引用替换为直接引用的过程。解析动作主要针对类或接口、字段、类方法、接口方法、方法类型、方法句柄和调用点限定符 7 类符号引用进行。

  • 符号引用(Symbolic References) - 符号引用以一组符号来描述所引用的目标,符号可以是任何形式的字面量,只要使用时能无歧义地定位到目标即可。
  • 直接引用(Direct Reference) - 直接引用可以是直接指向目标的指针、相对偏移量或是一个能间接定位到目标的句柄。

(五)初始化

在 Java 代码中,如果要初始化一个静态字段,可以在声明时直接赋值,也可以在静态代码块中对其赋值。

如果直接赋值的静态字段被 final 所修饰,并且它的类型是基本类型或字符串时,那么该字段便会被 Java 编译器标记成常量值(ConstantValue),其初始化直接由 Java 虚拟机完成。除此之外的直接赋值操作,以及所有静态代码块中的代码,则会被 Java 编译器置于同一方法中,并把它命名为 < clinit >

初始化阶段才真正开始执行类中的定义的 Java 程序代码。初始化,为类的静态变量赋予正确的初始值,JVM 负责对类进行初始化,主要对类变量进行初始化

类初始化方式

  • 声明类变量时指定初始值
  • 使用静态代码块为类变量指定初始值

在准备阶段,类变量已经赋过一次系统要求的初始值,而在初始化阶段,根据程序员通过程序制定的主观计划去初始化类变量和其它资源。

类初始化步骤

  1. 如果类还没有被加载和链接,开始加载该类。
  2. 如果该类的直接父类还没有被初始化,先初始化其父类。
  3. 如果该类有初始化语句,则依次执行这些初始化语句。

类初始化时机

只有主动引用类的时候才会导致类的初始化。

(1)主动引用

类的主动引用包括以下六种:

  • 创建类的实例 - 也就是 new 对象
  • 访问静态变量 - 访问某个类或接口的静态变量,或者对该静态变量赋值
  • 访问静态方法
  • 反射 - 如Class.forName(“com.shengsiyuan.Test”)
  • 初始化子类 - 初始化某个类的子类,则其父类也会被初始化
  • 启动类 - Java 虚拟机启动时被标明为启动类的类(Java Test),直接使用java.exe命令来运行某个主类

(2)被动引用

以上 5 种场景中的行为称为对一个类进行主动引用。除此之外,所有引用类的方式都不会触发初始化,称为被动引用。被动引用的常见例子包括:

  • 通过子类引用父类的静态字段,不会导致子类初始化
1
System.out.println(SubClass.value); // value 字段在 SuperClass 中定义
  • 通过数组定义来引用类,不会触发此类的初始化。该过程会对数组类进行初始化,数组类是一个由虚拟机自动生成的、直接继承自 Object 的子类,其中包含了数组的属性和方法。
1
SuperClass[] sca = new SuperClass[10];
  • 常量在编译阶段会存入调用类的常量池中,本质上并没有直接引用到定义常量的类,因此不会触发定义常量的类的初始化
1
System.out.println(ConstClass.HELLOWORLD);

类初始化细节

类初始化 <clinit>() 方法的细节:

  • 是由编译器自动收集类中所有类变量的赋值动作和静态语句块(static{} 块)中的语句合并产生的,编译器收集的顺序由语句在源文件中出现的顺序决定。特别注意的是,静态语句块只能访问到定义在它之前的类变量,定义在它之后的类变量只能赋值,不能访问。例如以下代码:
1
2
3
4
5
6
7
public class Test {
static {
i = 0; // 给变量赋值可以正常编译通过
System.out.print(i); // 这句编译器会提示“非法向前引用”
}
static int i = 1;
}
  • 与类的构造函数(或者说实例构造器 <init>())不同,不需要显式的调用父类的构造器。虚拟机会自动保证在子类的 <clinit>() 方法运行之前,父类的 <clinit>() 方法已经执行结束。因此虚拟机中第一个执行 <clinit>() 方法的类肯定为 java.lang.Object
  • 由于父类的 <clinit>() 方法先执行,也就意味着父类中定义的静态语句块要优于子类的变量赋值操作。例如以下代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
static class Parent {
public static int A = 1;
static {
A = 2;
}
}

static class Sub extends Parent {
public static int B = A;
}

public static void main(String[] args) {
System.out.println(Sub.B); // 输出结果是父类中的静态变量 A 的值,也就是 2。
}
  • <clinit>() 方法对于类或接口不是必须的,如果一个类中不包含静态语句块,也没有对类变量的赋值操作,编译器可以不为该类生成 <clinit>() 方法。
  • 接口中不可以使用静态语句块,但仍然有类变量初始化的赋值操作,因此接口与类一样都会生成 <clinit>() 方法。但接口与类不同的是,执行接口的 <clinit>() 方法不需要先执行父接口的 <clinit>() 方法。只有当父接口中定义的变量使用时,父接口才会初始化。另外,接口的实现类在初始化时也一样不会执行接口的 <clinit>() 方法。
  • 虚拟机会保证一个类的 <clinit>() 方法在多线程环境下被正确的加锁和同步,如果多个线程同时初始化一个类,只会有一个线程执行这个类的 <clinit>() 方法,其它线程都会阻塞等待,直到活动线程执行 <clinit>() 方法完毕。如果在一个类的 <clinit>() 方法中有耗时的操作,就可能造成多个线程阻塞,在实际过程中此种阻塞很隐蔽。

ClassLoader

ClassLoader 即类加载器,负责将类加载到 JVM。在 Java 虚拟机外部实现,以便让应用程序自己决定如何去获取所需要的类。

JVM 加载 class 文件到内存有两种方式:

  • 隐式加载 - JVM 自动加载需要的类到内存中。
  • 显示加载 - 通过使用 ClassLoader 来加载一个类到内存中。

类与类加载器

如何判断两个类是否相等:类本身相等,并且使用同一个类加载器进行加载。这是因为每一个 ClassLoader 都拥有一个独立的类名称空间

这里的相等,包括类的 Class 对象的 equals() 方法、isAssignableFrom() 方法、isInstance() 方法的返回结果为 true,也包括使用 instanceof 关键字做对象所属关系判定结果为 true。

类加载器分类

img

Bootstrap ClassLoader

Bootstrap ClassLoader ,即启动类加载器 ,负责加载 JVM 自身工作所需要的类

Bootstrap ClassLoader 会将存放在 <JAVA_HOME>\lib 目录中的,或者被 -Xbootclasspath 参数所指定的路径中的,并且是虚拟机识别的(仅按照文件名识别,如 rt.jar,名字不符合的类库即使放在 lib 目录中也不会被加载)类库加载到虚拟机内存中

Bootstrap ClassLoader 是由 C++ 实现的,它完全由 JVM 自己控制的,启动类加载器无法被 Java 程序直接引用,用户在编写自定义类加载器时,如果需要把加载请求委派给启动类加载器,直接使用 null 代替即可。

ExtClassLoader

ExtClassLoader,即扩展类加载器,这个类加载器是由 ExtClassLoader(sun.misc.Launcher\$ExtClassLoader)实现的。

ExtClassLoader 负责将 <JAVA_HOME>\lib\ext 或者被 java.ext.dir 系统变量所指定路径中的所有类库加载到内存中,开发者可以直接使用扩展类加载器

AppClassLoader

AppClassLoader,即应用程序类加载器,这个类加载器是由 AppClassLoader(sun.misc.Launcher\$AppClassLoader) 实现的。由于这个类加载器是 ClassLoader 中的 getSystemClassLoader() 方法的返回值,因此一般称为系统类加载器。

AppClassLoader 负责加载用户类路径(即 classpath)上所指定的类库,开发者可以直接使用这个类加载器,如果应用程序中没有自定义过自己的类加载器,一般情况下这个就是程序中默认的类加载器。

自定义类加载器

自定义类加载器可以做到如下几点:

  • 在执行非置信代码之前,自动验证数字签名。
  • 动态地创建符合用户特定需要的定制化构建类。
  • 从特定的场所取得 java class,例如数据库中和网络中。

假设,我们需要自定义一个名为 FileSystemClassLoader 的类加载器,继承自 java.lang.ClassLoader,用于加载文件系统上的类。它首先根据类的全名在文件系统上查找类的字节代码文件(.class 文件),然后读取该文件内容,最后通过 defineClass() 方法来把这些字节代码转换成 java.lang.Class 类的实例。

java.lang.ClassLoader 类的方法 loadClass() 实现了双亲委派模型的逻辑,因此自定义类加载器一般不去覆写它,而是通过覆写 findClass() 方法。

ClassLoader 常用的场景:

  • 容器 - 典型应用:Servlet 容器(如:Tomcat、Jetty)、udf (Mysql、Hive)等。加载解压 jar 包或 war 包后,加载其 Class 到指定的类加载器中运行(通常需要考虑空间隔离)。
  • 热部署、热插拔 - 应用启动后,动态获得某个类信息,然后加载到 JVM 中工作。很多著名的容器软件、框架(如:Spring 等),都使用 ClassLoader 来实现自身的热部署。

【示例】自定义一个类加载器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class FileSystemClassLoader extends ClassLoader {

private String rootDir;

public FileSystemClassLoader(String rootDir) {
this.rootDir = rootDir;
}

protected Class<?> findClass(String name) throws ClassNotFoundException {
byte[] classData = getClassData(name);
if (classData == null) {
throw new ClassNotFoundException();
} else {
return defineClass(name, classData, 0, classData.length);
}
}

private byte[] getClassData(String className) {
String path = classNameToPath(className);
try {
InputStream ins = new FileInputStream(path);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
int bufferSize = 4096;
byte[] buffer = new byte[bufferSize];
int bytesNumRead;
while ((bytesNumRead = ins.read(buffer)) != -1) {
baos.write(buffer, 0, bytesNumRead);
}
return baos.toByteArray();
} catch (IOException e) {
e.printStackTrace();
}
return null;
}

private String classNameToPath(String className) {
return rootDir + File.separatorChar
+ className.replace('.', File.separatorChar) + ".class";
}
}

双亲委派

理解双亲委派之前,先让我们看一个示例。

【示例】寻找类加载示例

1
2
3
4
5
6
public static void main(String[] args) {
ClassLoader loader = Thread.currentThread().getContextClassLoader();
System.out.println(loader);
System.out.println(loader.getParent());
System.out.println(loader.getParent().getParent());
}

输出:

1
2
3
sun.misc.Launcher$AppClassLoader@18b4aac2
sun.misc.Launcher$ExtClassLoader@19e1023e
null

从上面的结果可以看出,并没有获取到 ExtClassLoader 的父 Loader,原因是 Bootstrap Loader(引导类加载器)是用 C 语言实现的,找不到一个确定的返回父 Loader 的方式,于是就返回 null。

下图展示的类加载器之间的层次关系,称为类加载器的双亲委派模型(Parents Delegation Model)该模型要求除了顶层的 Bootstrap ClassLoader 外,其余的类加载器都应有自己的父类加载器这里类加载器之间的父子关系一般通过组合(Composition)关系来实现,而不是通过继承(Inheritance)的关系实现

(1)工作过程

一个类加载器首先将类加载请求传送到父类加载器,只有当父类加载器无法完成类加载请求时才尝试加载

(2)好处

使得 Java 类随着它的类加载器一起具有一种带有优先级的层次关系,从而使得基础类得到统一:

  • 系统类防止内存中出现多份同样的字节码
  • 保证 Java 程序安全稳定运行

例如: java.lang.Object 存放在 rt.jar 中,如果编写另外一个 java.lang.Object 的类并放到 classpath 中,程序可以编译通过。因为双亲委派模型的存在,所以在 rt.jar 中的 Object 比在 classpath 中的 Object 优先级更高,因为 rt.jar 中的 Object 使用的是启动类加载器,而 classpath 中的 Object 使用的是应用程序类加载器。正因为 rt.jar 中的 Object 优先级更高,因为程序中所有的 Object 都是这个 Object

(3)实现

以下是抽象类 java.lang.ClassLoader 的代码片段,其中的 loadClass() 方法运行过程如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public abstract class ClassLoader {
// The parent class loader for delegation
private final ClassLoader parent;

public Class<?> loadClass(String name) throws ClassNotFoundException {
return loadClass(name, false);
}

protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
synchronized (getClassLoadingLock(name)) {
// 首先判断该类型是否已经被加载
Class<?> c = findLoadedClass(name);
if (c == null) {
// 如果没有被加载,就委托给父类加载或者委派给启动类加载器加载
try {
if (parent != null) {
// 如果存在父类加载器,就委派给父类加载器加载
c = parent.loadClass(name, false);
} else {
// 如果不存在父类加载器,就检查是否是由启动类加载器加载的类,通过调用本地方法native Class findBootstrapClass(String name)
c = findBootstrapClassOrNull(name);
}
} catch (ClassNotFoundException e) {
// 如果父类加载器加载失败,会抛出 ClassNotFoundException
}

if (c == null) {
// 如果父类加载器和启动类加载器都不能完成加载任务,才调用自身的加载功能
c = findClass(name);
}
}
if (resolve) {
resolveClass(c);
}
return c;
}
}

protected Class<?> findClass(String name) throws ClassNotFoundException {
throw new ClassNotFoundException(name);
}
}

【说明】

  • 先检查类是否已经加载过,如果没有则让父类加载器去加载。
  • 当父类加载器加载失败时抛出 ClassNotFoundException,此时尝试自己去加载。

ClassLoader 参数

在生产环境上启动 java 应用时,通常会指定一些 ClassLoader 参数,以加载应用所需要的 lib:

1
java -jar xxx.jar -classpath lib/*

ClassLoader 相关参数选项:

参数选项 ClassLoader 类型 说明
-Xbootclasspath Bootstrap ClassLoader 设置 Bootstrap ClassLoader 搜索路径。【不常用】
-Xbootclasspath/a Bootstrap ClassLoader 把路径添加到已存在的 Bootstrap ClassLoader 搜索路径后面。【常用】
-Xbootclasspath/p Bootstrap ClassLoader 把路径添加到已存在的 Bootstrap ClassLoader 搜索路径前面。【不常用】
-Djava.ext.dirs ExtClassLoader 设置 ExtClassLoader 搜索路径。
-Djava.class.path-cp-classpath AppClassLoader 设置 AppClassLoader 搜索路径。

类的加载

类加载方式

类加载有三种方式:

  • 命令行启动应用时候由 JVM 初始化加载
  • 通过 Class.forName() 方法动态加载
  • 通过 ClassLoader.loadClass() 方法动态加载

Class.forName()ClassLoader.loadClass() 区别

  • Class.forName() 将类的 .class 文件加载到 jvm 中之外,还会对类进行解释,执行类中的 static 块;
  • ClassLoader.loadClass() 只干一件事情,就是将 .class 文件加载到 jvm 中,不会执行 static 中的内容,只有在 newInstance 才会去执行 static 块。
  • Class.forName(name, initialize, loader) 带参函数也可控制是否加载 static 块。并且只有调用了 newInstance() 方法采用调用构造函数,创建类的对象 。

加载类错误

ClassNotFoundException

ClassNotFoundException 异常出镜率极高。**ClassNotFoundException 表示当前 classpath 下找不到指定类**。

常见问题原因:

  • 调用 ClassforName() 方法,未找到类。
  • 调用 ClassLoader 中的 loadClass() 方法,未找到类。
  • 调用 ClassLoader 中的 findSystemClass() 方法,未找到类。

【示例】执行以下代码,会抛出 ClassNotFoundException 异常:

1
2
3
4
5
6
7
8
9
public class ClassNotFoundExceptionDemo {
public static void main(String[] args) {
try {
Class.forName("NotFound");
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
}
}

解决方法:检查 classpath 下有没有相应的 class 文件。

NoClassDefFoundError

常见问题原因:

  • 类依赖的 Class 或者 jar 不存在。
  • 类文件存在,但是存在不同的域中。

解决方法:现代 Java 项目,一般使用 mavengradle 等构建工具管理项目,仔细检查找不到的类所在的 jar 包是否已添加为依赖。

UnsatisfiedLinkError

这个异常倒不是很常见,但是出错的话,通常是在 JVM 启动的时候如果一不小心将在 JVM 中的某个 lib 删除了,可能就会报这个错误了。

【示例】执行以下代码,会抛出 UnsatisfiedLinkError 错误。

1
2
3
4
5
6
7
8
9
10
11
12
13
public class UnsatisfiedLinkErrorDemo {

public native void nativeMethod();

static {
System.loadLibrary("NoLib");
}

public static void main(String[] args) {
new UnsatisfiedLinkErrorDemo().nativeMethod();
}

}

【输出】

1
2
3
4
5
java.lang.UnsatisfiedLinkError: no NoLib in java.library.path
at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1867)
at java.lang.Runtime.loadLibrary0(Runtime.java:870)
at java.lang.System.loadLibrary(System.java:1122)
at io.github.dunwu.javacore.jvm.classloader.exception.UnsatisfiedLinkErrorDemo.<clinit>(UnsatisfiedLinkErrorDemo.java:12)

ClassCastException

ClassCastException 异常通常是在程序中强制类型转换失败时出现。

【示例】执行以下代码,会抛出 ClassCastException 异常。

1
2
3
4
5
6
7
8
9
10
public class ClassCastExceptionDemo {

public static void main(String[] args) {
Object obj = new Object();
EmptyClass newObj = (EmptyClass) obj;
}

static class EmptyClass {}

}

【输出】

1
2
Exception in thread "main" java.lang.ClassCastException: java.lang.Object cannot be cast to io.github.dunwu.javacore.jvm.classloader.exception.ClassCastExceptionDemo$EmptyClass
at io.github.dunwu.javacore.jvm.classloader.exception.ClassCastExceptionDemo.main(ClassCastExceptionDemo.java:11)

参考资料

Elastic 快速入门

开源协议:Apache 2.0

简介

Elastic Stack 是什么

Elastic StackELK Stack

ELK 是指 Elastic 公司旗下三款产品 ElasticSearchLogstashKibana 的首字母组合。

  • Elasticsearch 是一个搜索和分析引擎。
  • Logstash 是服务器端数据处理管道,能够同时从多个来源采集数据,转换数据,然后将数据发送到诸如 Elasticsearch 等“存储库”中。
  • Kibana 则可以让用户在 Elasticsearch 中使用图形和图表对数据进行可视化。

而 Elastic Stack 是 ELK Stack 的更新换代产品,最新产品引入了轻量型的单一功能数据采集器,并把它们叫做 Beats

为什么使用 Elastic Stack

对于有一定规模的公司来说,通常会很多个应用,并部署在大量的服务器上。运维和开发人员常常需要通过查看日志来定位问题。如果应用是集群化部署,试想如果登录一台台服务器去查看日志,是多么费时费力。

而通过 ELK 这套解决方案,可以同时实现日志收集、日志搜索和日志分析的功能。

Elastic Stack 架构

img

说明

以上是 Elastic Stack 的一个架构图。从图中可以清楚的看到数据流向。

  • Beats 是单一用途的数据传输平台,它可以将多台机器的数据发送到 Logstash 或 ElasticSearch。但 Beats 并不是不可或缺的一环,所以本文中暂不介绍。
  • Logstash 是一个动态数据收集管道。支持以 TCP/UDP/HTTP 多种方式收集数据(也可以接受 Beats 传输来的数据),并对数据做进一步丰富或提取字段处理。
  • ElasticSearch 是一个基于 JSON 的分布式的搜索和分析引擎。作为 ELK 的核心,它集中存储数据。
  • Kibana 是 ELK 的用户界面。它将收集的数据进行可视化展示(各种报表、图形化数据),并提供配置、管理 ELK 的界面。

ElasticSearch

Elasticsearch 是一个分布式、RESTful 风格的搜索和数据分析引擎,能够解决不断涌现出的各种用例。 作为 Elastic Stack 的核心,它集中存储您的数据,帮助您发现意料之中以及意料之外的情况。

ElasticSearch 简介

Elasticsearch 基于搜索库 Lucene 开发。ElasticSearch 隐藏了 Lucene 的复杂性,提供了简单易用的 REST API / Java API 接口(另外还有其他语言的 API 接口)。

ElasticSearch 可以视为一个文档存储,它将复杂数据结构序列化为 JSON 存储

ElasticSearch 是近乎于实时的全文搜素,这是指:

  • 从写入数据到数据可以被搜索,存在较小的延迟(大概是 1s)
  • 基于 ES 执行搜索和分析可以达到秒级

2.1.1. 核心概念

  • 索引(Index) 可以认为是文档(document)的优化集合。
  • 每个 文档(document) 都是字段(field)的集合。
  • 字段(field) 是包含数据的键值对。
  • 默认情况下,Elasticsearch 对每个字段中的所有数据建立索引,并且每个索引字段都具有专用的优化数据结构。
  • 每个索引里可以有一个或者多个类型(type)。类型(type) 是 index 的一个逻辑分类,
  • 当单台机器不足以存储大量数据时,Elasticsearch 可以将一个索引中的数据切分为多个 分片(shard)分片(shard) 分布在多台服务器上存储。有了 shard 就可以横向扩展,存储更多数据,让搜索和分析等操作分布到多台服务器上去执行,提升吞吐量和性能。每个 shard 都是一个 lucene index。
  • 任何一个服务器随时可能故障或宕机,此时 shard 可能就会丢失,因此可以为每个 shard 创建多个 **副本(replica)**。replica 可以在 shard 故障时提供备用服务,保证数据不丢失,多个 replica 还可以提升搜索操作的吞吐量和性能。primary shard(建立索引时一次设置,不能修改,默认 5 个),replica shard(随时修改数量,默认 1 个),默认每个索引 10 个 shard,5 个 primary shard,5 个 replica shard,最小的高可用配置,是 2 台服务器。

ElasticSearch 原理

2.2.1. ES 写数据过程

  • 客户端选择一个 node 发送请求过去,这个 node 就是 coordinating node(协调节点)。
  • coordinating node 对 document 进行路由,将请求转发给对应的 node(有 primary shard)。
  • 实际的 node 上的 primary shard 处理请求,然后将数据同步到 replica node
  • coordinating node 如果发现 primary node 和所有 replica node 都搞定之后,就返回响应结果给客户端。

es-write

2.2.2. es 读数据过程

可以通过 doc id 来查询,会根据 doc id 进行 hash,判断出来当时把 doc id 分配到了哪个 shard 上面去,从那个 shard 去查询。

  • 客户端发送请求到任意一个 node,成为 coordinate node
  • coordinate nodedoc id 进行哈希路由,将请求转发到对应的 node,此时会使用 round-robin 随机轮询算法,在 primary shard 以及其所有 replica 中随机选择一个,让读请求负载均衡。
  • 接收请求的 node 返回 document 给 coordinate node
  • coordinate node 返回 document 给客户端。

2.2.3. 写数据底层原理

es-write-detail

先写入内存 buffer,在 buffer 里的时候数据是搜索不到的;同时将数据写入 translog 日志文件。

如果 buffer 快满了,或者到一定时间,就会将内存 buffer 数据 refresh 到一个新的 segment file 中,但是此时数据不是直接进入 segment file 磁盘文件,而是先进入 os cache 。这个过程就是 refresh

每隔 1 秒钟,es 将 buffer 中的数据写入一个新的 segment file,每秒钟会产生一个新的磁盘文件 segment file,这个 segment file 中就存储最近 1 秒内 buffer 中写入的数据。

但是如果 buffer 里面此时没有数据,那当然不会执行 refresh 操作,如果 buffer 里面有数据,默认 1 秒钟执行一次 refresh 操作,刷入一个新的 segment file 中。

操作系统里面,磁盘文件其实都有一个东西,叫做 os cache,即操作系统缓存,就是说数据写入磁盘文件之前,会先进入 os cache,先进入操作系统级别的一个内存缓存中去。只要 buffer 中的数据被 refresh 操作刷入 os cache中,这个数据就可以被搜索到了。

为什么叫 es 是准实时的? NRT,全称 near real-time。默认是每隔 1 秒 refresh 一次的,所以 es 是准实时的,因为写入的数据 1 秒之后才能被看到。可以通过 es 的 restful api 或者 java api手动执行一次 refresh 操作,就是手动将 buffer 中的数据刷入 os cache中,让数据立马就可以被搜索到。只要数据被输入 os cache 中,buffer 就会被清空了,因为不需要保留 buffer 了,数据在 translog 里面已经持久化到磁盘去一份了。

重复上面的步骤,新的数据不断进入 buffer 和 translog,不断将 buffer 数据写入一个又一个新的 segment file 中去,每次 refresh 完 buffer 清空,translog 保留。随着这个过程推进,translog 会变得越来越大。当 translog 达到一定长度的时候,就会触发 commit 操作。

commit 操作发生第一步,就是将 buffer 中现有数据 refreshos cache 中去,清空 buffer。然后,将一个 commit point 写入磁盘文件,里面标识着这个 commit point 对应的所有 segment file,同时强行将 os cache 中目前所有的数据都 fsync 到磁盘文件中去。最后清空 现有 translog 日志文件,重启一个 translog,此时 commit 操作完成。

这个 commit 操作叫做 flush。默认 30 分钟自动执行一次 flush,但如果 translog 过大,也会触发 flush。flush 操作就对应着 commit 的全过程,我们可以通过 es api,手动执行 flush 操作,手动将 os cache 中的数据 fsync 强刷到磁盘上去。

translog 日志文件的作用是什么?你执行 commit 操作之前,数据要么是停留在 buffer 中,要么是停留在 os cache 中,无论是 buffer 还是 os cache 都是内存,一旦这台机器死了,内存中的数据就全丢了。所以需要将数据对应的操作写入一个专门的日志文件 translog 中,一旦此时机器宕机,再次重启的时候,es 会自动读取 translog 日志文件中的数据,恢复到内存 buffer 和 os cache 中去。

translog 其实也是先写入 os cache 的,默认每隔 5 秒刷一次到磁盘中去,所以默认情况下,可能有 5 秒的数据会仅仅停留在 buffer 或者 translog 文件的 os cache 中,如果此时机器挂了,会丢失 5 秒钟的数据。但是这样性能比较好,最多丢 5 秒的数据。也可以将 translog 设置成每次写操作必须是直接 fsync 到磁盘,但是性能会差很多。

实际上你在这里,如果面试官没有问你 es 丢数据的问题,你可以在这里给面试官炫一把,你说,其实 es 第一是准实时的,数据写入 1 秒后可以搜索到;可能会丢失数据的。有 5 秒的数据,停留在 buffer、translog os cache、segment file os cache 中,而不在磁盘上,此时如果宕机,会导致 5 秒的数据丢失

总结一下,数据先写入内存 buffer,然后每隔 1s,将数据 refresh 到 os cache,到了 os cache 数据就能被搜索到(所以我们才说 es 从写入到能被搜索到,中间有 1s 的延迟)。每隔 5s,将数据写入 translog 文件(这样如果机器宕机,内存数据全没,最多会有 5s 的数据丢失),translog 大到一定程度,或者默认每隔 30mins,会触发 commit 操作,将缓冲区的数据都 flush 到 segment file 磁盘文件中。

数据写入 segment file 之后,同时就建立好了倒排索引。

2.2.4. 删除/更新数据底层原理

如果是删除操作,commit 的时候会生成一个 .del 文件,里面将某个 doc 标识为 deleted 状态,那么搜索的时候根据 .del 文件就知道这个 doc 是否被删除了。

如果是更新操作,就是将原来的 doc 标识为 deleted 状态,然后新写入一条数据。

buffer 每 refresh 一次,就会产生一个 segment file,所以默认情况下是 1 秒钟一个 segment file,这样下来 segment file 会越来越多,此时会定期执行 merge。每次 merge 的时候,会将多个 segment file 合并成一个,同时这里会将标识为 deleted 的 doc 给物理删除掉,然后将新的 segment file 写入磁盘,这里会写一个 commit point,标识所有新的 segment file,然后打开 segment file 供搜索使用,同时删除旧的 segment file

2.2.5. 底层 lucene

简单来说,lucene 就是一个 jar 包,里面包含了封装好的各种建立倒排索引的算法代码。我们用 Java 开发的时候,引入 lucene jar,然后基于 lucene 的 api 去开发就可以了。

通过 lucene,我们可以将已有的数据建立索引,lucene 会在本地磁盘上面,给我们组织索引的数据结构。

2.2.6. 倒排索引

在搜索引擎中,每个文档都有一个对应的文档 ID,文档内容被表示为一系列关键词的集合。例如,文档 1 经过分词,提取了 20 个关键词,每个关键词都会记录它在文档中出现的次数和出现位置。

那么,倒排索引就是关键词到文档 ID 的映射,每个关键词都对应着一系列的文件,这些文件中都出现了关键词。

举个栗子。

有以下文档:

DocId Doc
1 谷歌地图之父跳槽 Facebook
2 谷歌地图之父加盟 Facebook
3 谷歌地图创始人拉斯离开谷歌加盟 Facebook
4 谷歌地图之父跳槽 Facebook 与 Wave 项目取消有关
5 谷歌地图之父拉斯加盟社交网站 Facebook

对文档进行分词之后,得到以下倒排索引

WordId Word DocIds
1 谷歌 1,2,3,4,5
2 地图 1,2,3,4,5
3 之父 1,2,4,5
4 跳槽 1,4
5 Facebook 1,2,3,4,5
6 加盟 2,3,5
7 创始人 3
8 拉斯 3,5
9 离开 3
10 4
.. .. ..

另外,实用的倒排索引还可以记录更多的信息,比如文档频率信息,表示在文档集合中有多少个文档包含某个单词。

那么,有了倒排索引,搜索引擎可以很方便地响应用户的查询。比如用户输入查询 Facebook,搜索系统查找倒排索引,从中读出包含这个单词的文档,这些文档就是提供给用户的搜索结果。

要注意倒排索引的两个重要细节:

  • 倒排索引中的所有词项对应一个或多个文档;
  • 倒排索引中的词项根据字典顺序升序排列

上面只是一个简单的栗子,并没有严格按照字典顺序升序排列。

Logstash

Logstash 是开源的服务器端数据处理管道,能够同时从多个来源采集数据,转换数据,然后将数据发送到您最喜欢的“存储库”中。

Logstash 简介

Logstash 可以传输和处理你的日志、事务或其他数据。

Logstash 是 Elasticsearch 的最佳数据管道。

Logstash 是插件式管理模式,在输入、过滤、输出以及编码过程中都可以使用插件进行定制。Logstash 社区有超过 200 种可用插件。

Logstash 原理

Logstash 有两个必要元素:inputoutput ,一个可选元素:filter

这三个元素,分别代表 Logstash 事件处理的三个阶段:输入 > 过滤器 > 输出。

img

  • input - 负责从数据源采集数据。
  • filter - 将数据修改为你指定的格式或内容。
  • output - 将数据传输到目的地。

在实际应用场景中,通常输入、输出、过滤器不止一个。Logstash 的这三个元素都使用插件式管理方式,用户可以根据应用需要,灵活的选用各阶段需要的插件,并组合使用。

Beats

Beats 是安装在服务器上的数据中转代理

Beats 可以将数据直接传输到 Elasticsearch 或传输到 Logstash 。

img

Beats 有多种类型,可以根据实际应用需要选择合适的类型。

常用的类型有:

  • Packetbeat:网络数据包分析器,提供有关您的应用程序服务器之间交换的事务的信息。
  • Filebeat:从您的服务器发送日志文件。
  • Metricbeat:是一个服务器监视代理程序,它定期从服务器上运行的操作系统和服务收集指标。
  • Winlogbeat:提供 Windows 事件日志。

Filebeat 简介

_由于本人仅接触过 Filebeat,所以本文只介绍 Beats 组件中的 Filebeat_。

相比 Logstash,FileBeat 更加轻量化。

在任何环境下,应用程序都有停机的可能性。 Filebeat 读取并转发日志行,如果中断,则会记住所有事件恢复联机状态时所在位置。

Filebeat 带有内部模块(auditd,Apache,Nginx,System 和 MySQL),可通过一个指定命令来简化通用日志格式的收集,解析和可视化。

FileBeat 不会让你的管道超负荷。FileBeat 如果是向 Logstash 传输数据,当 Logstash 忙于处理数据,会通知 FileBeat 放慢读取速度。一旦拥塞得到解决,FileBeat 将恢复到原来的速度并继续传播。

img

Filebeat 原理

Filebeat 有两个主要组件:

  • harvester:负责读取一个文件的内容。它会逐行读取文件内容,并将内容发送到输出目的地。
  • prospector:负责管理 harvester 并找到所有需要读取的文件源。比如类型是日志,prospector 就会遍历制定路径下的所有匹配要求的文件。
1
2
3
4
5
filebeat.prospectors:
- type: log
paths:
- /var/log/*.log
- /var/path2/*.log

Filebeat 保持每个文件的状态,并经常刷新注册表文件中的磁盘状态。状态用于记住 harvester 正在读取的最后偏移量,并确保发送所有日志行。

Filebeat 将每个事件的传递状态存储在注册表文件中。所以它能保证事件至少传递一次到配置的输出,没有数据丢失。

参考资料