Dunwu Blog

大道至简,知易行难

Java I/O 之 BIO

BIO

BIO(blocking IO) 即阻塞 IO。指的主要是传统的 java.io 包,它基于流模型实现。流从概念上来说是一个连续的数据流。当程序需要读数据的时候就需要使用输入流读取数据,当需要往外写数据的时候就需要输出流。

java.io 包提供了我们最熟知的一些 IO 功能,比如 File 抽象、输入输出流等。交互方式是同步、阻塞的方式,也就是说,在读取输入流或者写入输出流时,在读、写动作完成之前,线程会一直阻塞在那里,它们之间的调用是可靠的线性顺序。很多时候,人们也把 java.net 下面提供的部分网络 API,比如 SocketServerSocketHttpURLConnection 也归类到同步阻塞 IO 类库,因为网络通信同样是 IO 行为。

BIO 中操作的流主要有两大类,字节流和字符流,两类根据流的方向都可以分为输入流和输出流。

  • 字节流
    • 输入字节流:InputStream
    • 输出字节流:OutputStream
  • 字符流
    • 输入字符流:Reader
    • 输出字符流:Writer

img

字节流

字节流主要操作字节数据或二进制对象。

字节流有两个核心抽象类:InputStreamOutputStream。所有的字节流类都继承自这两个抽象类。

img

InputStream

InputStream用于从源头(通常是文件)读取数据(字节信息)到内存中,java.io.InputStream抽象类是所有字节输入流的父类。

InputStream 常用方法:

  • read():返回输入流中下一个字节的数据。返回的值介于 0 到 255 之间。如果未读取任何字节,则代码返回 -1 ,表示文件结束。
  • read(byte b[ ]) : 从输入流中读取一些字节存储到数组 b 中。如果数组 b 的长度为零,则不读取。如果没有可用字节读取,返回 -1。如果有可用字节读取,则最多读取的字节数最多等于 b.length , 返回读取的字节数。这个方法等价于 read(b, 0, b.length)
  • read(byte b[], int off, int len):在read(byte b[ ]) 方法的基础上增加了 off 参数(偏移量)和 len 参数(要读取的最大字节数)。
  • skip(long n):忽略输入流中的 n 个字节 , 返回实际忽略的字节数。
  • available():返回输入流中可以读取的字节数。
  • close():关闭输入流释放相关的系统资源。

从 Java 9 开始,InputStream 新增加了多个实用的方法:

  • readAllBytes():读取输入流中的所有字节,返回字节数组。
  • readNBytes(byte[] b, int off, int len):阻塞直到读取 len 个字节。
  • transferTo(OutputStream out):将所有字节从一个输入流传递到一个输出流。

OutputStream

OutputStream 用于将数据(字节信息)写入到目的地(通常是文件),java.io.OutputStream抽象类是所有字节输出流的父类。

OutputStream 常用方法:

  • write(int b):将特定字节写入输出流。
  • write(byte b[ ]) : 将数组b 写入到输出流,等价于 write(b, 0, b.length)
  • write(byte[] b, int off, int len) : 在write(byte b[ ]) 方法的基础上增加了 off 参数(偏移量)和 len 参数(要读取的最大字节数)。
  • flush():刷新此输出流并强制写出所有缓冲的输出字节。
  • close():关闭输出流释放相关的系统资源。

文件字节流

FileOutputStreamFileInputStream 提供了读写字节到文件的能力。

文件流操作一般步骤:

  1. 使用 File 类绑定一个文件。
  2. File 对象绑定到流对象上。
  3. 进行读或写操作。
  4. 关闭流

FileOutputStreamFileInputStream 示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public class FileStreamDemo {

private static final String FILEPATH = "temp.log";

public static void main(String[] args) throws Exception {
write(FILEPATH);
read(FILEPATH);
}

public static void write(String filepath) throws IOException {
// 第 1 步、使用 File 类找到一个文件
File f = new File(filepath);

// 第 2 步、通过子类实例化父类对象
OutputStream out = new FileOutputStream(f);
// 实例化时,默认为覆盖原文件内容方式;如果添加 true 参数,则变为对原文件追加内容的方式。
// OutputStream out = new FileOutputStream(f, true);

// 第 3 步、进行写操作
String str = "Hello World\n";
byte[] bytes = str.getBytes();
out.write(bytes);

// 第 4 步、关闭输出流
out.close();
}

public static void read(String filepath) throws IOException {
// 第 1 步、使用 File 类找到一个文件
File f = new File(filepath);

// 第 2 步、通过子类实例化父类对象
InputStream input = new FileInputStream(f);

// 第 3 步、进行读操作
// 有三种读取方式,体会其差异
byte[] bytes = new byte[(int) f.length()];
int len = input.read(bytes); // 读取内容
System.out.println("读入数据的长度:" + len);

// 第 4 步、关闭输入流
input.close();
System.out.println("内容为:\n" + new String(bytes));
}

}

内存字节流

ByteArrayInputStreamByteArrayOutputStream 是用来完成内存的输入和输出功能。

内存操作流一般在生成一些临时信息时才使用。 如果临时信息保存在文件中,还需要在有效期过后删除文件,这样比较麻烦。

ByteArrayInputStreamByteArrayOutputStream 示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class ByteArrayStreamDemo {

public static void main(String[] args) {
String str = "HELLOWORLD"; // 定义一个字符串,全部由大写字母组成
ByteArrayInputStream bis = new ByteArrayInputStream(str.getBytes());
ByteArrayOutputStream bos = new ByteArrayOutputStream();
// 准备从内存 ByteArrayInputStream 中读取内容
int temp = 0;
while ((temp = bis.read()) != -1) {
char c = (char) temp; // 读取的数字变为字符
bos.write(Character.toLowerCase(c)); // 将字符变为小写
}
// 所有的数据就全部都在 ByteArrayOutputStream 中
String newStr = bos.toString(); // 取出内容
try {
bis.close();
bos.close();
} catch (IOException e) {
e.printStackTrace();
}
System.out.println(newStr);
}

}

管道流

管道流的主要作用是可以进行两个线程间的通信。

如果要进行管道通信,则必须把 PipedOutputStream 连接在 PipedInputStream 上。为此,PipedOutputStream 中提供了 connect() 方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
public class PipedStreamDemo {

public static void main(String[] args) {
Send s = new Send();
Receive r = new Receive();
try {
s.getPos().connect(r.getPis()); // 连接管道
} catch (IOException e) {
e.printStackTrace();
}
new Thread(s).start(); // 启动线程
new Thread(r).start(); // 启动线程
}

static class Send implements Runnable {

private PipedOutputStream pos = null;

Send() {
pos = new PipedOutputStream(); // 实例化输出流
}

@Override
public void run() {
String str = "Hello World!!!";
try {
pos.write(str.getBytes());
} catch (IOException e) {
e.printStackTrace();
}
try {
pos.close();
} catch (IOException e) {
e.printStackTrace();
}
}

/**
* 得到此线程的管道输出流
*/
PipedOutputStream getPos() {
return pos;
}

}

static class Receive implements Runnable {

private PipedInputStream pis = null;

Receive() {
pis = new PipedInputStream();
}

@Override
public void run() {
byte[] b = new byte[1024];
int len = 0;
try {
len = pis.read(b);
} catch (IOException e) {
e.printStackTrace();
}
try {
pis.close();
} catch (IOException e) {
e.printStackTrace();
}
System.out.println("接收的内容为:" + new String(b, 0, len));
}

/**
* 得到此线程的管道输入流
*/
PipedInputStream getPis() {
return pis;
}

}

}

对象字节流

ObjectInputStream 和 ObjectOutputStream 是对象输入输出流,一般用于对象序列化。

这里不展开叙述,想了解详细内容和示例可以参考:[Java 序列化](03.Java 序列化。md)

数据操作流

数据操作流提供了格式化读入和输出数据的方法,分别为 DataInputStreamDataOutputStream

DataInputStreamDataOutputStream 格式化读写数据示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
public class DataStreamDemo {

public static final String FILEPATH = "temp.log";

public static void main(String[] args) throws IOException {
write(FILEPATH);
read(FILEPATH);
}

private static void write(String filepath) throws IOException {
// 1. 使用 File 类绑定一个文件
File f = new File(filepath);

// 2. 把 File 对象绑定到流对象上
DataOutputStream dos = new DataOutputStream(new FileOutputStream(f));

// 3. 进行读或写操作
String[] names = { "衬衣", "手套", "围巾" };
float[] prices = { 98.3f, 30.3f, 50.5f };
int[] nums = { 3, 2, 1 };
for (int i = 0; i < names.length; i++) {
dos.writeChars(names[i]);
dos.writeChar('\t');
dos.writeFloat(prices[i]);
dos.writeChar('\t');
dos.writeInt(nums[i]);
dos.writeChar('\n');
}

// 4. 关闭流
dos.close();
}

private static void read(String filepath) throws IOException {
// 1. 使用 File 类绑定一个文件
File f = new File(filepath);

// 2. 把 File 对象绑定到流对象上
DataInputStream dis = new DataInputStream(new FileInputStream(f));

// 3. 进行读或写操作
String name = null; // 接收名称
float price = 0.0f; // 接收价格
int num = 0; // 接收数量
char[] temp = null; // 接收商品名称
int len = 0; // 保存读取数据的个数
char c = 0; // '\u0000'
try {
while (true) {
temp = new char[200]; // 开辟空间
len = 0;
while ((c = dis.readChar()) != '\t') { // 接收内容
temp[len] = c;
len++; // 读取长度加 1
}
name = new String(temp, 0, len); // 将字符数组变为 String
price = dis.readFloat(); // 读取价格
dis.readChar(); // 读取、t
num = dis.readInt(); // 读取 int
dis.readChar(); // 读取、n
System.out.printf("名称:%s;价格:%5.2f;数量:%d\n", name, price, num);
}
} catch (EOFException e) {
System.out.println("结束");
} catch (IOException e) {
e.printStackTrace();
}

// 4. 关闭流
dis.close();
}

}

合并流

合并流的主要功能是将多个 InputStream 合并为一个 InputStream 流。合并流的功能由 SequenceInputStream 完成。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class SequenceInputStreamDemo {

public static void main(String[] args) throws Exception {

InputStream is1 = new FileInputStream("temp1.log");
InputStream is2 = new FileInputStream("temp2.log");
SequenceInputStream sis = new SequenceInputStream(is1, is2);

int temp = 0; // 接收内容
OutputStream os = new FileOutputStream("temp3.logt");
while ((temp = sis.read()) != -1) { // 循环输出
os.write(temp); // 保存内容
}

sis.close(); // 关闭合并流
is1.close(); // 关闭输入流 1
is2.close(); // 关闭输入流 2
os.close(); // 关闭输出流
}

}

字符流

字符流主要操作字符,一般用于处理文本数据。

字符流有两个核心类:Reader 类和 Writer 。所有的字符流类都继承自这两个抽象类。

文件字符流

文件字符流 FileReaderFileWriter 可以向文件读写文本数据。

FileReaderFileWriter 读写文件示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
public class FileReadWriteDemo {

private static final String FILEPATH = "temp.log";

public static void main(String[] args) throws IOException {
write(FILEPATH);
System.out.println("内容为:" + new String(read(FILEPATH)));
}

public static void write(String filepath) throws IOException {
// 1. 使用 File 类绑定一个文件
File f = new File(filepath);

// 2. 把 File 对象绑定到流对象上
Writer out = new FileWriter(f);
// Writer out = new FileWriter(f, true); // 追加内容方式

// 3. 进行读或写操作
String str = "Hello World!!!\r\n";
out.write(str);

// 4. 关闭流
// 字符流操作时使用了缓冲区,并在关闭字符流时会强制将缓冲区内容输出
// 如果不关闭流,则缓冲区的内容是无法输出的
// 如果想在不关闭流时,将缓冲区内容输出,可以使用 flush 强制清空缓冲区
out.flush();
out.close();
}

public static char[] read(String filepath) throws IOException {
// 1. 使用 File 类绑定一个文件
File f = new File(filepath);

// 2. 把 File 对象绑定到流对象上
Reader input = new FileReader(f);

// 3. 进行读或写操作
int temp = 0; // 接收每一个内容
int len = 0; // 读取内容
char[] c = new char[1024];
while ((temp = input.read()) != -1) {
// 如果不是-1 就表示还有内容,可以继续读取
c[len] = (char) temp;
len++;
}
System.out.println("文件字符数为:" + len);

// 4. 关闭流
input.close();

return c;
}

}

字节流转换字符流

我们可以在程序中通过 InputStreamReader 从数据源中读取数据,然后也可以在程序中将数据通过 OutputStreamWriter 输出到目标媒介中

使用 InputStreamReader 可以将输入字节流转化为输入字符流;使用OutputStreamWriter可以将输出字节流转化为输出字符流。

OutputStreamWriter 示例:

1
2
3
4
5
6
7
8
9
10
public class OutputStreamWriterDemo {

public static void main(String[] args) throws IOException {
File f = new File("temp.log");
Writer out = new OutputStreamWriter(new FileOutputStream(f));
out.write("hello world!!");
out.close();
}

}

InputStreamReader 示例:

1
2
3
4
5
6
7
8
9
10
11
12
public class InputStreamReaderDemo {

public static void main(String[] args) throws IOException {
File f = new File("temp.log");
Reader reader = new InputStreamReader(new FileInputStream(f));
char[] c = new char[1024];
int len = reader.read(c);
reader.close();
System.out.println(new String(c, 0, len));
}

}

字节流 vs. 字符流

相同点:

字节流和字符流都有 read()write()flush()close() 这样的方法,这决定了它们的操作方式近似。

不同点:

  • 数据类型
    • 字节流的数据是字节(二进制对象)。主要核心类是 InputStream 类和 OutputStream 类。
    • 字符流的数据是字符。主要核心类是 Reader 类和 Writer 类。
  • 缓冲区
    • 字节流在操作时本身不会用到缓冲区(内存),是文件直接操作的。
    • 字符流在操作时是使用了缓冲区,通过缓冲区再操作文件。

选择:

所有的文件在硬盘或传输时都是以字节方式保存的,例如图片,影音文件等都是按字节方式存储的。字符流无法读写这些文件。

所以,除了纯文本数据文件使用字符流以外,其他文件类型都应该使用字节流方式。

I/O 工具类

File

File 类是 java.io 包中唯一对文件本身进行操作的类。它可以对文件、目录进行增删查操作。

createNewFille

可以使用 createNewFille() 方法创建一个新文件

注:

Windows 中使用反斜杠表示目录的分隔符 \。~~~~~~~~

Linux 中使用正斜杠表示目录的分隔符 /

最好的做法是使用 File.separator 静态常量,可以根据所在操作系统选取对应的分隔符。

【示例】创建文件

1
2
File f = new File(filename);
boolean flag = f.createNewFile();

mkdir

可以使用 mkdir() 来创建文件夹,但是如果要创建的目录的父路径不存在,则无法创建成功。

如果要解决这个问题,可以使用 mkdirs(),当父路径不存在时,会连同上级目录都一并创建。

【示例】创建目录

1
2
File f = new File(filename);
boolean flag = f.mkdir();

delete

可以使用 delete() 来删除文件或目录

需要注意的是,如果删除的是目录,且目录不为空,直接用 delete() 删除会失败。

【示例】删除文件或目录

1
2
File f = new File(filename);
boolean flag = f.delete();

list 和 listFiles

File 中给出了两种列出文件夹内容的方法:

  • list(): 列出全部名称,返回一个字符串数组
  • listFiles(): 列出完整的路径,返回一个 File 对象数组

list() 示例:

1
2
File f = new File(filename);
String str[] = f.list();

listFiles() 示例:

1
2
File f = new File(filename);
File files[] = f.listFiles();

RandomAccessFile

注:RandomAccessFile 类虽然可以实现对文件内容的读写操作,但是比较复杂。所以一般操作文件内容往往会使用字节流或字符流方式。

RandomAccessFile 类是随机读取类,它是一个完全独立的类。

它适用于由大小已知的记录组成的文件,所以我们可以使用 seek() 将记录从一处转移到另一处,然后读取或者修改记录。

文件中记录的大小不一定都相同,只要能够确定哪些记录有多大以及它们在文件中的位置即可。

RandomAccessFile 写操作

当用 rw 方式声明 RandomAccessFile 对象时,如果要写入的文件不存在,系统将自行创建。

r 为只读;w 为只写;rw 为读写。

【示例】文件随机读写

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class RandomAccessFileDemo01 {

public static void main(String args[]) throws IOException {
File f = new File("d:" + File.separator + "test.txt"); // 指定要操作的文件
RandomAccessFile rdf = null; // 声明 RandomAccessFile 类的对象
rdf = new RandomAccessFile(f, "rw");// 读写模式,如果文件不存在,会自动创建
String name = null;
int age = 0;
name = "zhangsan"; // 字符串长度为 8
age = 30; // 数字的长度为 4
rdf.writeBytes(name); // 将姓名写入文件之中
rdf.writeInt(age); // 将年龄写入文件之中
name = "lisi "; // 字符串长度为 8
age = 31; // 数字的长度为 4
rdf.writeBytes(name); // 将姓名写入文件之中
rdf.writeInt(age); // 将年龄写入文件之中
name = "wangwu "; // 字符串长度为 8
age = 32; // 数字的长度为 4
rdf.writeBytes(name); // 将姓名写入文件之中
rdf.writeInt(age); // 将年龄写入文件之中
rdf.close(); // 关闭
}
}

RandomAccessFile 读操作

读取是直接使用 r 的模式即可,以只读的方式打开文件。

读取时所有的字符串只能按照 byte 数组方式读取出来,而且长度必须和写入时的固定大小相匹配。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class RandomAccessFileDemo02 {

public static void main(String args[]) throws IOException {
File f = new File("d:" + File.separator + "test.txt"); // 指定要操作的文件
RandomAccessFile rdf = null; // 声明 RandomAccessFile 类的对象
rdf = new RandomAccessFile(f, "r");// 以只读的方式打开文件
String name = null;
int age = 0;
byte b[] = new byte[8]; // 开辟 byte 数组
// 读取第二个人的信息,意味着要空出第一个人的信息
rdf.skipBytes(12); // 跳过第一个人的信息
for (int i = 0; i < b.length; i++) {
b[i] = rdf.readByte(); // 读取一个字节
}
name = new String(b); // 将读取出来的 byte 数组变为字符串
age = rdf.readInt(); // 读取数字
System.out.println("第二个人的信息 --> 姓名:" + name + ";年龄:" + age);
// 读取第一个人的信息
rdf.seek(0); // 指针回到文件的开头
for (int i = 0; i < b.length; i++) {
b[i] = rdf.readByte(); // 读取一个字节
}
name = new String(b); // 将读取出来的 byte 数组变为字符串
age = rdf.readInt(); // 读取数字
System.out.println("第一个人的信息 --> 姓名:" + name + ";年龄:" + age);
rdf.skipBytes(12); // 空出第二个人的信息
for (int i = 0; i < b.length; i++) {
b[i] = rdf.readByte(); // 读取一个字节
}
name = new String(b); // 将读取出来的 byte 数组变为字符串
age = rdf.readInt(); // 读取数字
System.out.println("第三个人的信息 --> 姓名:" + name + ";年龄:" + age);
rdf.close(); // 关闭
}
}

System

System 类中提供了大量的静态方法,可以获取系统相关的信息或系统级操作,其中提供了三个常用于 IO 的静态成员:

  • System.out - 一个 PrintStream 流。System.out 一般会把你写到其中的数据输出到控制台上。System.out 通常仅用在类似命令行工具的控制台程序上。System.out 也经常用于打印程序的调试信息(尽管它可能并不是获取程序调试信息的最佳方式)。
  • System.err - 一个 PrintStream 流。System.err 与 System.out 的运行方式类似,但它更多的是用于打印错误文本。一些类似 Eclipse 的程序,为了让错误信息更加显眼,会将错误信息以红色文本的形式通过 System.err 输出到控制台上。
  • System.in - 一个典型的连接控制台程序和键盘输入的 InputStream 流。通常当数据通过命令行参数或者配置文件传递给命令行 Java 程序的时候,System.in 并不是很常用。图形界面程序通过界面传递参数给程序,这是一块单独的 Java IO 输入机制。

【示例】重定向 System.out 输出流

1
2
3
4
5
6
7
8
9
10
11
12
import java.io.*;
public class SystemOutDemo {

public static void main(String args[]) throws Exception {
OutputStream out = new FileOutputStream("d:\\test.txt");
PrintStream ps = new PrintStream(out);
System.setOut(ps);
System.out.println("人生若只如初见,何事秋风悲画扇");
ps.close();
out.close();
}
}

【示例】重定向 System.err 输出流

1
2
3
4
5
6
7
8
9
10
public class SystemErrDemo {

public static void main(String args[]) throws IOException {
OutputStream bos = new ByteArrayOutputStream(); // 实例化
PrintStream ps = new PrintStream(bos); // 实例化
System.setErr(ps); // 输出重定向
System.err.print("此处有误");
System.out.println(bos); // 输出内存中的数据
}
}

【示例】System.in 接受控制台输入信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import java.io.*;
public class SystemInDemo {

public static void main(String args[]) throws IOException {
InputStream input = System.in;
StringBuffer buf = new StringBuffer();
System.out.print("请输入内容:");
int temp = 0;
while ((temp = input.read()) != -1) {
char c = (char) temp;
if (c == '\n') {
break;
}
buf.append(c);
}
System.out.println("输入的内容为:" + buf);
input.close();
}
}

Scanner

Scanner 可以获取用户的输入,并对数据进行校验

【示例】校验输入数据是否格式正确

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import java.io.*;
public class ScannerDemo {

public static void main(String args[]) {
Scanner scan = new Scanner(System.in); // 从键盘接收数据
int i = 0;
float f = 0.0f;
System.out.print("输入整数:");
if (scan.hasNextInt()) { // 判断输入的是否是整数
i = scan.nextInt(); // 接收整数
System.out.println("整数数据:" + i);
} else {
System.out.println("输入的不是整数!");
}

System.out.print("输入小数:");
if (scan.hasNextFloat()) { // 判断输入的是否是小数
f = scan.nextFloat(); // 接收小数
System.out.println("小数数据:" + f);
} else {
System.out.println("输入的不是小数!");
}

Date date = null;
String str = null;
System.out.print("输入日期(yyyy-MM-dd):");
if (scan.hasNext("^\\d{4}-\\d{2}-\\d{2}$")) { // 判断
str = scan.next("^\\d{4}-\\d{2}-\\d{2}$"); // 接收
try {
date = new SimpleDateFormat("yyyy-MM-dd").parse(str);
} catch (Exception e) {}
} else {
System.out.println("输入的日期格式错误!");
}
System.out.println(date);
}
}

输出:

1
2
3
4
5
6
7
输入整数:20
整数数据:20
输入小数:3.2
小数数据:3.2
输入日期(yyyy-MM-dd):1988-13-1
输入的日期格式错误!
null

网络编程

关键词:SocketServerSocketDatagramPacketDatagramSocket

网络编程是指编写运行在多个设备(计算机)的程序,这些设备都通过网络连接起来。

java.net 包中提供了低层次的网络通信细节。你可以直接使用这些类和接口,来专注于解决问题,而不用关注通信细节。

java.net 包中提供了两种常见的网络协议的支持:

  • TCP - TCP 是传输控制协议的缩写,它保障了两个应用程序之间的可靠通信。通常用于互联网协议,被称 TCP/ IP。
  • UDP - UDP 是用户数据报协议的缩写,一个无连接的协议。提供了应用程序之间要发送的数据的数据包。

Socket 和 ServerSocket

套接字(Socket)使用 TCP 提供了两台计算机之间的通信机制。 客户端程序创建一个套接字,并尝试连接服务器的套接字。

Java 通过 Socket 和 ServerSocket 实现对 TCP 的支持。Java 中的 Socket 通信可以简单理解为:**java.net.Socket 代表客户端,java.net.ServerSocket 代表服务端**,二者可以建立连接,然后通信。

以下为 Socket 通信中建立建立的基本流程:

  • 服务器实例化一个 ServerSocket 对象,表示服务器绑定一个端口。
  • 服务器调用 ServerSocketaccept() 方法,该方法将一直等待,直到客户端连接到服务器的绑定端口(即监听端口)。
  • 服务器监听端口时,客户端实例化一个 Socket 对象,指定服务器名称和端口号来请求连接。
  • Socket 类的构造函数试图将客户端连接到指定的服务器和端口号。如果通信被建立,则在客户端创建一个 Socket 对象能够与服务器进行通信。
  • 在服务器端,accept() 方法返回服务器上一个新的 Socket 引用,该引用连接到客户端的 Socket

连接建立后,可以通过使用 IO 流进行通信。每一个 Socket 都有一个输出流和一个输入流。客户端的输出流连接到服务器端的输入流,而客户端的输入流连接到服务器端的输出流。

TCP 是一个双向的通信协议,因此数据可以通过两个数据流在同一时间发送,以下是一些类提供的一套完整的有用的方法来实现 sockets。

ServerSocket

服务器程序通过使用 java.net.ServerSocket 类以获取一个端口,并且监听客户端请求连接此端口的请求。

ServerSocket 构造方法

ServerSocket 有多个构造方法:

方法 描述
ServerSocket() 创建非绑定服务器套接字。
ServerSocket(int port) 创建绑定到特定端口的服务器套接字。
ServerSocket(int port, int backlog) 利用指定的 backlog 创建服务器套接字并将其绑定到指定的本地端口号。
ServerSocket(int port, int backlog, InetAddress address) 使用指定的端口、监听 backlog 和要绑定到的本地 IP 地址创建服务器。
ServerSocket 常用方法

创建非绑定服务器套接字。 如果 ServerSocket 构造方法没有抛出异常,就意味着你的应用程序已经成功绑定到指定的端口,并且侦听客户端请求。

这里有一些 ServerSocket 类的常用方法:

方法 描述
int getLocalPort() 返回此套接字在其上侦听的端口。
Socket accept() 监听并接受到此套接字的连接。
void setSoTimeout(int timeout) 通过指定超时值启用/禁用 SO_TIMEOUT,以毫秒为单位。
void bind(SocketAddress host, int backlog) ServerSocket 绑定到特定地址(IP 地址和端口号)。

Socket

java.net.Socket 类代表客户端和服务器都用来互相沟通的套接字。客户端要获取一个 Socket 对象通过实例化 ,而 服务器获得一个 Socket 对象则通过 accept() 方法 a 的返回值。

Socket 构造方法

Socket 类有 5 个构造方法:

方法 描述
Socket() 通过系统默认类型的 SocketImpl 创建未连接套接字
Socket(String host, int port) 创建一个流套接字并将其连接到指定主机上的指定端口号。
Socket(InetAddress host, int port) 创建一个流套接字并将其连接到指定 IP 地址的指定端口号。
Socket(String host, int port, InetAddress localAddress, int localPort) 创建一个套接字并将其连接到指定远程主机上的指定远程端口。
Socket(InetAddress host, int port, InetAddress localAddress, int localPort) 创建一个套接字并将其连接到指定远程地址上的指定远程端口。

当 Socket 构造方法返回,并没有简单的实例化了一个 Socket 对象,它实际上会尝试连接到指定的服务器和端口。

Socket 常用方法

下面列出了一些感兴趣的方法,注意客户端和服务器端都有一个 Socket 对象,所以无论客户端还是服务端都能够调用这些方法。

方法 描述
void connect(SocketAddress host, int timeout) 将此套接字连接到服务器,并指定一个超时值。
InetAddress getInetAddress() 返回套接字连接的地址。
int getPort() 返回此套接字连接到的远程端口。
int getLocalPort() 返回此套接字绑定到的本地端口。
SocketAddress getRemoteSocketAddress() 返回此套接字连接的端点的地址,如果未连接则返回 null。
InputStream getInputStream() 返回此套接字的输入流。
OutputStream getOutputStream() 返回此套接字的输出流。
void close() 关闭此套接字。

Socket 通信示例

服务端示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class HelloServer {

public static void main(String[] args) throws Exception {
// Socket 服务端
// 服务器在 8888 端口上监听
ServerSocket server = new ServerSocket(8888);
System.out.println("服务器运行中,等待客户端连接。");
// 得到连接,程序进入到阻塞状态
Socket client = server.accept();
// 打印流输出最方便
PrintStream out = new PrintStream(client.getOutputStream());
// 向客户端输出信息
out.println("hello world");
client.close();
server.close();
System.out.println("服务器已向客户端发送消息,退出。");
}

}

客户端示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class HelloClient {

public static void main(String[] args) throws Exception {
// Socket 客户端
Socket client = new Socket("localhost", 8888);
InputStreamReader inputStreamReader = new InputStreamReader(client.getInputStream());
// 一次性接收完成
BufferedReader buf = new BufferedReader(inputStreamReader);
String str = buf.readLine();
buf.close();
client.close();
System.out.println("客户端接收到服务器消息:" + str + ",退出");
}

}

DatagramSocket 和 DatagramPacket

Java 通过 DatagramSocketDatagramPacket 实现对 UDP 协议的支持。

  • DatagramPacket:数据包类
  • DatagramSocket:通信类

UDP 服务端示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
public class UDPServer {

public static void main(String[] args) throws Exception { // 所有异常抛出
String str = "hello World!!!";
DatagramSocket ds = new DatagramSocket(3000); // 服务端在 3000 端口上等待服务器发送信息
DatagramPacket dp =
new DatagramPacket(str.getBytes(), str.length(), InetAddress.getByName("localhost"), 9000); // 所有的信息使用 buf 保存
System.out.println("发送信息。");
ds.send(dp); // 发送信息出去
ds.close();
}

}

UDP 客户端示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
public class UDPClient {

public static void main(String[] args) throws Exception { // 所有异常抛出
byte[] buf = new byte[1024]; // 开辟空间,以接收数据
DatagramSocket ds = new DatagramSocket(9000); // 客户端在 9000 端口上等待服务器发送信息
DatagramPacket dp = new DatagramPacket(buf, 1024); // 所有的信息使用 buf 保存
ds.receive(dp); // 接收数据
String str = new String(dp.getData(), 0, dp.getLength()) + "from " + dp.getAddress().getHostAddress() + ":"
+ dp.getPort();
System.out.println(str); // 输出内容
}

}

InetAddress

InetAddress 类表示互联网协议 (IP) 地址。

没有公有的构造函数,只能通过静态方法来创建实例。

1
2
InetAddress.getByName(String host);
InetAddress.getByAddress(byte[] address);

URL

可以直接从 URL 中读取字节流数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public static void main(String[] args) throws IOException {

URL url = new URL("http://www.baidu.com");

/* 字节流 */
InputStream is = url.openStream();

/* 字符流 */
InputStreamReader isr = new InputStreamReader(is, "utf-8");

/* 提供缓存功能 */
BufferedReader br = new BufferedReader(isr);

String line;
while ((line = br.readLine()) != null) {
System.out.println(line);
}

br.close();
}

参考资料

Java 虚拟机之内存区域

运行时数据区域

JVM 在执行 Java 程序的过程中会把它所管理的内存划分为若干个不同的数据区域。这些区域都有各自的用途,以及创建和销毁的时间,有的区域随着虚拟机进程的启动而存在,有些区域则依赖用户线程的启动和结束而建立和销毁。如下图所示:

程序计数器

程序计数器(Program Counter Register) 是一块较小的内存空间,它可以看做是当前线程所执行的字节码的行号指示器。字节码解释器工作时就是通过改变这个计数器的值来选取下一条需要执行的字节码指令,它是程序控制流的指示器,分支、循环、跳转、异常处理、线程恢复等基础功能都需要依赖这个计数器来完成。

由于 Java 虚拟机的多线程是通过线程轮流切换、分配处理器执行时间的方式来实现的,在任何一个确定的时刻,一个处理器都只会执行一条线程中的指令。因此,为了线程切换后能恢复到正确的执行位置,每条线程都需要有一个独立的程序计数器,各线程之间计数器互不影响,独立存储,我们称这类内存区域为“线程私有”的内存。

如果线程正在执行的是一个 Java 方法,这个计数器记录的是正在执行的虚拟机字节码指令的地址;如果正在执行的是本地(Native)方法,这个计数器值则应为空(Undefined)。

🔔 注意:程序计数器是 JVM 中没有规定任何 OutOfMemoryError 情况的唯一区域。

Java 虚拟机栈

Java 虚拟机栈(Java Virtual Machine Stacks) 也是线程私有的,它的生命周期与线程相同。Java 虚拟机栈以方法作为最基本的执行单元,描述的是 Java 方法执行的线程内存模型每个方法被执行的时候,JVM 都会同步创建一个栈帧(Stack Frame),栈帧是用于支持虚拟机进行方法调用和方法执行背后的数据结构。栈帧存储了局部变量表、操作数栈、动态连接、方法返回地址等信息。每一个方法从调用开始至执行结束的过程,都对应着一个栈帧在虚拟机栈里面从入栈到出栈的过程。

一个线程中的方法调用链可能会很长,以 Java 程序的角度来看,同一时刻、同一条线程里面,在 调用堆栈的所有方法都同时处于执行状态。而对于执行引擎来讲,在活动线程中,只有位于栈顶的方 法才是在运行的,只有位于栈顶的栈帧才是生效的,其被称为“当前栈帧”(Current Stack Frame),与 这个栈帧所关联的方法被称为“当前方法”(Current Method)。执行引擎所运行的所有字节码指令都只 针对当前栈帧进行操作。

  • 局部变量表 - 用于存放方法参数和方法内部定义的局部变量。
  • 操作数栈 - 主要作为方法调用的中转站使用,用于存放方法执行过程中产生的中间计算结果。另外,计算过程中产生的临时变量也会放在操作数栈中。
  • 动态连接 - 用于一个方法调用其他方法的场景。Class 文件的常量池中有大量的符号引用,字节码中的方法调用指令就以常量池中指向方法的符号引用为参数。这些符号引用一部分会在类加载阶段或第一次使用的时候转化为直接引用,这种转化称为静态解析;另一部分将在每一次的运行期间转化为直接应用,这部分称为动态连接
  • 方法返回地址 - 用于返回方法被调用的位置,恢复上层方法的局部变量和操作数栈。Java 方法有两种返回方式,一种是 return 语句正常返回,一种是抛出异常。无论采用何种退出方式,都会导致栈帧被弹出。也就是说,栈帧随着方法调用而创建,随着方法结束而销毁。无论方法正常完成还是异常完成都算作方法结束。

🔔 注意:

该区域可能抛出以下异常:

  • 如果线程请求的栈深度大于虚拟机所允许的深度,将抛出 StackOverflowError 异常;
  • 如果虚拟机栈进行动态扩展时,无法申请到足够内存,就会抛出 OutOfMemoryError 异常。

💡 提示:

可以通过 -Xss 这个虚拟机参数来指定一个程序的 Java 虚拟机栈内存大小:

1
java -Xss=512M HackTheJava

局部变量表

局部变量表(Local Variables Table)是一组变量值的存储空间,用于存放方法参数和方法内部定义的局部变量。在 Java 程序被编译为 Class 文件时,就在方法的 Code 属性的 max_locals 数据项中确定了该方法所需分配的局部变量表的最大容量。

局部变量表存放了编译期可知的各种 Java 虚拟机基本数据类型、对象引用(reference 类型,它不同于对象本身,可能是一个指向对象起始地址的引用指针,也可能是指向一个代表对象的句柄或其他与此对象相关的位置)和 returnAddress 类型(指向了一条字节码指令的地址)。这些数据类型在局部变量表中的存储空间以局部变量槽(Variable Slot)来表示,其中 64 位长度的 longdouble 类型的数据会占用两个变量槽,其余的数据类型只占用一个。局部变量表所需的内存空间在编译期间完成分配,当进入一个方法时,这个方法需要在栈帧中分配多大的局部变量空间是完全确定的,在方法运行期间不会改变局部变量表的大小。

操作数栈

操作数栈(Operand Stack)也常被称为操作栈,它是一个后入先出(Last In First Out,LIFO) 栈。操作数栈主要作为方法调用的中转站使用,用于存放方法执行过程中产生的中间计算结果。另外,计算过程中产生的临时变量也会放在操作数栈中。

当一个方法刚刚开始执行的时候,这个方法的操作数栈是空的,在方法的执行过程中,会有各种字节码指令往操作数栈中写入和提取内容,也就是出栈和入栈操作。譬如在做算术运算的时候是通过将运算涉及的操作数栈压入栈顶后调用运算指令来进行的,又譬如在调用其他方法的时候是通过操作数栈来进行方法参数的传递。

操作数栈中元素的数据类型必须与字节码指令的序列严格匹配,在编译程序代码的时候,编译器 必须要严格保证这一点,在类校验阶段的数据流分析中还要再次验证这一点。

另外在概念模型中,两个不同栈帧作为不同方法的虚拟机栈的元素,是完全相互独立的。但是在大多虚拟机的实现里都会进行一些优化处理,令两个栈帧出现一部分重叠。让下面栈帧的部分操作数栈与上面栈帧的部分局部变量表重叠在一起,这样做不仅节约了一些空间,更重要的是在进行方法调用时就可以直接共用一部分数据,无须进行额外的参数复制传递了。

动态连接

用于一个方法调用其他方法的场景。Class 文件的常量池中有大量的符号引用,字节码中的方法调用指令就以常量池中指向方法的符号引用为参数。这些符号引用一部分会在类加载阶段或第一次使用的时候转化为直接引用,这种转化称为静态解析;另一部分将在每一次的运行期间转化为直接应用,这部分称为动态连接

每个栈帧都包含一个指向运行时常量池中该栈帧所属方法的引用,持有这个引用是为了支持方 法调用过程中的动态连接(Dynamic Linking)。通过第 6 章的讲解,我们知道 Class 文件的常量池中存 有大量的符号引用,字节码中的方法调用指令就以常量池里指向方法的符号引用作为参数。这些符号 引用一部分会在类加载阶段或者第一次使用的时候就被转化为直接引用,这种转化被称为静态解析。 另外一部分将在每一次运行期间都转化为直接引用,这部分就称为动态连接。关于这两个转化过程的 具体过程,将在 8.3 节中再详细讲解。

方法返回地址

方法返回地址用于返回方法被调用的位置,恢复上层方法的局部变量和操作数栈。

Java 方法有两种返回方式,一种是 return 语句正常返回,这时候可能会有返回值传递给上层的方法调用者,方法是否有返回值以及返回值的类型将根据遇到何种方法返回指令来决定;一种是遇到了异常,并且这个异常没有在方法体内得到妥善处理。无论采用何种退出方式,都会导致栈帧被弹出。也就是说,栈帧随着方法调用而创建,随着方法结束而销毁。无论方法正常完成还是异常完成都算作方法结束。

方法返回时可能需要在栈帧中保存一些信息,用来帮助恢复它的上层主调方法的执行状态。 一般来说,方法正常退出时,主调方法的 PC 计数器的值就可以作为返回地址,栈帧中很可能会保存这个计数器值。而方法异常退出时,返回地址是要通过异常处理器表来确定的,栈帧中就一般不会保存这部分信息。方法退出的过程实际上等同于把当前栈帧出栈,因此退出时可能执行的操作有:恢复上层方法的局部变量表和操作数栈,把返回值(如果有的话)压入调用者栈帧的操作数栈中,调整 PC 计数器的值以指向方法调用指令后面的一条指令等。

本地方法栈

本地方法栈(Native Method Stack) 与虚拟机栈的作用非常相似,二者区别仅在于:虚拟机栈为 Java 方法服务;本地方法栈为 Native 方法服务

🔔 注意:本地方法栈也会在栈深度溢出或者栈扩展失败时分别抛出 StackOverflowErrorOutOfMemoryError 异常。

Java 堆

Java 堆(Java Heap) 的作用就是存放对象实例,几乎所有的对象实例都是在这里分配内存

注:由于即时编译技术的进步,尤其是逃逸分析技术的日渐强大,栈上分配、标量替换优化手段已经导致一些微妙的变化悄然发生,所以说 Java 对象实例都分配在堆上也渐渐变得不是那么绝对了。

Java 堆是垃圾收集器管理的内存区域(因此也被叫做”GC 堆”)。现代的垃圾收集器大部分都是采用分代收集理论设计的,该力量的思想是针对不同的对象采取不同的垃圾回收算法。

在 JDK 7 及之前版本,堆内存被通常分为下面三部分:

  • 新生代(Young Generation)
  • 老年代(Old Generation)
  • 永久代(Permanent Generation)

JDK 8 版本之后 PermGen(永久代) 已被 Metaspace(元空间) 取代,元空间使用的是本地内存

大部分情况,对象都会首先在 Eden 区域分配,在一次新生代垃圾回收后,如果对象还存活,则会进入 S0 或者 S1,并且对象的年龄还会加 1(Eden 区->Survivor 区后对象的初始年龄变为 1),当它的年龄增加到一定程度(默认为 15 岁),就会被晋升到老年代中。对象晋升到老年代的年龄阈值,可以通过参数 -XX:MaxTenuringThreshold 来设置。不过,设置的值应该在 0-15,否则会爆出以下错误:

1
MaxTenuringThreshold of 20 is invalid; must be between 0 and 15

为什么年龄只能是 0-15?

因为记录年龄的区域在对象头中,这个区域的大小通常是 4 位。这 4 位可以表示的最大二进制数字是 1111,即十进制的 15。因此,对象的年龄被限制为 0 到 15。

这里我们简单结合对象布局来详细介绍一下。

在 HotSpot 虚拟机中,对象在内存中存储的布局可以分为 3 块区域:对象头(Header)、实例数据(Instance Data)和对齐填充(Padding)。其中,对象头包括两部分:标记字段(Mark Word)和类型指针(Klass Word)。关于对象内存布局的详细介绍,后文会介绍到,这里就不重复提了。

这个年龄信息就是在标记字段中存放的(标记字段还存放了对象自身的其他信息比如哈希码、锁状态信息等等)。

如果从分配内存的角度看,所有线程共享的 Java 堆中可以划分出多个线程私有的分配缓冲区 (Thread Local Allocation Buffer,TLAB),以提升对象分配效率。不过无论从什么角度,无论如何划分,都不会改变 Java 堆中存储内容的共性,无论是哪个区域,存储的都只能是对象的实例,将 Java 堆细分的目的只是为了更好地回收内存,或者更快地分配内存。

🔔 注意:Java 堆既可以被实现成固定大小的,也可以是可扩展的,不过当前主流的 Java 虚拟机都是按照可扩展来实现的,扩展失败会抛出 OutOfMemoryError 异常。

可以通过 -Xms-Xmx 两个虚拟机参数来指定一个程序的 Java 堆内存大小,第一个参数设置初始值,第二个参数设置最大值。

1
java -Xms=1M -Xmx=2M HackTheJava

方法区

方法区(Method Area)是各个线程共享的内存区域。方法区用于存放已被加载的类信息、常量、静态变量、即时编译器编译后的代码等数据

在 JDK8 以前,方法区常被称为永久代,但这种说法是不准确的:仅仅是因为当时的 HotSpot 虚拟机使用永久代来实现方法区而已。对于其他虚拟机而言,是不存在永久代概念的。永久代这种设计,导致了 Java 应用更容易遇到内存溢出的问题(永久代有 -XX:MaxPermSize 的上限,即使不设置也有默认大小)。

  • JDK7 之前,HotSpot 虚拟机把它当成永久代来进行垃圾回收,可通过参数 -XX:PermSize-XX:MaxPermSize 设置。
  • JDK8 之后,取消了永久代,用 **metaspace(元空间)**替代,可通过参数 -XX:MaxMetaspaceSize 设置。

方法区的内存回收目标主要是针对常量池的回收和对类型的卸载,一般来说这个区域的回收效果比较难令人满意,尤其是类型的卸载,条件相当苛刻。

🔔 注意:方法区和 Java 堆一样不需要连续的内存,并且可以动态扩展,动态扩展失败一样会抛出 OutOfMemoryError 异常。

运行时常量池

运行时常量池(Runtime Constant Pool) 是方法区的一部分,Class 文件中除了有类的版本、字段、方法、接口等描述信息,还有一项信息是常量池表(Constant Pool Table),用于存放编译器生成的各种字面量和符号引用,这部分内容将在类加载后写入。

  • 字面量 - 文本字符串、声明为 final 的常量值等。
  • 符号引用 - 类和接口的完全限定名(Fully Qualified Name)、字段的名称和描述符(Descriptor)、方法的名称和描述符。

运行时常量池相对于 Class 文件常量池的另外一个重要特征是具备动态性,Java 语言并不要求常量 一定只有编译期才能产生,也就是说,并非预置入 Class 文件中常量池的内容才能进入方法区运行时常量池,运行期间也可以将新的常量放入池中,这种特性被开发人员利用得比较多的便是 String 类的 intern() 方法。

🔔 注意:当常量池无法再申请到内存时会抛出 OutOfMemoryError 异常。

直接内存

直接内存(Direct Memory)并不是虚拟机运行时数据区的一部分,也不是 JVM 规范中定义的内存区域。

JDK4 中新加入了 NIO,它可以使用 Native 函数库直接分配堆外内存,然后通过一个存储在 Java 堆里的 DirectByteBuffer 对象作为这块内存的引用进行操作。这样能在一些场景中显著提高性能,因为避免了在 Java 堆和 Native 堆中来回复制数据。

直接内存容量可通过 -XX:MaxDirectMemorySize 指定,如果不指定,则默认与 Java 堆最大值(-Xmx 指定)一样。

🔔 注意:直接内存这部分也被频繁的使用,且也可能导致 OutOfMemoryError 异常。

Java 内存区域对比

内存区域 内存作用范围 异常
程序计数器 线程私有
Java 虚拟机栈 线程私有 StackOverflowErrorOutOfMemoryError
本地方法栈 线程私有 StackOverflowErrorOutOfMemoryError
Java 堆 线程共享 OutOfMemoryError
方法区 线程共享 OutOfMemoryError
运行时常量池 线程共享 OutOfMemoryError
直接内存 非运行时数据区 OutOfMemoryError

虚拟机对象

对象的创建

当 Java 虚拟机遇到一条字节码 new 指令时,首先在常量池中尝试定位类的符号引用,并检查这个类是否已被类加载,如果没有,则必须先执行相应的类加载过程

在类加载检查通过后,接下来虚拟机将为新生对象分配内存。对象所需内存的大小在类加载完成后便可完全确定,为对象分配空间的任务实际上便等同于把一块确定大小的内存块从 Java 堆中划分出来。分配对象内存有两种方式:

指针碰撞(Bump The Pointer) - 如果 Java 堆中内存是规整的,所有被使用过的内存都被放在一 边,空闲的内存被放在另一边,中间放着一个指针作为分界点的指示器,那所分配内存就仅仅是把那个指针向空闲空间方向挪动一段与对象大小相等的距离。

空闲列表(Free List) - 如果 Java 堆中的内存是不规整的,已被使用的内存和空闲的内存相互交错在一起,那就没有办法简单地进行指针碰撞了,虚拟机就必须维护一个列表,记录上哪些内存块是可用的,在分配的时候从列表中找到一块足够大的空间划分给对象实例,并更新列表上的记录。

选择哪种分配方式由 Java 堆是否规整决定,而 Java 堆是否规整又由所采用的垃圾收集器是否采用标记-压缩算法决定。因此,当使用 Serial、ParNew 等带压缩整理过程的收集器时,系统采用的分配算法是指针碰撞,既简单又高效;而当使用 CMS 这种基于清除 (Sweep)算法的收集器时,理论上就只能采用较为复杂的空闲列表来分配内存。

对象创建在虚拟机中是非常频繁的行为,因此还需要考虑分配内存空间的并发安全问题。一般有两种方案:

  • CAS 同步 - 对分配内存空间的动作进行同步处理——实际上虚拟机是采用 CAS 配上失败 重试的方式保证更新操作的原子性;
  • TLAB - 另外一种是把内存分配的动作按照线程划分在不同的空间之中进行,即每个线程在 Java 堆中预先分配一小块内存,称为本地线程分配缓冲(Thread Local Allocation Buffer,TLAB),哪个线程要分配内存,就在哪个线程的本地缓冲区中分配,只有本地缓冲区用完了,分配新的缓存区时才需要同步锁定。

接下来,需要执行类的构造函数(即 <init>() 方法)对对象进行初始化。

对象的内存布局

在 HotSpot 虚拟机里,对象在堆内存中的存储布局可以划分为三个部分:

  • 对象头(Header) - HotSpot 虚拟机对象的对象头部分包括两类信息。
    • Mark Word - 用于存储对象自身的运行时数据。如哈 希码(HashCode)、GC 分代年龄、锁状态标志、线程持有的锁、偏向线程 ID、偏向时间戳等,这部分数据的长度在 32 位和 64 位的虚拟机(未开启压缩指针)中分别为 32 个比特和 64 个比特。
    • 类型指针 - 对象指向它的类型元数据的指针,Java 虚拟机通过这个指针来确定该对象是哪个类的实例。并不是所有的虚拟机实现都必须在对象数据上保留类型指针,换句话说,查找对象的元数据信息并不一定要经过对象本身。此外,如果对象是一个 Java 数组,那在对象头中还必须有一块用于记录数组长度的数据,因为虚拟机可以通过普通 Java 对象的元数据信息确定 Java 对象的大小,但是如果数组的长度是不确定的,将无法通过元数据中的信息推断出数组的大小。
  • 实例数据(Instance Data) - 对象真正存储的有效信息,即我们在程序代码里面所定义的各种类型的字段内容,无论是从父类继承下来的,还是在子类中定义的字段都必须记录起来。
  • 对齐填充(Padding) - 并不是必然存在的,也没有特别的含义,它仅仅起着占位符的作用。

对象的访问定位

Java 程序会通过栈上的 reference 数据来操作堆上的具体对象。主流的对象访问方式主要有使用句柄和直接指针两种:使用句柄访问和使用直接指针访问。

使用句柄访问

Java 堆中将可能会划分出一块内存来作为句柄池,reference 中存储的就是对象的句柄地址,而句柄中包含了对象实例数据与类型数据各自具体的地址信息。使用句柄来访问的最大好处就是 reference 中存储的是稳定句柄地 址,在对象被移动(垃圾收集时移动对象是非常普遍的行为)时只会改变句柄中的实例数据指针,而 reference 本身不需要被修改。

使用直接指针访问

Java 堆中对象的内存布局就必须考虑如何放置访问类型数据的相关信息,reference 中存储的直接就是对象地址,如果只是访问对象本身的话,就不需要多一次间接访问的开销。使用直接指针来访问最大的好处就是速度更快,它节省了一次指针定位的时间开销。HotSpot 主要使用第二种方式进行对象访问。

内存分配

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
public class JVMCase {

// 常量
public final static String MAN_SEX_TYPE = "man";

// 静态变量
public static String WOMAN_SEX_TYPE = "woman";

public static void main(String[] args) {

Student stu = new Student();
stu.setName("nick");
stu.setSexType(MAN_SEX_TYPE);
stu.setAge(20);

JVMCase jvmcase = new JVMCase();

// 调用静态方法
print(stu);
// 调用非静态方法
jvmcase.sayHello(stu);
}

// 常规静态方法
public static void print(Student stu) {
System.out.println("name: " + stu.getName() + "; sex:" + stu.getSexType() + "; age:" + stu.getAge());
}

// 非静态方法
public void sayHello(Student stu) {
System.out.println(stu.getName() + "say: hello");
}
}

class Student{
String name;
String sexType;
int age;

public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}

public String getSexType() {
return sexType;
}
public void setSexType(String sexType) {
this.sexType = sexType;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
}

运行以上代码时,JVM 处理过程如下:

(1)JVM 向操作系统申请内存,JVM 第一步就是通过配置参数或者默认配置参数向操作系统申请内存空间,根据内存大小找到具体的内存分配表,然后把内存段的起始地址和终止地址分配给 JVM,接下来 JVM 就进行内部分配。

(2)JVM 获得内存空间后,会根据配置参数分配堆、栈以及方法区的内存大小。

(3)class 文件加载、验证、准备以及解析,其中准备阶段会为类的静态变量分配内存,初始化为系统的初始值(这部分我在第 21 讲还会详细介绍)。

img

(4)完成上一个步骤后,将会进行最后一个初始化阶段。在这个阶段中,JVM 首先会执行构造器 <clinit> 方法,编译器会在 .java 文件被编译成 .class 文件时,收集所有类的初始化代码,包括静态变量赋值语句、静态代码块、静态方法,收集在一起成为 <clinit>() 方法。

img

(5)执行方法。启动 main 线程,执行 main 方法,开始执行第一行代码。此时堆内存中会创建一个 student 对象,对象引用 student 就存放在栈中。

img

(6)此时再次创建一个 JVMCase 对象,调用 sayHello 非静态方法,sayHello 方法属于对象 JVMCase,此时 sayHello 方法入栈,并通过栈中的 student 引用调用堆中的 Student 对象;之后,调用静态方法 print,print 静态方法属于 JVMCase 类,是从静态方法中获取,之后放入到栈中,也是通过 student 引用调用堆中的 student 对象。

img

内存溢出

OutOfMemoryError

OutOfMemoryError 简称为 OOM。Java 中对 OOM 的解释是,没有空闲内存,并且垃圾收集器也无法提供更多内存。通俗的解释是:JVM 内存不足了。

在 JVM 规范中,除了程序计数器区域外,其他运行时区域都可能发生 OutOfMemoryError 异常(简称 OOM)

下面逐一介绍 OOM 发生场景。

堆空间溢出

java.lang.OutOfMemoryError: Java heap space 意味着:堆空间溢出

更细致的说法是:Java 堆内存已经达到 -Xmx 设置的最大值。Java 堆用于存储对象实例,只要不断地创建对象,并且保证 GC Roots 到对象之间有可达路径来避免垃圾收集器回收这些对象,那么当堆空间到达最大容量限制后就会产生 OOM。

堆空间溢出有可能是内存泄漏(Memory Leak)内存溢出(Memory Overflow)

Java heap space 分析步骤
  1. 使用 jmap-XX:+HeapDumpOnOutOfMemoryError 获取堆快照。
  2. 使用内存分析工具(VisualVM、MAT、JProfile 等)对堆快照文件进行分析。
  3. 根据分析图,重点是确认内存中的对象是否是必要的,分清究竟是是内存泄漏还是内存溢出。
内存泄漏

内存泄漏(Memory Leak)是指由于疏忽或错误造成程序未能释放已经不再使用的内存的情况

内存泄漏并非指内存在物理上的消失,而是应用程序分配某段内存后,由于设计错误,失去了对该段内存的控制,因而造成了内存的浪费。内存泄漏随着被执行的次数不断增加,最终会导致内存溢出。

内存泄漏常见场景:

  • 静态容器
    • 声明为静态(static)的 HashMapVector 等集合
    • 通俗来讲 A 中有 B,当前只把 B 设置为空,A 没有设置为空,回收时 B 无法回收。因为被 A 引用。
  • 监听器
    • 监听器被注册后释放对象时没有删除监听器
  • 物理连接
    • 各种连接池建立了连接,必须通过 close() 关闭链接
  • 内部类和外部模块等的引用
    • 发现它的方式同内存溢出,可再加个实时观察
    • jstat -gcutil 7362 2500 70

重点关注:

  • FGC — 从应用程序启动到采样时发生 Full GC 的次数。
  • FGCT — 从应用程序启动到采样时 Full GC 所用的时间(单位秒)。
  • FGC 次数越多,FGCT 所需时间越多,越有可能存在内存泄漏。

如果是内存泄漏,可以进一步查看泄漏对象到 GC Roots 的对象引用链。这样就能找到泄漏对象是怎样与 GC Roots 关联并导致 GC 无法回收它们的。掌握了这些原因,就可以较准确的定位出引起内存泄漏的代码。

导致内存泄漏的常见原因是使用容器,且不断向容器中添加元素,但没有清理,导致容器内存不断膨胀。

【示例】

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* 内存泄漏示例
* 错误现象:java.lang.OutOfMemoryError: Java heap space
* VM Args:-verbose:gc -Xms10M -Xmx10M -XX:+HeapDumpOnOutOfMemoryError
*/
public class HeapMemoryLeakOOM {

public static void main(String[] args) {
List<OomObject> list = new ArrayList<>();
while (true) {
list.add(new OomObject());
}
}

static class OomObject {}

}
内存溢出

如果不存在内存泄漏,即内存中的对象确实都必须存活着,则应当检查虚拟机的堆参数(-Xmx-Xms),与机器物理内存进行对比,看看是否可以调大。并从代码上检查是否存在某些对象生命周期过长、持有时间过长的情况,尝试减少程序运行期的内存消耗。

【示例】

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
* 堆溢出示例
* <p>
* 错误现象:java.lang.OutOfMemoryError: Java heap space
* <p>
* VM Args:-verbose:gc -Xms10M -Xmx10M
*/
public class HeapOutOfMemoryOOM {

public static void main(String[] args) {
Double[] array = new Double[999999999];
System.out.println("array length = [" + array.length + "]");
}

}

上面的例子是一个极端的例子,试图创建一个维度很大的数组,堆内存无法分配这么大的内存,从而报错:Java heap space

但如果在现实中,代码并没有问题,仅仅是因为堆内存不足,可以通过 -Xms-Xmx 适当调整堆内存大小。

GC 开销超过限制

java.lang.OutOfMemoryError: GC overhead limit exceeded 这个错误,官方给出的定义是:超过 98% 的时间用来做 GC 并且回收了不到 2% 的堆内存时会抛出此异常。这意味着,发生在 GC 占用大量时间为释放很小空间的时候发生的,是一种保护机制。导致异常的原因:一般是因为堆太小,没有足够的内存。

【示例】

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
* GC overhead limit exceeded 示例
* 错误现象:java.lang.OutOfMemoryError: GC overhead limit exceeded
* 发生在 GC 占用大量时间为释放很小空间的时候发生的,是一种保护机制。导致异常的原因:一般是因为堆太小,没有足够的内存。
* 官方对此的定义:超过 98%的时间用来做 GC 并且回收了不到 2%的堆内存时会抛出此异常。
* VM Args: -Xms10M -Xmx10M
*/
public class GcOverheadLimitExceededOOM {

public static void main(String[] args) {
List<Double> list = new ArrayList<>();
double d = 0.0;
while (true) {
list.add(d++);
}
}

}

【处理】

Java heap space 错误处理方法类似,先判断是否存在内存泄漏。如果有,则修正代码;如果没有,则通过 -Xms-Xmx 适当调整堆内存大小。

永久代空间不足

【错误】

1
java.lang.OutOfMemoryError: PermGen space

【原因】

Perm (永久代)空间主要用于存放 Class 和 Meta 信息,包括类的名称和字段,带有方法字节码的方法,常量池信息,与类关联的对象数组和类型数组以及即时编译器优化。GC 在主程序运行期间不会对永久代空间进行清理,默认是 64M 大小。

根据上面的定义,可以得出 PermGen 大小要求取决于加载的类的数量以及此类声明的大小。因此,可以说造成该错误的主要原因是永久代中装入了太多的类或太大的类。

在 JDK8 之前的版本中,可以通过 -XX:PermSize-XX:MaxPermSize 设置永久代空间大小,从而限制方法区大小,并间接限制其中常量池的容量。

初始化时永久代空间不足

【示例】

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/**
* 永久代内存空间不足示例
* <p>
* 错误现象:
* <ul>
* <li>java.lang.OutOfMemoryError: PermGen space (JDK8 以前版本)</li>
* <li>java.lang.OutOfMemoryError: Metaspace (JDK8 及以后版本)</li>
* </ul>
* VM Args:
* <ul>
* <li>-Xmx100M -XX:MaxPermSize=16M (JDK8 以前版本)</li>
* <li>-Xmx100M -XX:MaxMetaspaceSize=16M (JDK8 及以后版本)</li>
* </ul>
*/
public class PermGenSpaceOOM {

public static void main(String[] args) throws Exception {
for (int i = 0; i < 100_000_000; i++) {
generate("eu.plumbr.demo.Generated" + i);
}
}

public static Class generate(String name) throws Exception {
ClassPool pool = ClassPool.getDefault();
return pool.makeClass(name).toClass();
}

}

在此示例中,源代码遍历循环并在运行时生成类。javassist 库正在处理类生成的复杂性。

重部署时永久代空间不足

对于更复杂,更实际的示例,让我们逐步介绍一下在应用程序重新部署期间发生的 PermGen 空间错误。重新部署应用程序时,你希望垃圾回收会摆脱引用所有先前加载的类的加载器,并被加载新类的类加载器取代。

不幸的是,许多第三方库以及对线程,JDBC 驱动程序或文件系统句柄等资源的不良处理使得无法卸载以前使用的类加载器。反过来,这意味着在每次重新部署期间,所有先前版本的类仍将驻留在 PermGen 中,从而在每次重新部署期间生成数十兆的垃圾。

让我们想象一个使用 JDBC 驱动程序连接到关系数据库的示例应用程序。启动应用程序时,初始化代码将加载 JDBC 驱动程序以连接到数据库。对应于规范,JDBC 驱动程序向 java.sql.DriverManager 进行注册。该注册包括将对驱动程序实例的引用存储在 DriverManager 的静态字段中。

现在,当从应用程序服务器取消部署应用程序时,java.sql.DriverManager 仍将保留该引用。我们最终获得了对驱动程序类的实时引用,而驱动程序类又保留了用于加载应用程序的 java.lang.Classloader 实例的引用。反过来,这意味着垃圾回收算法无法回收空间。

而且该 java.lang.ClassLoader 实例仍引用应用程序的所有类,通常在 PermGen 中占据数十兆字节。这意味着只需少量重新部署即可填充通常大小的 PermGen。

PermGen space 解决方案

(1)解决初始化时的 OutOfMemoryError

在应用程序启动期间触发由于 PermGen 耗尽导致的 OutOfMemoryError 时,解决方案很简单。该应用程序仅需要更多空间才能将所有类加载到 PermGen 区域,因此我们只需要增加其大小即可。为此,更改你的应用程序启动配置并添加(或增加,如果存在)-XX:MaxPermSize 参数,类似于以下示例:

1
java -XX:MaxPermSize=512m com.yourcompany.YourClass

上面的配置将告诉 JVM,PermGen 可以增长到 512MB。

清理应用程序中 WEB-INF/lib 下的 jar,用不上的 jar 删除掉,多个应用公共的 jar 移动到 Tomcat 的 lib 目录,减少重复加载。

🔔 注意:-XX:PermSize 一般设为 64M

(2)解决重新部署时的 OutOfMemoryError

重新部署应用程序后立即发生 OutOfMemoryError 时,应用程序会遭受类加载器泄漏的困扰。在这种情况下,解决问题的最简单,继续进行堆转储分析–使用类似于以下命令的重新部署后进行堆转储:

1
jmap -dump:format=b,file=dump.hprof <process-id>

然后使用你最喜欢的堆转储分析器打开转储(Eclipse MAT 是一个很好的工具)。在分析器中可以查找重复的类,尤其是那些正在加载应用程序类的类。从那里,你需要进行所有类加载器的查找,以找到当前活动的类加载器。

对于非活动类加载器,你需要通过从非活动类加载器收集到 GC 根的最短路径来确定阻止它们被垃圾收集的引用。有了此信息,你将找到根本原因。如果根本原因是在第三方库中,则可以进入 Google/StackOverflow 查看是否是已知问题以获取补丁/解决方法。

(3)解决运行时 OutOfMemoryError

第一步是检查是否允许 GC 从 PermGen 卸载类。在这方面,标准的 JVM 相当保守-类是天生的。因此,一旦加载,即使没有代码在使用它们,类也会保留在内存中。当应用程序动态创建许多类并且长时间不需要生成的类时,这可能会成为问题。在这种情况下,允许 JVM 卸载类定义可能会有所帮助。这可以通过在启动脚本中仅添加一个配置参数来实现:

1
-XX:+CMSClassUnloadingEnabled

默认情况下,此选项设置为 false,因此要启用此功能,你需要在 Java 选项中显式设置。如果启用 CMSClassUnloadingEnabled,GC 也会扫描 PermGen 并删除不再使用的类。请记住,只有同时使用 UseConcMarkSweepGC 时此选项才起作用。

1
-XX:+UseConcMarkSweepGC

在确保可以卸载类并且问题仍然存在之后,你应该继续进行堆转储分析–使用类似于以下命令的方法进行堆转储:

1
jmap -dump:file=dump.hprof,format=b <process-id>

然后,使用你最喜欢的堆转储分析器(例如 Eclipse MAT)打开转储,然后根据已加载的类数查找最昂贵的类加载器。从此类加载器中,你可以继续提取已加载的类,并按实例对此类进行排序,以使可疑对象排在首位。

然后,对于每个可疑者,就需要你手动将根本原因追溯到生成此类的应用程序代码。

元数据区空间不足

【错误】

1
Exception in thread "main" java.lang.OutOfMemoryError: Metaspace

【原因】

Java8 以后,JVM 内存空间发生了很大的变化。取消了永久代,转而变为元数据区。

元数据区的内存不足,即方法区和运行时常量池的空间不足

方法区用于存放 Class 的相关信息,如类名、访问修饰符、常量池、字段描述、方法描述等。

一个类要被垃圾收集器回收,判定条件是比较苛刻的。在经常动态生成大量 Class 的应用中,需要特别注意类的回收状况。这类常见除了 CGLib 字节码增强和动态语言以外,常见的还有:大量 JSP 或动态产生 JSP 文件的应用(JSP 第一次运行时需要编译为 Java 类)、基于 OSGi 的应用(即使是同一个类文件,被不同的加载器加载也会视为不同的类)等。

【示例】方法区出现 OutOfMemoryError

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class MethodAreaOutOfMemoryDemo {

public static void main(String[] args) {
while (true) {
Enhancer enhancer = new Enhancer();
enhancer.setSuperclass(Bean.class);
enhancer.setUseCache(false);
enhancer.setCallback(new MethodInterceptor() {
@Override
public Object intercept(Object obj, Method method, Object[] args, MethodProxy proxy) throws Throwable {
return proxy.invokeSuper(obj, args);
}
});
enhancer.create();
}
}

static class Bean {}

}

【解决】

当由于元空间而面临 OutOfMemoryError 时,第一个解决方案应该是显而易见的。如果应用程序耗尽了内存中的 Metaspace 区域,则应增加 Metaspace 的大小。更改应用程序启动配置并增加以下内容:

1
-XX:MaxMetaspaceSize=512m

上面的配置示例告诉 JVM,允许 Metaspace 增长到 512 MB。

另一种解决方案甚至更简单。你可以通过删除此参数来完全解除对 Metaspace 大小的限制,JVM 默认对 Metaspace 的大小没有限制。但是请注意以下事实:这样做可能会导致大量交换或达到本机物理内存而分配失败。

无法新建本地线程

java.lang.OutOfMemoryError: Unable to create new native thread 这个错误意味着:Java 应用程序已达到其可以启动线程数的限制

【原因】

当发起一个线程的创建时,虚拟机会在 JVM 内存创建一个 Thread 对象同时创建一个操作系统线程,而这个系统线程的内存用的不是 JVM 内存,而是系统中剩下的内存。

那么,究竟能创建多少线程呢?这里有一个公式:

1
线程数 = (MaxProcessMemory - JVMMemory - ReservedOsMemory) / (ThreadStackSize)

【参数】

  • MaxProcessMemory - 一个进程的最大内存
  • JVMMemory - JVM 内存
  • ReservedOsMemory - 保留的操作系统内存
  • ThreadStackSize - 线程栈的大小

**给 JVM 分配的内存越多,那么能用来创建系统线程的内存就会越少,越容易发生 unable to create new native thread**。所以,JVM 内存不是分配的越大越好。

但是,通常导致 java.lang.OutOfMemoryError 的情况:无法创建新的本机线程需要经历以下阶段:

  1. JVM 内部运行的应用程序请求新的 Java 线程
  2. JVM 本机代码代理为操作系统创建新本机线程的请求
  3. 操作系统尝试创建一个新的本机线程,该线程需要将内存分配给该线程
  4. 操作系统将拒绝本机内存分配,原因是 32 位 Java 进程大小已耗尽其内存地址空间(例如,已达到(2-4)GB 进程大小限制)或操作系统的虚拟内存已完全耗尽
  5. 引发 java.lang.OutOfMemoryError: Unable to create new native thread 错误。

【示例】

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class UnableCreateNativeThreadOOM {

public static void main(String[] args) {
while (true) {
new Thread(new Runnable() {
@Override
public void run() {
try {
TimeUnit.MINUTES.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}
}

【处理】

可以通过增加操作系统级别的限制来绕过无法创建新的本机线程问题。例如,如果限制了 JVM 可在用户空间中产生的进程数,则应检查出并可能增加该限制:

1
2
3
4
[root@dev ~]# ulimit -a
core file size (blocks, -c) 0
--- cut for brevity ---
max user processes (-u) 1800

通常,OutOfMemoryError 对新的本机线程的限制表示编程错误。当应用程序产生数千个线程时,很可能出了一些问题—很少有应用程序可以从如此大量的线程中受益。

解决问题的一种方法是开始进行线程转储以了解情况。

直接内存溢出

直接内存(Direct Memory)的容量大小可通过 -XX:MaxDirectMemorySize 参数来指定,如果不指定,则默认与 Java 堆最大值(由 -Xmx 指定)一致。

由直接内存导致的内存溢出,一个明显的特征是在 Heap Dump 文件中不会看见有什么明显的异常情况,如果读者发现内存溢出之后产生的 Dump 文件很小,而程序中又直接或间接使用了 DirectMemory(典型的间接使用就是 NIO),那就可以考虑重点检查一下直接内存方面的原因了。

由直接内存导致的内存溢出,一个明显的特征是在 Heapdump 文件中不会看见明显的异常,如果发现 OOM 之后 Dump 文件很小,而程序中又直接或间接使用了 NIO,就可以考虑检查一下是不是这方面的原因。

【示例】直接内存 OutOfMemoryError

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* 本机直接内存溢出示例
* 错误现象:java.lang.OutOfMemoryError
* VM Args:-Xmx20M -XX:MaxDirectMemorySize=10M
*/
public class DirectOutOfMemoryDemo {

private static final int _1MB = 1024 * 1024;

public static void main(String[] args) throws IllegalAccessException {
Field unsafeField = Unsafe.class.getDeclaredFields()[0];
unsafeField.setAccessible(true);
Unsafe unsafe = (Unsafe) unsafeField.get(null);
while (true) {
unsafe.allocateMemory(_1MB);
}
}

}

StackOverflowError

HotSpot 虚拟机中并不区分虚拟机栈和本地方法栈。

对于 HotSpot 虚拟机来说,栈容量只由 -Xss 参数来决定。

栈溢出的常见原因:

  • 递归函数调用层数太深 - 线程请求的栈深度大于虚拟机所允许的最大深度,将抛出 StackOverflowError 异常。

  • 大量循环或死循环 - 虚拟机的栈内存允许动态扩展,当扩展栈容量无法申请到足够的内存时,将抛出

    OutOfMemoryError 异常。

【示例】递归函数调用层数太深导致 StackOverflowError

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
/**
* 以一个无限递归的示例方法来展示栈溢出
* <p>
* 栈溢出时,Java 会抛出 StackOverflowError ,出现此种情况是因为方法运行的时候栈的大小超过了虚拟机的上限所致。
* <p>
* Java 应用程序唤起一个方法调用时就会在调用栈上分配一个栈帧,这个栈帧包含引用方法的参数,本地参数,以及方法的返回地址。
* <p>
* 这个返回地址是被引用的方法返回后,程序能够继续执行的执行点。
* <p>
* 如果没有一个新的栈帧所需空间,Java 就会抛出 StackOverflowError。
* <p>
* VM 参数:
* <ul>
* <li>-Xss228k - 设置栈大小为 228k</li>
* </ul>
* <p>
*
*/
public class StackOverflowErrorDemo {

private int stackLength = 1;

public static void main(String[] args) {
StackOverflowErrorDemo obj = new StackOverflowErrorDemo();
try {
obj.recursion();
} catch (Throwable e) {
System.out.println("栈深度:" + obj.stackLength);
e.printStackTrace();
}
}

public void recursion() {
stackLength++;
recursion();
}

}

【示例】大量循环或死循环导致 StackOverflowError

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
/**
* 类成员循环依赖,导致 StackOverflowError
*
* VM 参数:
*
* -Xss228k - 设置栈大小为 228k
*
* @author <a href="mailto:forbreak@163.com">Zhang Peng</a>
* @since 2019-06-25
*/
public class StackOverflowErrorDemo2 {

public static void main(String[] args) {
A obj = new A();
System.out.println(obj.toString());
}

static class A {

private int value;

private B instance;

public A() {
value = 0;
instance = new B();
}

@Override
public String toString() {
return "<" + value + ", " + instance + ">";
}

}

static class B {

private int value;

private A instance;

public B() {
value = 10;
instance = new A();
}

@Override
public String toString() {
return "<" + value + ", " + instance + ">";
}

}

}

参考资料

Redis 哨兵

Redis 2.8 版本,新增了哨兵模式,以支持“自动故障转移”,它是 Redis 的 HA 方案。

Redis 哨兵模式由一个或多个 Sentinel 实例组成 Sentinel 集群,可以监控任意多个主服务器,以及这些主服务器的所有从服务器;并在被监视的主服务器进入下线状态时,自动将下线主服务器的某个从服务器升级为新的主服务器,然后由新的主服务器代替已下线的主服务器继续处理命令请求。

关键词:高可用监控选主故障转移Raft

哨兵简介

Redis 的主从复制模式,虽然提供了一定程度的 高可用性(High Availability)。但是,当主节点出现故障时,只能通过手动操作将从节点晋升为主节点,这显然是比较低效的。为了解决这个问题,Redis 2.8 版本提供了哨兵模式(Sentinel)来支持“自动故障转移”。

Redis 哨兵模式由一个或多个 Sentinel 实例组成 Sentinel 集群,可以监控任意多个主服务器,以及这些主服务器的所有从服务器;并在被监视的主服务器进入下线状态时,自动将下线主服务器的某个从服务器升级为新的主服务器,然后由新的主服务器代替已下线的主服务器继续处理命令请求。

Sentinel 的主要功能如下:

  • 监控(Monitoring) - Sentinel 不断检查主从服务器是否正常在工作。
  • 通知(Notification) - Sentinel 可以通过一个 api 来通知系统管理员或者另外的应用程序,被监控的 Redis 实例有一些问题。
  • 自动故障转移(Automatic Failover) - 如果一个主服务器下线,Sentinel 会开始自动故障转移:把一个从节点提升为主节点,并重新配置其他的从节点使用新的主节点,使用 Redis 服务的应用程序在连接的时候也被通知新的地址。
  • 配置提供者(Configuration provider) - Sentinel 给客户端的服务发现提供来源:对于一个给定的服务,客户端连接到 Sentinels 来寻找当前主节点的地址。当故障转移发生的时候,Sentinel 将报告新的地址。

启动哨兵

启动一个 Sentinel 可以使用下面任意一条命令,两条命令效果完全相同。

1
2
redis-sentinel /path/to/sentinel.conf
redis-server /path/to/sentinel.conf --sentinel

当一个 Sentinel 启动时,它需要执行以下步骤:

  1. 初始化服务器。
  2. 使用 Sentinel 专用代码。
  3. 初始化 Sentinel 状态。
  4. 初始化 Sentinel 的主服务器列表。
  5. 创建连向被监视的主服务器的网络连接。

Sentinel 本质上是一个运行在“特殊模式”下的 Redis 服务器。Sentinel 模式下 Redis 服务器只支持 PINGSENTINELINFOSUBSCRIBEUNSUBSCRIBEPSUBSCRIBEPUNSUBSCRIBE 七个命令。

创建连向被监视的主服务器的网络连接,Sentinel 将成为主服务器的客户端,它可以向主服务器发送命令,并从命令回复中获取相关的信息。Sentinel 会读入用户指定的配置文件, 为每个要被监视的主服务器创建相应的实例结构, 并创建连向主服务器的命令连接和订阅连接:

  • 命令连接 - 专门用于向主服务器发送命令,并接受命令回复。
  • 订阅连接 - 专门用于订阅主服务器的 __sentinel__:hello 频道。

监控

获取服务器信息

默认情况下, Sentinel 以“每十秒一次”的频率向被监视的主服务器和从服务器发送 INFO 命令,并通过分析 INFO 命令的回复来获取服务器的当前信息。

  • 主服务器 - 可以获取主服务器自身信息,以及其所属从服务器的地址信息。
  • 从服务器 - 从服务器自身信息,以及其主服务器的了解状态和地址。

Sentinel 通过向主服务器发送 INFO 命令来获得主服务器属下所有从服务器的地址信息, 并为这些从服务器创建相应的实例结构, 以及连向这些从服务器的“命令连接”和“订阅连接”。

对于监视同一个主服务器和从服务器的多个 Sentinel 来说, 它们会以“每两秒一次”的频率, 通过向被监视服务器的 __sentinel__:hello 频道发送消息来向其他 Sentinel 宣告自己的存在。Sentinel 只会与主服务器和从服务器创建命令连接和订阅连接, Sentinel 与 Sentinel 之间则只创建命令连接。

判断下线

主观下线

默认,每个 Sentinel 以“每秒一次”的频率,向它所知的“所有实例”发送一个 PING 命令

  • “所知”是指,与 Sentinel 创建了命令连接的实例。
  • “所有实例”包括了主服务器、从服务器以及其他 Sentinel 实例。

如果,某实例在指定的时长( down-after-milliseconds 设置的值,单位毫秒)中,未向 Sentinel 发送有效回复, Sentinel 会将该实例判定为“主观下线”。

  • 一个有效的 PING 回复可以是:+PONG-LOADING 或者 -MASTERDOWN。如果服务器返回除以上三种回复之外的其他回复,又或者在 指定时间 内没有回复 PING 命令, 那么 Sentinel 认为服务器返回的回复无效。
  • “主观下线”适用于所有主节点和从节点。

客观下线

当一个“主服务器”被 Sentinel 标记为“主观下线”后,为了确认其是否真的下线,Sentinel 会向同样监听该主服务器的其他 Sentinel 发起询问。如果有“足够数量”的 Sentinel 在指定的时间范围内认为主服务器已下线,那么这个“主服务器”被标记为“客观下线”。

  • Sentinel 节点通过 sentinel is-master-down-by-addr 命令,向其它 Sentinel 节点询问对某主服务器的 状态判断
  • “足够数量”是指 Sentinel 配置中 quorum 参数所设的值。
  • 客观下线只适用于主节点。

注:默认情况下, Sentinel 以“每十秒一次”的频率向被监视的主服务器和从服务器发送 INFO 命令。当一个主服务器被 Sentinel 标记为“客观下线”时,Sentinel 向该主服务器的所有从服务器发送 INFO 命令的频率,会从“每十秒一次”改为“每秒一次”。

选主

Redis Sentinel 采用 Raft 协议 实现了其 Sentinel 选主流程。Raft 是一种共识性算法,想了解其原理,可以参考 深入剖析共识性算法 Raft

当一个“主服务器”被判断为“客观下线”时,监视该主服务器的各个 Sentinel 会进行“协商”,选举出一个领头的 Sentinel(Leader),并由领头 Sentinel 对下线主服务器执行“故障转移”操作

所有在线 Sentinel 都有资格被选为 Leader。

  1. 当一个 Sentinel 认定某主服务器是“客观下线”后,该 Sentinel 会先看看自己是否投过票。
    • 如果已投票给其他 Sentinel 了,在 2 倍故障转移的超时时间内,都不能竞选 Leader——相当于它是一个 Follower
    • 如果未投票,那么该 Sentinel 可以竞选 Leader,转为 Candidate
  2. 如 Raft 协议所描述的,Candidate 需要完成几件事情:
    1. 更新故障转移状态为 start
    2. 将当前纪元(epoch) 加 1,表明开始新一轮的选举——这里的 epoch 相当于 Raft 协议中的 term
    3. 将自身的超时时间设为当前时间加上一个随机值,随机值为 1s 内的随机毫秒数。
    4. 向其他节点发送 is-master-down-by-addr 命令,请求其他节点投票支持自己,命令会携带自己的 epoch
    5. Candidate 会投票给自己。在 Sentinel 中,投票的方式是把自己 master 结构体里的 leaderleader_epoch 改成投给的 Sentinel 和它的 epoch
  3. 其他 Sentinel 收到 Candidateis-master-down-by-addr 命令后,如果 Sentinel 当前 epochCandidate 传给他的 epoch 一样,说明他已经把自己 master 结构体里的 leaderleader_epoch 改成其他 Candidate,相当于把票投给了其他 Candidate。投票给其他 Sentinel 后,在当前 epoch 内,该 Sentinel 就只能成为 Follower
  4. Candidate 会不断的统计自己的票数,如果满足“当选投票条件”,则该 Candidate 当选 Leader
    1. 票数超过一半(监控主服务器的 Sentinel 的节点数的一半 + 1)
    2. 票数超过 Sentinel 配置的 quorum 参数——注:Raft 协议中没有这个限制,这是 Redis Sentinel 所独有的
  5. 如果在一个选举周期内(epoch),Candidate 没有满足“当选投票条件”(第 4 点描述的),则竞选失败。
  6. 如果在一个选举周期内(epoch),没有一个 Candidate 满足“当选投票条件”,说明所有 Candidate 都竞选失败,本轮选举作废。在等待超过 2 倍故障转移的超时时间后,开始新一轮的选举。
  7. 与 Raft 协议不同的是,Leader 并不会把自己成为 Leader 的消息发给其他 Sentinel。当 Leader 完成故障转移后,其他 Sentinel 检测到新的主服务器正常工作后,就会去掉“客观下线”的标识,从而不需要再发起选举。

故障转移

在选举产生出 Sentinel Leader 后,Sentinel Leader 将对已下线的主服务器执行故障转移操作。操作含以下三个步骤:

(1)选出新的主服务器

故障转移第一步,是 Sentinel Leader 在已下线主服务属下的所有从服务器中,挑选一个状态良好、数据完整的从服务器。然后,向这个从服务器发送 SLAVEOF no one 命令,将其转换为主服务器。

Sentinel Leader 如何选出新的主服务器:

  • 删除列表中所有处于下线或断线状态的从服务器。
  • 删除列表中所有最近五秒没有回复过 Sentinel Leader 的 INFO 命令的从服务器。
  • 删除所有与已下线主服务器连接断开超过 down-after-milliseconds * 10 毫秒的从服务器(down-after-milliseconds 指定了判断主服务器下线所需的时间)。
  • 之后, Sentinel Leader 先选出优先级最高的从服务器;如果优先级一样高,再选择复制偏移量最大的从服务器;如果结果还不唯一,则选出运行 ID 最小的从服务器。

(2)修改从服务器的复制目标

选出新的主服务器后,Sentinel Leader 会向所有从服务器发送 SLAVEOF 命令,让它们去复制新的主服务器。

(3)将旧的主服务器变为从服务器

Sentinel Leader 将旧的主服务器标记为从服务器。当旧的主服务器重新上线,Sentinel 会向它发送 SLAVEOF 命令,让其成为从服务器。

参考资料

Redis 复制

在 Redis 中,可以通过执行 SLAVEOF 命令或设置 slaveof 选项,让一个服务器去复制(replicate)另一个服务器,其中,后者叫主服务器(master),前者叫从服务器(slave)。

Redis 2.8 以前的复制不能高效处理断线后重复制的情况,而 Redis 2.8 新添的部分重同步可以解决这个问题。

关键词:SLAVEOFSYNCPSYNC命令传播心跳

复制简介

Redis 通过 slaveof host port 命令来让一个服务器成为另一个服务器的从服务器。

一个主服务器可以有多个从服务器。不仅主服务器可以有从服务器,从服务器也可以有自己的从服务器, 多个从服务器之间可以构成一个主从链。

一个从服务器只能有一个主服务器,并且不支持主主复制

可以通过复制功能来让主服务器免于执行持久化操作: 只要关闭主服务器的持久化功能, 然后由从服务器去执行持久化操作即可。

在使用 Redis 复制功能时的设置中,强烈建议在 master 和在 slave 中启用持久化。当不启用时,例如由于非常慢的磁盘性能而导致的延迟问题,应该配置实例来避免重置后自动重启

从 Redis 2.6 开始, 从服务器支持只读模式, 并且该模式为从服务器的默认模式。

  • 只读模式由 redis.conf 文件中的 slave-read-only 选项控制, 也可以通过 CONFIG SET parameter value 命令来开启或关闭这个模式。
  • 只读从服务器会拒绝执行任何写命令, 所以不会出现因为操作失误而将数据不小心写入到了从服务器的情况。

旧版复制

Redis 2.8 版本以前实现方式:SYNC 命令

Redis 的复制功能分为同步(sync)和命令传播(command propagate)两个操作:

  • 同步(sync) - 用于将从服务器的数据库状态更新至主服务器当前的数据库状态。
  • 命令传播(command propagate) - 当主服务器的数据库状态被修改,导致主从数据库状态不一致时,让主从服务器的数据库重新回到一致状态。

同步

SYNC 命令的执行步骤:

  1. 从服务器向主服务器发送 SYNC 命令。
  2. 收到 SYNC 命令的主服务器执行 BGSAVE 命令,在后台生成一个 RDB 文件,并使用一个缓冲区记录从现在开始执行的所有写命令。
  3. 主服务器执行 BGSAVE 完毕后,主服务器会将生成的 RDB 文件发送给从服务器。从服务器接收并载入 RDB 文件,更新自己的数据库状态。
  4. 主服务器将记录在缓冲区中的所有写命令发送给从服务器,从服务器执行这些写命令,更新自己的数据库状态。

命令传播

同步操作完成后,主从数据库的数据库状态将达到一致。每当主服务器执行客户端发送的写命令时,主从数据库状态不再一致。需要将写命令发送给从服务器执行,使得二者的数据库状态重新达到一致。

旧版复制的缺陷

从服务器对主服务器的复制存在两种情况:

  • 初次复制 - 从服务器以前没有复制过将要复制的主服务器。
  • 断线后重复制 - 处于命令传播阶段的主从服务器因为网络原因而中断了复制,当从服务器通过自动重连重新连上了主服务器后,继续复制主服务器。

对于初次复制,旧版复制功能可用很好完成任务;但是对于断线后重复制,由于每次任然需要生成 RDB 并传输,效率很低

🔔 注意:SYNC 命令是一个非常耗费资源的操作。

  • 主服务器执行 BGSAVE 命令生成 RDB 文件,这个操作会耗费主服务器大量的 CPU、内存和磁盘 I/O 资源。
  • 主服务器传输 RDB 文件给从服务器,这个操作会耗费主从服务器大量的网络资源,并对主服务器响应时延产生影响。
  • 从服务器载入 RDB 文件期间,会阻塞其他命令请求。

新版复制

Redis 2.8 版本以后的新实现方式:使用 PSYNC 命令替代 SYNC 命令。

PSYNC 命令具有完整重同步和部分重同步两种模式:

  • 完整重同步(full resychronization) - 用于初次复制。执行步骤与 SYNC 命令基本一致。
  • 部分重同步(partial resychronization) - 用于断线后重复制。如果条件允许,主服务器可以将主从服务器连接断开期间执行的写命令发送给从服务器,从服务器只需接收并执行这些写命令,即可将主从服务器的数据库状态保持一致。

部分重同步

部分重同步功能实现由三个部分构成:

  • 主从服务器的复制偏移量(replication offset)
  • 主服务器的复制积压缓冲区(replication backlog)
  • 服务器的运行 ID

复制偏移量

主服务器和从服务器会分别维护一个复制偏移量。

  • 如果主从服务器的复制偏移量相同,则说明二者的数据库状态一致;
  • 反之,则说明二者的数据库状态不一致。

复制积压缓冲区

复制积压缓冲区是主服务器维护的一个固定长度的先进先出(FIFO)队列,默认大小为 1MB

复制积压缓冲区会保存一部分最近传播的写命令,并且复制积压缓冲区会为队列中的每个字节记录相应的复制偏移量。

当从服务器断线重连主服务时,从服务器会通过 PSYNC 命令将自己的复制偏移量 offset 发送给主服务器,主服务器会根据这个复制偏移量来决定对从服务器执行何种同步操作。

  • 如果 offset 之后的数据仍然在复制积压缓冲区,则主服务器对从服务器执行部分重同步操作。
  • 反之,则主服务器对从服务器执行完整重同步操作。

🔔 注意:合理调整复制积压缓冲区的大小

  • Redis 复制积压缓冲区默认大小为 1MB

  • 复制积压缓冲区的最小大小可以根据公式 second * write_size_per_second 估算。

服务器的运行 ID

  • 每个 Redis 服务器,都有运行 ID,用于唯一识别身份。
  • 运行 ID 在服务器启动时自动生成,由 40 个随机的十六进制字符组成。例如:132e358005e29741f8d7b0a42d666aace286edda

从服务器对主服务器进行初次复制时,主服务器会将自己的运行 ID 传送给从服务器,从服务器会将这个运行 ID 保存下来。

当从服务器断线重连一个主服务器时,从服务器会发送之前保存的运行 ID:

  • 如果保存的运行 ID 和当前主服务器的运行 ID 一致,则说明从服务器断线之前连接的就是这个主服务器,主服务器可以继续尝试执行部分重同步操作;
  • 反之,若运行 ID 不一致,则说明从服务器断线之前连接的不是这个主服务器,主服务器将对从服务器执行完整重同步操作。

PSYNC 命令

了解了部分重同步的实现,PSYNC 的实现就很容易理解了,它的基本工作原理大致如下:

当从服务接收到 SLAVEOF 命令时,先判断从服务器以前是否执行过复制操作。

  • 如果没有复制过任何主服务器,向要复制的主服务器发送 PSYNC ? -1 命令,主动请求进行完整重同步
  • 反之,向要复制的主服务器发送 PSYNC <runid> <offset> 命令。
    • runid 是上一次复制的主服务器的运行 ID。
    • offset 是复制偏移量。

接收到 PSYNC <runid> <offset> 命令的主服务会进行分析:

  • 假如主从服务器的 master run id 相同,并且指定的偏移量(offset)在内存缓冲区中还有效,复制就会从上次中断的点开始继续。
  • 如果其中一个条件不满足,就会进行完全重新同步(在 2.8 版本之前就是直接进行完全重新同步)。

心跳检测

命令传播阶段,从服务器默认会以每秒一次的频率,向主服务器发送命令:

1
REPLCONF ACK <replication_offset>

其中,replication_offset 是从服务器当前的复制偏移量。

发送 REPLCONF ACK 命令对于主从服务器有三个作用:

  • 检测主从服务器的网络连接状态。
  • 辅助实现 min-slaves 选项。
  • 检测命令丢失。

检测主从连接状态

可以通过发送和接收 REPLCONF ACK 命令来检查主从服务器之间的网络连接是否正常:如果主服务器超过一秒没有收到从服务器发来的 REPLCONF ACK 命令,那么主服务器就知道主从服务器之间的连接出现问题了。

可以通过向主服务器发送 INFO replication 命令,在列出的从服务器列表的 lag 一栏中,可以看到从服务器向主服务器发送 REPLCONF ACK 命令已经过去多少秒。

辅助实现 min-slaves 选项

Redis 的 min-slaves-to-writemin-slaves-max-lag 两个选项可以防止主服务器在不安全的情况下执行写命令

【示例】min-slaves 配置项

1
2
min-slaves-to-write 3
min-slaves-max-lag 10

以上配置表示:从服务器小于 3 个,或三个从服务器的延迟(lag)都大于等于 10 秒时,主服务器将拒绝执行写命令。

检测命令丢失

如果因为网络故障,主服务传播给从服务器的写命令丢失,那么从服务器定时向主服务器发送 REPLCONF ACK 命令时,主服务器将发觉从服务器的复制偏移量少于自己的。然后,主服务器就会根据从服务器提交的复制偏移量,在复制积压缓冲区中找到从服务器缺少的数据,并将这些数据重新发送给从服务器。

复制的流程

通过向从服务器发送如下 SLAVEOF 命令,可以让一个从服务器去复制一个主服务器。

1
SLAVEOF <master_ip> <master_port>

步骤 1. 设置主从服务器

配置一个从服务器非常简单, 只要在配置文件中增加以下的这一行就可以了:

1
slaveof 127.0.0.1 6379

当然, 你需要将代码中的 127.0.0.16379 替换成你的主服务器的 IP 和端口号。

另外一种方法是调用 SLAVEOF host port 命令, 输入主服务器的 IP 和端口, 然后同步就会开始:

1
2
127.0.0.1:6379> SLAVEOF 127.0.0.1 10086
OK

步骤 2. 主从服务器建立 TCP 连接。

步骤 3. 发送 PING 检查通信状态。

步骤 4. 身份验证。

如果主服务器没有设置 requirepass ,从服务器没有设置 masterauth,则不进行身份验证;反之,则需要进行身份验证。如果身份验证失败,则放弃执行复制工作。

如果主服务器通过 requirepass 选项设置了密码, 那么为了让从服务器的同步操作可以顺利进行, 我们也必须为从服务器进行相应的身份验证设置。

对于一个正在运行的服务器, 可以使用客户端输入以下命令:

1
config set masterauth <password>

要永久地设置这个密码, 那么可以将它加入到配置文件中:

1
masterauth <password>

另外还有几个选项, 它们和主服务器执行部分重同步时所使用的复制流缓冲区有关, 详细的信息可以参考 Redis 源码中附带的 redis.conf 示例文件。

步骤 5. 发送端口信息。

从服务器执行 REPLCONF listening-port <port-number> ,向主服务器发送从服务器的监听端口号。

步骤 6. 同步。

前文已介绍,此处不赘述。

步骤 7. 命令传播。

在命令传播阶段,从服务器默认会以每秒一次的频率,向主服务发送命令:

1
REPLCONF ACK <replication_coffset>

命令的作用:

  • 检测主从服务器的网络连接状态。
  • 辅助实现 min-slave 选项。
  • 检测命令丢失。

复制的配置项

从 Redis 2.8 开始, 为了保证数据的安全性, 可以通过配置, 让主服务器只在有至少 N 个当前已连接从服务器的情况下, 才执行写命令。

不过, 因为 Redis 使用异步复制, 所以主服务器发送的写数据并不一定会被从服务器接收到, 因此, 数据丢失的可能性仍然是存在的。

以下是这个特性的运作原理:

  • 从服务器以每秒一次的频率 PING 主服务器一次, 并报告复制流的处理情况。
  • 主服务器会记录各个从服务器最后一次向它发送 PING 的时间。
  • 用户可以通过配置, 指定网络延迟的最大值 min-slaves-max-lag , 以及执行写操作所需的至少从服务器数量 min-slaves-to-write

如果至少有 min-slaves-to-write 个从服务器, 并且这些服务器的延迟值都少于 min-slaves-max-lag秒, 那么主服务器就会执行客户端请求的写操作。

你可以将这个特性看作 CAP 理论中的 C 的条件放宽版本: 尽管不能保证写操作的持久性, 但起码丢失数据的窗口会被严格限制在指定的秒数中。

另一方面, 如果条件达不到 min-slaves-to-writemin-slaves-max-lag 所指定的条件, 那么写操作就不会被执行, 主服务器会向请求执行写操作的客户端返回一个错误。

以下是这个特性的两个选项和它们所需的参数:

  • min-slaves-to-write <number of slaves>
  • min-slaves-max-lag <number of seconds>

详细的信息可以参考 Redis 源码中附带的 redis.conf 示例文件。

参考资料

Redis 实战

缓存

缓存是 Redis 最常见的应用场景。

Redis 有多种数据类型,以及丰富的操作命令,并且有着高性能、高可用的特性,非常适合用于分布式缓存。

缓存应用的基本原理,请参考 缓存基本原理 第四 ~ 第六节内容。

BitMap 和 BloomFilter

Redis 除了 5 种基本数据类型外,还支持 BitMap 和 BloomFilter(即布隆过滤器,可以通过 Redis Module 支持)。

BitMap 和 BloomFilter 都可以用于解决缓存穿透问题。要点在于:过滤一些不可能存在的数据。

什么是缓存穿透,可以参考:缓存基本原理

小数据量可以用 BitMap,大数据量可以用布隆过滤器。

分布式锁

使用 Redis 作为分布式锁,基本要点如下:

  • 互斥性 - 使用 setnx 抢占锁。
  • 避免永远不释放锁 - 使用 expire 加一个过期时间,避免一直不释放锁,导致阻塞。
  • 原子性 - setnx 和 expire 必须合并为一个原子指令,避免 setnx 后,机器崩溃,没来得及设置 expire,从而导致锁永不释放。

更多分布式锁的实现方式及细节,请参考:分布式锁基本原理

根据 Redis 的特性,在实际应用中,存在一些应用小技巧。

keys 和 scan

使用 keys 指令可以扫出指定模式的 key 列表。

如果这个 redis 正在给线上的业务提供服务,那使用 keys 指令会有什么问题?

首先,Redis 是单线程的。keys 指令会导致线程阻塞一段时间,线上服务会停顿,直到指令执行完毕,服务才能恢复。

这个时候可以使用 scan 指令,scan 指令可以无阻塞的提取出指定模式的 key 列表,但是会有一定的重复概率,在客户端做一次去重就可以了,但是整体所花费的时间会比直接用 keys 指令长。

不过,增量式迭代命令也不是没有缺点的: 举个例子, 使用 SMEMBERS 命令可以返回集合键当前包含的所有元素, 但是对于 SCAN 这类增量式迭代命令来说, 因为在对键进行增量式迭代的过程中, 键可能会被修改, 所以增量式迭代命令只能对被返回的元素提供有限的保证 。

大 Key 如何处理

什么是 Redis Big Key?

Big Key 并不是指 key 的值很大,而是 key 对应的 value 很大。

一般而言,下面这两种情况被称为Big Key:

  • String 类型的值大于 10 KB;
  • Hash、List、Set、ZSet 类型的元素的个数超过 5000 个,或总大小超过 10MB

Big Key 会造成什么问题?

Big Key 会带来以下四种影响:

  • 内存分布不均:集群模型在 slot 分片均匀情况下,会出现数据和查询倾斜情况,部分有Big Key 的 Redis 节点占用内存多,QPS 也会比较大。
  • 命令阻塞:Redis 单线程模型,操作大 Key 耗时,阻塞其他命令。
  • 网络传输压力:每次获取Big Key 产生的网络流量较大,如果一个 key 的大小是 1 MB,每秒访问量为 1000,那么每秒会产生 1000MB 的流量,这对于普通千兆网卡的服务器来说是灾难性的。
  • 客户端超时:由于 Redis 执行命令是单线程处理,然后在操作Big Key 时会比较耗时,那么就会阻塞 Redis,从客户端这一视角看,就是很久很久都没有响应。

如何找到Big Key ?

(1)使用 redis-cli --bigkeys

命令

1
redis-cli -h 127.0.0.1 -p 6379 -a "password" --bigkeys

注意事项

  • 推荐在从节点执行(主节点执行可能阻塞业务)
  • 低峰期执行-i 参数控制扫描间隔(如 -i 0.1 表示每 100ms 扫描一次)

缺点

  • 只能返回每种数据类型最大的 1 个 Key(无法获取 Top N)
  • 对集合类型只统计元素个数,而非实际内存占用

(2)使用 SCAN + 内存分析命令

遍历所有 Key(避免 KEYS * 阻塞 Redis):

1
redis-cli --scan --pattern "*" | while read key; do ...; done

分析 Key 大小

  • StringSTRLEN $key(字节数)
  • 集合类型(List/Hash/Set/ZSet):
    • 方法 1LLEN/HLEN/SCARD/ZCARD(元素个数 × 预估元素大小)
    • 方法 2(Redis 4.0+):MEMORY USAGE $key(精确内存占用)

优点

  • 可自定义筛选条件(如大小 Top 10)
  • 精确计算内存占用

(3)使用 RdbTools 分析 RDB 文件

命令

1
rdb dump.rdb -c memory --bytes 10240 -f redis.csv  # 导出 >10KB 的 Key 到 CSV

适用场景

  • 离线分析,不影响线上 Redis
  • 精准统计所有 Key 的内存分布

缺点:需要 Redis 生成 RDB 快照

如何删除Big Key?

删除操作的本质是要释放键值对占用的内存空间,不要小瞧内存的释放过程。

释放内存只是第一步,为了更加高效地管理内存空间,在应用程序释放内存时,操作系统需要把释放掉的内存块插入一个空闲内存块的链表,以便后续进行管理和再分配。这个过程本身需要一定时间,而且会阻塞当前释放内存的应用程序。

所以,如果一下子释放了大量内存,空闲内存块链表操作时间就会增加,相应地就会造成 Redis 主线程的阻塞,如果主线程发生了阻塞,其他所有请求可能都会超时,超时越来越多,会造成 Redis 连接耗尽,产生各种异常。

因此,删除Big Key 这一个动作,我们要小心。具体要怎么做呢?这里给出两种方法:

  • 分批次删除
  • 异步删除(Redis 4.0 版本以上)

1、分批次删除

对于删除大 Hash,使用 hscan 命令,每次获取 100 个字段,再用 hdel 命令,每次删除 1 个字段。

Python 代码:

1
2
3
4
5
6
7
8
9
10
def del_large_hash():
r = redis.StrictRedis(host='redis-host1', port=6379)
large_hash_key ="xxx" #要删除的大hash键名
cursor = '0'
while cursor != 0:
# 使用 hscan 命令,每次获取 100 个字段
cursor, data = r.hscan(large_hash_key, cursor=cursor, count=100)
for item in data.items():
# 再用 hdel 命令,每次删除1个字段
r.hdel(large_hash_key, item[0])

对于删除大 List,通过 ltrim 命令,每次删除少量元素。

Python 代码:

1
2
3
4
5
6
def del_large_list():
r = redis.StrictRedis(host='redis-host1', port=6379)
large_list_key = 'xxx' #要删除的大list的键名
while r.llen(large_list_key)>0:
#每次只删除最右100个元素
r.ltrim(large_list_key, 0, -101)

对于删除大 Set,使用 sscan 命令,每次扫描集合中 100 个元素,再用 srem 命令每次删除一个键。

Python 代码:

1
2
3
4
5
6
7
8
9
10
def del_large_set():
r = redis.StrictRedis(host='redis-host1', port=6379)
large_set_key = 'xxx' # 要删除的大set的键名
cursor = '0'
while cursor != 0:
# 使用 sscan 命令,每次扫描集合中 100 个元素
cursor, data = r.sscan(large_set_key, cursor=cursor, count=100)
for item in data:
# 再用 srem 命令每次删除一个键
r.srem(large_size_key, item)

对于删除大 ZSet,使用 zremrangebyrank 命令,每次删除 top 100 个元素。

Python 代码:

1
2
3
4
5
6
def del_large_sortedset():
r = redis.StrictRedis(host='large_sortedset_key', port=6379)
large_sortedset_key='xxx'
while r.zcard(large_sortedset_key)>0:
# 使用 zremrangebyrank 命令,每次删除 top 100个元素
r.zremrangebyrank(large_sortedset_key,0,99)

2、异步删除

从 Redis 4.0 版本开始,可以采用异步删除法,用 unlink 命令代替 del 来删除

这样 Redis 会将这个 key 放入到一个异步线程中进行删除,这样不会阻塞主线程。

除了主动调用 unlink 命令实现异步删除之外,我们还可以通过配置参数,达到某些条件的时候自动进行异步删除。

主要有 4 种场景,默认都是关闭的:

1
2
3
4
lazyfree-lazy-eviction no
lazyfree-lazy-expire no
lazyfree-lazy-server-del
noslave-lazy-flush no

它们代表的含义如下:

  • lazyfree-lazy-eviction:表示当 Redis 运行内存超过 maxmeory 时,是否开启 lazy free 机制删除;
  • lazyfree-lazy-expire:表示设置了过期时间的键值,当过期之后是否开启 lazy free 机制删除;
  • lazyfree-lazy-server-del:有些指令在处理已存在的键时,会带有一个隐式的 del 键的操作,比如 rename 命令,当目标键已存在,Redis 会先删除目标键,如果这些目标键是一个 big key,就会造成阻塞删除的问题,此配置表示在这种场景中是否开启 lazy free 机制删除;
  • slave-lazy-flush:针对 slave (从节点) 进行全量数据同步,slave 在加载 master 的 RDB 文件前,会运行 flushall 来清理自己的数据,它表示此时是否开启 lazy free 机制删除。

建议开启其中的 lazyfree-lazy-eviction、lazyfree-lazy-expire、lazyfree-lazy-server-del 等配置,这样就可以有效的提高主线程的执行效率。

最受欢迎文章

选出最受欢迎文章,需要支持对文章进行评分。

对文章进行投票

(1)使用 HASH 存储文章

使用 HASH 类型存储文章信息。其中:key 是文章 ID;field 是文章的属性 key;value 是属性对应值。

img

操作:

  • 存储文章信息 - 使用 HSETHMGET 命令
  • 查询文章信息 - 使用 HGETALL 命令
  • 添加投票 - 使用 HINCRBY 命令

(2)使用 ZSET 针对不同维度集合排序

使用 ZSET 类型分别存储按照时间排序和按照评分排序的文章 ID 集合。

img

操作:

  • 添加记录 - 使用 ZADD 命令
  • 添加分数 - 使用 ZINCRBY 命令
  • 取出多篇文章 - 使用 ZREVRANGE 命令

(3)为了防止重复投票,使用 SET 类型记录每篇文章 ID 对应的投票集合。

img

操作:

  • 添加投票者 - 使用 SADD 命令
  • 设置有效期 - 使用 EXPIRE 命令

(4)假设 user:115423 给 article:100408 投票,分别需要高更新评分排序集合以及投票集合。

img

当需要对一篇文章投票时,程序需要用 ZSCORE 命令检查记录文章发布时间的有序集合,判断文章的发布时间是否超过投票有效期(比如:一星期)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public void articleVote(Jedis conn, String user, String article) {
// 计算文章的投票截止时间。
long cutoff = (System.currentTimeMillis() / 1000) - ONE_WEEK_IN_SECONDS;

// 检查是否还可以对文章进行投票
// (虽然使用散列也可以获取文章的发布时间,
// 但有序集合返回的文章发布时间为浮点数,
// 可以不进行转换直接使用)。
if (conn.zscore("time:", article) < cutoff) {
return;
}

// 从article:id标识符(identifier)里面取出文章的ID。
String articleId = article.substring(article.indexOf(':') + 1);

// 如果用户是第一次为这篇文章投票,那么增加这篇文章的投票数量和评分。
if (conn.sadd("voted:" + articleId, user) == 1) {
conn.zincrby("score:", VOTE_SCORE, article);
conn.hincrBy(article, "votes", 1);
}
}

发布并获取文章

发布文章:

  • 添加文章 - 使用 INCR 命令计算新的文章 ID,填充文章信息,然后用 HSET 命令或 HMSET 命令写入到 HASH 结构中。
  • 将文章作者 ID 添加到投票名单 - 使用 SADD 命令添加到代表投票名单的 SET 结构中。
  • 设置投票有效期 - 使用 EXPIRE 命令设置投票有效期。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public String postArticle(Jedis conn, String user, String title, String link) {
// 生成一个新的文章ID。
String articleId = String.valueOf(conn.incr("article:"));

String voted = "voted:" + articleId;
// 将发布文章的用户添加到文章的已投票用户名单里面,
conn.sadd(voted, user);
// 然后将这个名单的过期时间设置为一周(第3章将对过期时间作更详细的介绍)。
conn.expire(voted, ONE_WEEK_IN_SECONDS);

long now = System.currentTimeMillis() / 1000;
String article = "article:" + articleId;
// 将文章信息存储到一个散列里面。
HashMap<String, String> articleData = new HashMap<String, String>();
articleData.put("title", title);
articleData.put("link", link);
articleData.put("user", user);
articleData.put("now", String.valueOf(now));
articleData.put("votes", "1");
conn.hmset(article, articleData);

// 将文章添加到根据发布时间排序的有序集合和根据评分排序的有序集合里面。
conn.zadd("score:", now + VOTE_SCORE, article);
conn.zadd("time:", now, article);

return articleId;
}

分页查询最受欢迎文章:

使用 ZINTERSTORE 命令根据页码、每页记录数、排序号,根据评分值从大到小分页查出文章 ID 列表。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public List<Map<String, String>> getArticles(Jedis conn, int page, String order) {
// 设置获取文章的起始索引和结束索引。
int start = (page - 1) * ARTICLES_PER_PAGE;
int end = start + ARTICLES_PER_PAGE - 1;

// 获取多个文章ID。
Set<String> ids = conn.zrevrange(order, start, end);
List<Map<String, String>> articles = new ArrayList<>();
// 根据文章ID获取文章的详细信息。
for (String id : ids) {
Map<String, String> articleData = conn.hgetAll(id);
articleData.put("id", id);
articles.add(articleData);
}

return articles;
}

对文章进行分组

如果文章需要分组,功能需要分为两块:

  • 记录文章属于哪个群组
  • 负责取出群组里的文章

将文章添加、删除群组:

1
2
3
4
5
6
7
8
9
10
11
12
public void addRemoveGroups(Jedis conn, String articleId, String[] toAdd, String[] toRemove) {
// 构建存储文章信息的键名。
String article = "article:" + articleId;
// 将文章添加到它所属的群组里面。
for (String group : toAdd) {
conn.sadd("group:" + group, article);
}
// 从群组里面移除文章。
for (String group : toRemove) {
conn.srem("group:" + group, article);
}
}

取出群组里的文章:

img

  • 通过对存储群组文章的集合和存储文章评分的有序集合执行 ZINTERSTORE 命令,可以得到按照文章评分排序的群组文章。
  • 通过对存储群组文章的集合和存储文章发布时间的有序集合执行 ZINTERSTORE 命令,可以得到按照文章发布时间排序的群组文章。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public List<Map<String, String>> getGroupArticles(Jedis conn, String group, int page, String order) {
// 为每个群组的每种排列顺序都创建一个键。
String key = order + group;
// 检查是否有已缓存的排序结果,如果没有的话就现在进行排序。
if (!conn.exists(key)) {
// 根据评分或者发布时间,对群组文章进行排序。
ZParams params = new ZParams().aggregate(ZParams.Aggregate.MAX);
conn.zinterstore(key, params, "group:" + group, order);
// 让Redis在60秒钟之后自动删除这个有序集合。
conn.expire(key, 60);
}
// 调用之前定义的getArticles函数来进行分页并获取文章数据。
return getArticles(conn, page, key);
}

管理令牌

网站一般会以 Cookie、Session、令牌这类信息存储用户身份信息。

可以将 Cookie/Session/令牌 和用户的映射关系存储在 HASH 结构。

下面以令牌来举例。

查询令牌

1
2
3
4
public String checkToken(Jedis conn, String token) {
// 尝试获取并返回令牌对应的用户。
return conn.hget("login:", token);
}

更新令牌

  • 用户每次访问页面,可以记录下令牌和当前时间戳的映射关系,存入一个 ZSET 结构中,以便分析用户是否活跃,继而可以周期性清理最老的令牌,统计当前在线用户数等行为。
  • 用户如果正在浏览商品,可以记录到用户最近浏览过的商品有序集合中(集合可以限定数量,超过数量进行裁剪),存入到一个 ZSET 结构中,以便分析用户最近可能感兴趣的商品,以便推荐商品。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public void updateToken(Jedis conn, String token, String user, String item) {
// 获取当前时间戳。
long timestamp = System.currentTimeMillis() / 1000;
// 维持令牌与已登录用户之间的映射。
conn.hset("login:", token, user);
// 记录令牌最后一次出现的时间。
conn.zadd("recent:", timestamp, token);
if (item != null) {
// 记录用户浏览过的商品。
conn.zadd("viewed:" + token, timestamp, item);
// 移除旧的记录,只保留用户最近浏览过的25个商品。
conn.zremrangeByRank("viewed:" + token, 0, -26);
conn.zincrby("viewed:", -1, item);
}
}

清理令牌

上一节提到,更新令牌时,将令牌和当前时间戳的映射关系,存入一个 ZSET 结构中。所以可以通过排序得知哪些令牌最老。如果没有清理操作,更新令牌占用的内存会不断膨胀,直到导致机器宕机。

比如:最多允许存储 1000 万条令牌信息,周期性检查,一旦发现记录数超出 1000 万条,将 ZSET 从新到老排序,将超出 1000 万条的记录清除。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
public static class CleanSessionsThread extends Thread {

private Jedis conn;

private int limit;

private volatile boolean quit;

public CleanSessionsThread(int limit) {
this.conn = new Jedis("localhost");
this.conn.select(15);
this.limit = limit;
}

public void quit() {
quit = true;
}

@Override
public void run() {
while (!quit) {
// 找出目前已有令牌的数量。
long size = conn.zcard("recent:");
// 令牌数量未超过限制,休眠并在之后重新检查。
if (size <= limit) {
try {
sleep(1000);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
continue;
}

// 获取需要移除的令牌ID。
long endIndex = Math.min(size - limit, 100);
Set<String> tokenSet = conn.zrange("recent:", 0, endIndex - 1);
String[] tokens = tokenSet.toArray(new String[tokenSet.size()]);

// 为那些将要被删除的令牌构建键名。
ArrayList<String> sessionKeys = new ArrayList<String>();
for (String token : tokens) {
sessionKeys.add("viewed:" + token);
}

// 移除最旧的那些令牌。
conn.del(sessionKeys.toArray(new String[sessionKeys.size()]));
conn.hdel("login:", tokens);
conn.zrem("recent:", tokens);
}
}

}

参考资料

Redis 持久化

Redis 是内存型数据库,为了保证数据在宕机后不会丢失,需要将内存中的数据持久化到硬盘上。

Redis 支持两种持久化方式:RDB 和 AOF。这两种持久化方式既可以同时使用,也可以单独使用。

关键词:RDBAOFSAVEBGSAVEappendfsync

RDB 快照

RDB 简介

RDB 即“快照”,它将某时刻的所有 Redis 数据库中的所有键值对数据保存到一个经过压缩的“二进制文件”(RDB 文件)中

RDB 持久化即可以“手动”执行,也可以定期“自动”执行

RDB 文件的“载入”工作是在服务器“启动”时“自动”执行的

对于不同类型的键值对, RDB 文件会使用不同的方式来保存它们。

创建 RDB 后,用户可以对 RDB 进行备份,可以将 RDB 复制到其他服务器从而创建具有相同数据的服务器副本,还可以在重启服务器时使用。一句话来说:RDB 适用于作为“冷备”

RDB 的优点和缺点

RDB 的优点

  • RDB 文件非常紧凑,适合作为“冷备”。比如你可以在每个小时报保存一下过去 24 小时内的数据,同时每天保存过去 30 天的数据,这样即使出了问题你也可以根据需求恢复到不同版本的数据集。
  • 快照在保存 RDB 文件时父进程唯一需要做的就是 fork 出一个子进程,接下来的工作全部由子进程来做,父进程不需要再做其他 IO 操作,所以快照持久化方式可以最大化 Redis 的性能。
  • 恢复大数据集时,RDB 比 AOF 更快

RDB 的缺点

  • 如果系统发生故障,将会丢失最后一次创建快照之后的数据。如果你希望在 Redis 意外停止工作(例如电源中断)的情况下丢失的数据最少的话,那么 快照不适合你。虽然你可以配置不同的 save 时间点(例如每隔 5 分钟并且对数据集有 100 个写的操作),是 Redis 要完整的保存整个数据集是一个比较繁重的工作,你通常会每隔 5 分钟或者更久做一次完整的保存,万一在 Redis 意外宕机,你可能会丢失几分钟的数据。
  • 如果数据量很大,保存快照的时间会很长。快照需要经常 fork 子进程来保存数据集到硬盘上。当数据集比较大的时候,fork 的过程是非常耗时的,可能会导致 Redis 在一些毫秒级内不能响应客户端的请求。如果数据集巨大并且 CPU 性能不是很好的情况下,这种情况会持续 1 秒。AOF 也需要 fork,但是你可以调节重写日志文件的频率来提高数据集的耐久度。

RDB 的创建

有两个 Redis 命令可以用于生成 RDB 文件:SAVEBGSAVE

SAVE 命令由服务器进程直接执行保存操作,直到 RDB 创建完成为止。所以该命令“会阻塞”服务器,在阻塞期间,服务器不能响应任何命令请求。

1
2
>SAVE
"OK"

BGSAVE 命令会“派生”(fork)一个子进程,由子进程负责创建 RDB 文件,服务器进程继续处理命令请求,所以该命令“不会阻塞”服务器

1
2
>BGSAVE
"Background saving started"

🔔 【注意】

BGSAVE 命令的实现采用的是写时复制技术(Copy-On-Write,缩写为 CoW)。

BGSAVE 命令执行期间,SAVEBGSAVEBGREWRITEAOF 三个命令会被拒绝,以免与当前的 BGSAVE 操作产生竞态条件,降低性能。

创建 RDB 的工作由 rdb.c/rdbSave 函数完成。

RDB 的载入

RDB 文件的“载入”工作是在服务器“启动”时“自动”执行的。Redis 并没有专门用于载入 RDB 文件的命令。

服务器载入 RDB 文件期间,会一直处于阻塞状态,直到载入完成为止。

载入 RDB 的工作由 rdb.c/rdbLoad 函数完成。

🔔 【注意】

因为 AOF 的更新频率通常比 RDB 的更新频率高,所以:

  • 如果服务器开了 AOF,则服务器会优先使用 AOF 来还原数据。
  • 只有在 AOF 处于关闭时,服务器才会使用 RDB 来还原数据。

自动间隔保存

Redis 支持通过在 redis.conf 文件中配置 save 选项,让服务器每隔一段时间自动执行一次 BGSAVE 命令。save 选项可以设置多个保存条件,只要其中任意一个条件被满足,服务器就会执行 BGSAVE 命令。

【示例】redis.conf 中自动保存配置

1
2
3
4
5
6
# 900 秒内,至少对数据库进行了 1 次修改
save 900 1
# 300 秒内,至少对数据库进行了 10 次修改
save 300 10
# 60 秒内,至少对数据库进行了 10000 次修改
save 60 10000

只要满足以上任意条件,Redis 服务就会执行 BGSAVE 命令。

自动间隔的保存条件定义在 redis.h/redisServer 中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
struct redisServer {
// 记录了保存条件的数组
struct saveparam *saveparams;

// 自从上次 SAVE 执行以来,数据库被修改的次数
long long dirty;

// 上一次完成 SAVE 的时间
time_t lastsave;
}

// 服务器的保存条件(BGSAVE 自动执行的条件)
struct saveparam {

// 多少秒之内
time_t seconds;

// 发生多少次修改
int changes;

};

redisServer 中的 saveparams 数组维护了多个自动间隔保存条件。

服务每次成功执行一个修改命令后,dirty 计数器就会加 1;而 lastsave 则记录了上一次完成 SAVE 的时间。Redis 会通过一个 serverCron 函数周期性检查 save 选项所设条件是否满足,如果满足,则执行 BGSVAE 命令。

RDB 的文件结构

RDB 文件是一个经过压缩的“二进制文件”,由多个部分组成。

对于不同类型(STRING、HASH、LIST、SET、SORTED SET)的键值对,RDB 文件会使用不同的方式来保存它们。

Redis 本身提供了一个 RDB 文件检查工具 redis-check-dump

RDB 的配置

Redis RDB 默认配置如下:

1
2
3
4
5
6
7
8
save 900 1
save 300 10
save 60 10000
stop-writes-on-bgsave-error yes
rdbcompression yes
rdbchecksum yes
dbfilename dump.rdb
dir ./

Redis 的配置文件 redis.conf 中与 RDB 有关的选项:

  • save - Redis 会根据 save 选项,让服务器每隔一段时间自动执行一次 BGSAVE 命令

  • stop-writes-on-bgsave-error - 当 BGSAVE 命令出现错误时停止写 RDB 文件

  • rdbcompression - RDB 文件开启压缩功能

  • rdbchecksum - 对 RDB 文件进行校验

  • dbfilename - RDB 文件名

  • dir - RDB 文件和 AOF 文件的存储路径

AOF 日志

AOF 简介

AOF(Append Only File) 是将所有写命令追加写入“日志文件”,以此来记录数据的变化。当服务器重启时,会重新载入和执行 AOF 文件中的命令,就可以恢复原始的数据。AOF 适合作为“热备”

AOF 可以通过 appendonly yes 配置选项来开启。

AOF 的优点和缺点

AOF 的优点

  • 如果系统发生故障,AOF 丢失数据比 RDB 少。你可以使用不同的 fsync 策略:无 fsync;每秒 fsync;每次写的时候 fsync。使用默认的每秒 fsync 策略,Redis 的性能依然很好(fsync 是由后台线程进行处理的,主线程会尽力处理客户端请求),一旦出现故障,你最多丢失 1 秒的数据。
  • AOF 文件可修复 - AOF 文件是一个只进行追加的日志文件,所以不需要写入 seek,即使由于某些原因(磁盘空间已满,写的过程中宕机等等)未执行完整的写入命令,你也也可使用 redis-check-aof 工具修复这些问题。
  • AOF 文件可压缩。Redis 可以在 AOF 文件体积变得过大时,自动地在后台对 AOF 进行重写:重写后的新 AOF 文件包含了恢复当前数据集所需的最小命令集合。整个重写操作是绝对安全的,因为 Redis 在创建新 AOF 文件的过程中,会继续将命令追加到现有的 AOF 文件里面,即使重写过程中发生停机,现有的 AOF 文件也不会丢失。而一旦新 AOF 文件创建完毕,Redis 就会从旧 AOF 文件切换到新 AOF 文件,并开始对新 AOF 文件进行追加操作。
  • AOF 文件可读 - AOF 文件有序地保存了对数据库执行的所有写入操作,这些写入操作以 Redis 命令的格式保存。因此 AOF 文件的内容非常容易被人读懂,对文件进行分析(parse)也很轻松。 导出(export) AOF 文件也非常简单。举个例子,如果你不小心执行了 FLUSHALL 命令,但只要 AOF 文件未被重写,那么只要停止服务器,移除 AOF 文件末尾的 FLUSHALL 命令,并重启 Redis ,就可以将数据集恢复到 FLUSHALL 执行之前的状态。

AOF 的缺点

  • AOF 文件体积一般比 RDB 大 - 对于相同的数据集来说,AOF 文件的体积通常要大于 RDB 文件的体积。
  • 恢复大数据集时,AOF 比 RDB 慢。 - 根据所使用的 fsync 策略,AOF 的速度可能会慢于快照。在一般情况下,每秒 fsync 的性能依然非常高,而关闭 fsync 可以让 AOF 的速度和快照一样快,即使在高负荷之下也是如此。不过在处理巨大的写入载入时,快照可以提供更有保证的最大延迟时间(latency)。

AOF 的创建

Redis 命令请求会先保存到 AOF 缓冲区,再定期写入并同步到 AOF 文件

AOF 的实现可以分为命令追加(append)、文件写入、文件同步(sync)三个步骤。

  • 命令追加 - 当 Redis 服务器开启 AOF 功能时,服务器在执行完一个写命令后,会以 Redis 命令协议格式将被执行的写命令追加到 AOF 缓冲区的末尾。
  • 文件写入文件同步
    • Redis 的服务器进程就是一个事件循环,这个循环中的文件事件负责接收客户端的命令请求,以及向客户端发送命令回复。而时间事件则负责执行想 serverCron 这样的定时运行的函数。
    • 因为服务器在处理文件事件时可能会执行写命令,这些写命令会被追加到 AOF 缓冲区,服务器每次结束事件循环前,都会根据 appendfsync 选项来判断 AOF 缓冲区内容是否需要写入和同步到 AOF 文件中。

appendfsync 不同选项决定了不同的持久化行为:

  • always - 将 AOF 缓冲区中所有内容写入并同步到 AOF 文件。这种方式是最数据最安全的,但也是性能最差的。
  • no - 将 AOF 缓冲区所有内容写入到 AOF 文件,但并不对 AOF 文件进行同步,何时同步由操作系统决定。这种方式是数据最不安全的,一旦出现故障,未来得及同步的所有数据都会丢失。
  • everysec - appendfsync 默认选项。将 AOF 缓冲区所有内容写入到 AOF 文件,如果上次同步 AOF 文件的时间距离现在超过一秒钟,那么再次对 AOF 文件进行同步,这个同步操作是有一个线程专门负责执行的。这张方式是前面两种的这种方案——性能足够好,且即使出现故障,仅丢失一秒钟内的数据。

appendfsync 选项的不同值对 AOF 持久化功能的安全性、以及 Redis 服务器的性能有很大的影响。

AOF 的载入

因为 AOF 文件中包含了重建数据库所需的所有写命令,所以服务器只要载入并执行一遍 AOF 文件中保存的写命令,就可以还原服务器关闭前的数据库状态。

AOF 载入过程如下:

  1. 服务器启动载入程序。
  2. 创建一个伪客户端。因为 Redis 命令只能在客户端上下文中执行,所以需要创建一个伪客户端来载入、执行 AOF 文件中记录的命令。
  3. 从 AOF 文件中分析并读取一条写命令。
  4. 使用伪客户端执行写命令。
  5. 循环执行步骤 3、4,直到所有写命令都被处理完毕为止。
  6. 载入完毕。

AOF 的重写

随着 Redis 不断运行,AOF 的体积也会不断增长,这将导致两个问题:

  • AOF 耗尽磁盘可用空间。
  • Redis 重启后需要执行 AOF 文件记录的所有写命令来还原数据集,如果 AOF 过大,则还原操作执行的时间就会非常长。

为了解决 AOF 体积膨胀问题,Redis 提供了 AOF 重写功能,来对 AOF 文件进行压缩。AOF 重写可以产生一个新的 AOF 文件,这个新的 AOF 文件和原来的 AOF 文件所保存的数据库状态一致,但体积更小

AOF 重写并非读取和分析现有 AOF 文件的内容,而是直接从数据库中读取当前的数据库状态。即从数据库中读取键的当前值,然后用一条命令去记录该键值对,以此代替之前可能存在冗余的命令。

AOF 后台重写

作为一种辅助性功能,显然 Redis 并不想在 AOF 重写时阻塞 Redis 服务接收其他命令。因此,Redis 决定通过 BGREWRITEAOF 命令创建一个子进程,然后由子进程负责对 AOF 文件进行重写,这与 BGSAVE 原理类似。

  • 在执行 BGREWRITEAOF 命令时,Redis 服务器会维护一个 AOF 重写缓冲区。当 AOF 重写子进程开始工作后,Redis 每执行完一个写命令,会同时将这个命令发送给 AOF 缓冲区和 AOF 重写缓冲区。
  • 由于彼此不是在同一个进程中工作,AOF 重写不影响 AOF 写入和同步。当子进程完成创建新 AOF 文件的工作之后,服务器会将重写缓冲区中的所有内容追加到新 AOF 文件的末尾,使得新旧两个 AOF 文件所保存的数据库状态一致。
  • 最后,服务器用新的 AOF 文件替换就的 AOF 文件,以此来完成 AOF 重写操作。

BGREWRITEAOF 命令的实现采用的是写时复制技术(Copy-On-Write,缩写为 CoW)。

可以通过设置 auto-aof-rewrite-percentageauto-aof-rewrite-min-size,使得 Redis 在满足条件时,自动执行 BGREWRITEAOF

假设配置如下:

1
2
auto-aof-rewrite-percentage 100
auto-aof-rewrite-min-size 64mb

表明,当 AOF 大于 64MB,且 AOF 体积比上一次重写后的体积大了至少 100% 时,执行 BGREWRITEAOF

AOF 的配置

AOF 的默认配置:

1
2
3
4
5
appendonly no
appendfsync everysec
no-appendfsync-on-rewrite no
auto-aof-rewrite-percentage 100
auto-aof-rewrite-min-size 64mb

AOF 持久化通过在 redis.conf 中的 appendonly yes 配置选项来开启。

  • appendonly - 开启 AOF 功能。
  • appendfilename - AOF 文件名。
  • appendfsync - 用于设置同步频率,它有以下可选项:
    • always - 每个 Redis 写命令都要同步写入硬盘。这样做会严重降低 Redis 的速度。
    • everysec - 每秒执行一次同步,显示地将多个写命令同步到硬盘。为了兼顾数据安全和写入性能,推荐使用 appendfsync everysec 选项。Redis 每秒同步一次 AOF 文件时的性能和不使用任何持久化特性时的性能相差无几。
    • no - 让操作系统来决定应该何时进行同步。
  • no-appendfsync-on-rewrite - AOF 重写时不支持追加命令。
  • auto-aof-rewrite-percentage - AOF 重写百分比。
  • auto-aof-rewrite-min-size - AOF 重写文件的最小大小。
  • dir - RDB 文件和 AOF 文件的存储路径。

RDB 和 AOF

当 Redis 启动时, 如果 RDB 和 AOF 功能都开启了,那么程序会优先使用 AOF 文件来恢复数据集,因为 AOF 文件所保存的数据通常是最完整的。

如何选择持久化

  • 如果不关心数据丢失,可以不持久化。
  • 如果可以承受数分钟以内的数据丢失,可以只使用 RDB。
  • 如果不能承受数分钟以内的数据丢失,可以同时使用 RDB 和 AOF。

有很多用户都只使用 AOF 持久化, 但并不推荐这种方式: 因为定时生成 RDB 快照(snapshot)非常便于进行数据库备份,并且快照恢复数据集的速度也要比 AOF 恢复的速度要快,除此之外,使用快照还可以避免之前提到的 AOF 程序的 bug 。

RDB 切换为 AOF

在 Redis 2.2 或以上版本,可以在不重启的情况下,从 RDB 切换为 AOF :

  • 为最新的 dump.rdb 文件创建一个备份。
  • 将备份放到一个安全的地方。
  • 执行以下两条命令:
  • redis-cli config set appendonly yes
  • redis-cli config set save
  • 确保写命令会被正确地追加到 AOF 文件的末尾。
  • 执行的第一条命令开启了 AOF 功能: Redis 会阻塞直到初始 AOF 文件创建完成为止, 之后 Redis 会继续处理命令请求, 并开始将写入命令追加到 AOF 文件末尾。

执行的第二条命令用于关闭快照功能。 这一步是可选的, 如果你愿意的话, 也可以同时使用快照和 AOF 这两种持久化功能。

🔔 重要:别忘了在 redis.conf 中打开 AOF 功能!否则的话,服务器重启之后,之前通过 CONFIG SET 设置的配置就会被遗忘,程序会按原来的配置来启动服务器。

AOF 和 RDB 的相互作用

BGSAVEBGREWRITEAOF 命令不可以同时执行。这是为了避免两个 Redis 后台进程同时对磁盘进行大量的 I/O 操作。

如果 BGSAVE 正在执行,并且用户显示地调用 BGREWRITEAOF 命令,那么服务器将向用户回复一个 OK 状态,并告知用户,BGREWRITEAOF 已经被预定执行。一旦 BGSAVE 执行完毕, BGREWRITEAOF 就会正式开始。

混合持久化

RDB 优点是数据恢复速度快,但是快照的频率不好把握。频率太低,丢失的数据就会比较多,频率太高,就会影响性能。AOF 优点是丢失数据少,但是数据恢复不快。

为了集成了两者的优点,Redis 4.0 提出了混合使用 AOF 日志和内存快照,也叫混合持久化,既保证了 Redis 重启速度,又降低数据丢失风险。

混合持久化工作在 AOF 日志重写过程,当开启了混合持久化时,在 AOF 重写日志时,fork 出来的重写子进程会先将与主线程共享的内存数据以 RDB 方式写入到 AOF 文件,然后主线程处理的操作命令会被记录在重写缓冲区里,重写缓冲区里的增量命令会以 AOF 方式写入到 AOF 文件,写入完成后通知主进程将新的含有 RDB 格式和 AOF 格式的 AOF 文件替换旧的的 AOF 文件。

也就是说,使用了混合持久化,AOF 文件的前半部分是 RDB 格式的全量数据,后半部分是 AOF 格式的增量数据

这样的好处在于,重启 Redis 加载数据的时候,由于前半部分是 RDB 内容,这样加载的时候速度会很快

加载完 RDB 的内容后,才会加载后半部分的 AOF 内容,这里的内容是 Redis 后台子进程重写 AOF 期间,主线程处理的操作命令,可以使得数据更少的丢失

混合持久化优点

  • 混合持久化结合了 RDB 和 AOF 持久化的优点,开头为 RDB 的格式,使得 Redis 可以更快的启动,同时结合 AOF 的优点,有减低了大量数据丢失的风险。

混合持久化缺点

  • AOF 文件中添加了 RDB 格式的内容,使得 AOF 文件的可读性变得很差;
  • 兼容性差,如果开启混合持久化,那么此混合持久化 AOF 文件,就不能用在 Redis 4.0 之前版本了。

Redis 备份

应该确保 Redis 数据有完整的备份。

备份 Redis 数据建议采用 RDB。

备份过程

  1. 创建一个定期任务(cron job),每小时将一个 RDB 文件备份到一个文件夹,并且每天将一个 RDB 文件备份到另一个文件夹。
  2. 确保快照的备份都带有相应的日期和时间信息,每次执行定期任务脚本时,使用 find 命令来删除过期的快照:比如说,你可以保留最近 48 小时内的每小时快照,还可以保留最近一两个月的每日快照。
  3. 至少每天一次,将 RDB 备份到你的数据中心之外,或者至少是备份到你运行 Redis 服务器的物理机器之外。

容灾备份

Redis 的容灾备份基本上就是对数据进行备份,并将这些备份传送到多个不同的外部数据中心。

容灾备份可以在 Redis 运行并产生快照的主数据中心发生严重的问题时,仍然让数据处于安全状态。

参考资料

Redis 基本数据类型

关键词:StringHashListSetZset

Redis 提供了多种数据类型,每种数据类型有丰富的命令支持。

Redis 支持的基本数据类型:STRING、HASH、LIST、SET、ZSET

Redis 支持的高级数据类型:BitMap、HyperLogLog、GEO、Stream

使用 Redis ,不仅要了解其数据类型的特性,还需要根据业务场景,灵活的、高效的使用其数据类型来建模。

String

String 简介

String 类型是键值对结构。

String 类型是二进制安全的。二进制安全是指,String 类型不仅可以保存文本数据,还可以保存任意格式的二进制数据,如:图片、音频、视频、压缩文件等。

默认情况下,String 类型的值最大可为 512 MB

String 实现

String 类型的底层的数据结构实现主要是 int 和 SDS(简单动态字符串)。

SDS 和我们认识的 C 字符串不太一样,之所以没有使用 C 语言的字符串表示,因为 SDS 相比于 C 的原生字符串:

  • SDS 不仅可以保存文本数据,还可以保存二进制数据。因为 SDS 使用 len 属性的值而不是空字符来判断字符串是否结束,并且 SDS 的所有 API 都会以处理二进制的方式来处理 SDS 存放在 buf[] 数组里的数据。所以 SDS 不光能存放文本数据,而且能保存图片、音频、视频、压缩文件这样的二进制数据。
  • **SDS 获取字符串长度的时间复杂度是 O(1)**。因为 C 语言的字符串并不记录自身长度,所以获取长度的复杂度为 O(n);而 SDS 结构里用 len 属性记录了字符串长度,所以复杂度为 O(1)
  • Redis 的 SDS API 是安全的,拼接字符串不会造成缓冲区溢出。因为 SDS 在拼接字符串之前会检查 SDS 空间是否满足要求,如果空间不够会自动扩容,所以不会导致缓冲区溢出的问题。

字符串对象的编码可以是 intraw 或者 embstr

字符串对象保存各类型值的编码方式:

编码
可以用 long 类型保存的整数。 int
可以用 long double 类型保存的浮点数。 embstr 或者 raw
字符串值, 或者因为长度太大而没办法用 long 类型表示的整数, 又或者因为长度太大而没办法用 long double 类型表示的浮点数。 embstr 或者 raw

如果一个字符串对象保存的是整数值, 并且这个整数值可以用 long 类型来表示, 那么字符串对象会将整数值保存在字符串对象结构的 ptr 属性里面(将 void* 转换成 long ), 并将字符串对象的编码设置为 int

【示例】set 整数值

1
2
3
4
5
> SET number 10086
OK

> OBJECT ENCODING number
"int"

如果字符串对象保存的是一个字符串值, 并且这个字符串值的长度大于 39 字节, 那么字符串对象将使用一个简单动态字符串(SDS)来保存这个字符串值, 并将对象的编码设置为 raw

1
2
3
4
5
6
7
8
> SET story "Long, long, long ago there lived a king ..."
OK

> STRLEN story
(integer) 43

> OBJECT ENCODING story
"raw"

如果字符串对象保存的是一个字符串值, 并且这个字符串值的长度小于等于 39 字节, 那么字符串对象将使用 embstr 编码的方式来保存这个字符串值。embstr 编码是专门用于保存短字符串的一种优化编码方式。

【示例】set 字符串值

1
2
3
4
5
> SET msg "hello"
OK

> OBJECT ENCODING msg
"embstr"

String 命令

命令 说明
SET 存储一个字符串值
SETNX 仅当键不存在时,才存储字符串值
GET 获取指定 key 的值
MGET 获取一个或多个指定 key 的值
INCRBY 将 key 中储存的数字加上指定的增量值
DECRBY 将 key 中储存的数字减去指定的减量值

更多命令请参考:Redis String 类型官方命令文档

【示例】SET、GET、DEL 操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# 将 key(name) 的 value 保存为 dunwu
> set name dunwu
OK
# 获取 key(name) 的 value
> get name
"dunwu"
# 将 key(name) 的 value 保存为 unknown(覆盖原 value)
> set name unknown
OK
> get name
"unknown"
# 检查 key(name) 是否存在
> exists name
(integer) 1
# 删除 key(name)
> del name
(integer) 1
> exists name
(integer) 0
> get name
(nil)

【示例】SETNX 操作

1
2
3
4
5
6
7
8
9
10
11
12
# 检查 key(lock) 是否存在
> exists lock
(integer) 0
# 将 key(lock) 的 value 保存为 1,保存成功
> setnx lock 1
(integer) 1
# 将 key(lock) 的 value 保存为 2,由于 key 已存在,保存失败
> setnx lock 2
(integer) 0
# 获取 key(lock) 的 value
> get lock
"1"

【示例】MSET、MGET 操作

1
2
3
4
5
6
7
8
# 批量设置 one、two、three 这 3 个 key
> mset one 1 tow 2 three 3
OK
# 批量获取 one、two、three 3 个 key 的 value
> mget one tow three
1) "1"
2) "2"
3) "3"

【示例】INCR、DECR、INCRBY、DECRBY 操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 将 key(counter) 的 value 保存为 0
> set counter 0
OK
# 将 key(counter) 的 value 加 1
> incr counter
(integer) 1
# 将 key(counter) 的 value 加 9
> incrby counter 9
(integer) 10
# 将 key(counter) 的 value 减 1
> decr counter
(integer) 9
# 将 key(counter) 的 value 减 9
> decrby counter 9
(integer) 0

String 应用

适用场景:缓存、计数器、共享 Session

缓存对象

使用 String 来缓存对象有两种方式:

(1)缓存对象的 JSON 值

1
> set user:1 {"name":"dunwu","sex":"man"}

(2)将 key 分离为 user:ID: 属性的形式,采用 MSET 存储,用 MGET 获取各属性值

1
2
3
4
5
> mset user:1:name dunwu user:1:sex man
OK
> mget user:1:name user:1:sex
1) "dunwu"
2) "man"

计数器

【需求场景】

统计网站某内容的点击量、收藏量、点赞数等等。

【解决方案】

使用 Redis 的 String 类型存储一个计数器。

维护计数器的常见操作如下:

  • 增加统计值 - 使用 INCRDECR 命令
  • 减少统计值 - 使用 INCRBYDECRBY 操作

【示例代码】

1
2
3
4
5
6
7
8
9
10
11
12
# 初始化 ID 为 1024 的博文访问量为 0
> set blog:view:1024 0
OK
# ID 为 1024 的博文访问量加 1
> incr blog:view:1024
(integer) 1
# ID 为 1024 的博文访问量加 1
> incr blog:view:1024
(integer) 2
# 查看 ID 为 1024 的博文访问量
> get blog:view:1024
"2"

分布式锁

(1)申请锁

SET 命令有个 NX 参数可以实现“key 不存在才插入”,可以用它来实现分布式锁:

  • 如果 key 不存在,则显示插入成功,可以用来表示加锁成功;
  • 如果 key 存在,则会显示插入失败,可以用来表示加锁失败。

一般而言,还会对分布式锁加上过期时间,分布式锁的命令如下:

1
SET key value NX PX 30000
  • key - 就是分布式锁的关键字;
  • value - 是客户端生成的唯一的标识;
  • NX - 表示只有 key 不存在的时候才会设置成功。(如果此时 redis 中存在这个 key,那么设置失败,返回 nil
  • PX 30000 - 表示:30s 后,key 会被删除(这意味着锁被释放了)。设置过期时间,是为了防止出现各种意外,导致锁始终无法释放的情况。

(2)释放锁

释放锁就是删除 key ,但是一般可以用 lua 脚本删除,判断 value 一样才删除,这是为了保证释放锁操作和申请所操作是同一个客户端。由于涉及两个操作,为了保证原子性,可以使用 lua 脚本来实现,因为 Redis 执行 Lua 脚本时,是以原子性方式执行的。

1
2
3
4
5
6
-- 删除锁的时候,找到 key 对应的 value,跟自己传过去的 value 做比较,如果是一样的才删除。
if redis.call("get",KEYS[1]) == ARGV[1] then
return redis.call("del",KEYS[1])
else
return 0
end

共享 Session 信息

在分布式场景下,一个用户的 Session 如果只存储在一个服务器上,那么当负载均衡器把用户的下一个请求转发到另一个服务器上,该服务器没有用户的 Session,就可能导致用户需要重新进行登录等操作。

分布式 Session 的几种实现策略:

  1. 粘性 session
  2. 应用服务器间的 session 复制共享
  3. 基于缓存的 session 共享 ✅

基于缓存的 session 共享实现

使用一个单独的存储服务器存储 Session 数据,可以存在 MySQL 数据库上,也可以存在 Redis 或者 Memcached 这种内存型数据库。

缺点:需要去实现存取 Session 的代码。

Hash

Hash 简介

Hash 是一个键值对(key - value)集合,其中 value 的形式如: value=[{field1,value1},...{fieldN,valueN}]。Hash 特别适合用于存储对象。

Hash 实现

哈希对象的编码可以是 ziplist 或者 hashtable

ziplist 编码的哈希对象使用压缩列表作为底层实现,每当有新的键值对要加入到哈希对象时, 程序会先将保存了键的压缩列表节点推入到压缩列表表尾, 然后再将保存了值的压缩列表节点推入到压缩列表表尾。

hashtable 编码的哈希对象使用字典作为底层实现, 哈希对象中的每个键值对都使用一个字典键值对来保存。

当哈希对象同时满足以下两个条件时, 使用 ziplist 编码;否则,使用 hashtable 编码。

  1. 哈希对象保存的所有键值对的键和值的字符串长度都小于 64 字节(可由 hash-max-ziplist-value 配置);
  2. 哈希对象保存的键值对数量小于 512 个(可由 hash-max-ziplist-entries 配置);

注意:这两个条件的上限值是可以修改的, 具体请看配置文件中关于 hash-max-ziplist-value 选项和 hash-max-ziplist-entries 选项的说明。

Hash 命令

命令 行为
HSET 将指定字段的值设为 value
HGET 获取指定字段的值
HGETALL 获取所有键值对
HMSET 设置多个键值对
HMGET 获取所有指定字段的值
HDEL 删除指定字段
HINCRBY 为指定字段的整数值加上增量
HKEYS 获取所有字段

更多命令请参考:Redis Hash 类型官方命令文档

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 存储一个哈希表 key 的键值
HSET key field value
# 获取哈希表 key 对应的 field 键值
HGET key field

# 在一个哈希表 key 中存储多个键值对
HMSET key field value [field value...]
# 批量获取哈希表 key 中多个 field 键值
HMGET key field [field ...]
# 删除哈希表 key 中的 field 键值
HDEL key field [field ...]

# 返回哈希表 key 中 field 的数量
HLEN key
# 返回哈希表 key 中所有的键值
HGETALL key

# 为哈希表 key 中 field 键的值加上增量 n
HINCRBY key field n

Hash 应用

Hash 类型适用于存储结构化数据

缓存对象

Hash 类型的(key,field,value)的结构与对象的(对象 id,属性,值)的结构相似,也可以用来存储对象。

我们以用户信息为例,它在关系型数据库中的结构是这样的:

我们可以使用如下命令,将用户对象的信息存储到 Hash 类型:

1
2
3
4
5
6
7
8
9
10
11
12
# 存储一个哈希表 uid:1 的键值
> HMSET uid:1 name Tom age 15
2
# 存储一个哈希表 uid:2 的键值
> HMSET uid:2 name Jerry age 13
2
# 获取哈希表用户 id 为 1 中所有的键值
> HGETALL uid:1
1) "name"
2) "Tom"
3) "age"
4) "15"

Redis Hash 存储其结构如下图:

在介绍 String 类型的应用场景时有所介绍,String + Json 也是存储对象的一种方式,那么存储对象时,到底用 String + json 还是用 Hash 呢?

一般对象用 String + Json 存储,对象中某些频繁变化的属性可以考虑抽出来用 Hash 类型存储。

购物车

【需求场景】

用户浏览电商平台,添加商品到购物车,并支持查看购物车。需要考虑未登录的情况。

【解决方案】

可以使用 HASH 类型来实现购物车功能。

以用户 session 为 key,存储了商品 ID 和商品数量的映射。其中,商品 id 为 field,商品数量为 value。

为什么不使用用户 ID?

因为很多场景下需要支持用户在免登陆的情况下使用购物车的,因为未登录,所以无法知道用户的用户 ID,这种情况下使用用户 session 更合适。并且由于绑定的是 session,可以在清空 session 时,顺便清空购物车缓存,更加方便。

维护购物车的常见操作如下:

  • 添加商品 - HSET cart:{session} {商品 id} 1
  • 添加数量 - HINCRBY cart:{session} {商品 id} 1
  • 商品总数 - HLEN cart:{session}
  • 删除商品 - HDEL cart:{session} {商品 id}
  • 获取购物车所有商品 - HGETALL cart:{session}

当前仅仅是将商品 ID 存储到了 Redis 中,在回显商品具体信息的时候,还需要拿着商品 id 查询一次数据库,获取完整的商品的信息。

List

Redis 中的 List 类型就是有序列表。

List 简介

List 列表是简单的字符串列表,按照插入顺序排序,可以从头部或尾部向 List 列表添加元素。

列表的最大长度为 2^32 - 1,也即每个列表支持超过 40 亿个元素。

List 实现

列表对象的编码可以是 ziplist 或者 linkedlist

ziplist 编码的列表对象使用压缩列表作为底层实现, 每个压缩列表节点(entry)保存了一个列表元素。

inkedlist 编码的列表对象使用双链表作为底层实现。

当列表对象可以同时满足以下两个条件时, 列表对象使用 ziplist 编码;否则,使用 linkedlist 编码

  1. 列表对象保存的所有字符串元素的长度都小于 64 字节;
  2. 列表对象保存的元素数量小于 512 个;

注意

以上两个条件的上限值是可以修改的, 具体请看配置文件中关于 list-max-ziplist-value 选项和 list-max-ziplist-entries 选项的说明。

List 命令

命令 行为
LPUSH 将给定值推入列表的右端。
RPUSH 将给定值推入列表的右端。
LPOP 从列表的左端弹出一个值,并返回被弹出的值。
RPOP 从列表的右端弹出一个值,并返回被弹出的值。
LRANGE 获取列表在给定范围上的所有值。
LINDEX 获取列表在给定位置上的单个元素。
LREM 从列表的左端弹出一个值,并返回被弹出的值。
LTRIM 只保留指定区间内的元素,删除其他元素。

更多命令请参考:Redis List 类型官方命令文档

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 将一个或多个值 value 插入到 key 列表的表头(最左边),最后的值在最前面
LPUSH key value [value ...]
# 将一个或多个值 value 插入到 key 列表的表尾(最右边)
RPUSH key value [value ...]
# 移除并返回 key 列表的头元素
LPOP key
# 移除并返回 key 列表的尾元素
RPOP key

# 返回列表 key 中指定区间内的元素,区间以偏移量 start 和 stop 指定,从 0 开始
LRANGE key start stop

# 从 key 列表表头弹出一个元素,没有就阻塞 timeout 秒,如果 timeout=0 则一直阻塞
BLPOP key [key ...] timeout
# 从 key 列表表尾弹出一个元素,没有就阻塞 timeout 秒,如果 timeout=0 则一直阻塞
BRPOP key [key ...] timeout

List 应用

消息队列

消息队列在存取消息时,必须要满足三个需求,分别是消息保序、处理重复的消息和保证消息可靠性

Redis 的 List 和 Stream 两种数据类型,就可以满足消息队列的这三个需求。我们先来了解下基于 List 的消息队列实现方法,后面在介绍 Stream 数据类型时候,在详细说说 Stream。

1、如何满足消息保序需求?

List 本身就是按先进先出的顺序对数据进行存取的,所以,如果使用 List 作为消息队列保存消息的话,就已经能满足消息保序的需求了。

List 可以使用 LPUSH + RPOP(或者反过来,RPUSH+LPOP)命令实现消息队列。

  • 生产者使用 LPUSH key value[value...] 将消息插入到队列的头部,如果 key 不存在则会创建一个空的队列再插入消息。

  • 消费者使用 RPOP key 依次读取队列的消息,先进先出。

不过,在消费者读取数据时,有一个潜在的性能风险点。

在生产者往 List 中写入数据时,List 并不会主动地通知消费者有新消息写入,如果消费者想要及时处理消息,就需要在程序中不停地调用 RPOP 命令(比如使用一个 while(1) 循环)。如果有新消息写入,RPOP 命令就会返回结果,否则,RPOP 命令返回空值,再继续循环。

所以,即使没有新消息写入 List,消费者也要不停地调用 RPOP 命令,这就会导致消费者程序的 CPU 一直消耗在执行 RPOP 命令上,带来不必要的性能损失。

为了解决这个问题,Redis 提供了 BRPOP 命令。BRPOP 命令也称为阻塞式读取,客户端在没有读到队列数据时,自动阻塞,直到有新的数据写入队列,再开始读取新数据。和消费者程序自己不停地调用 RPOP 命令相比,这种方式能节省 CPU 开销。

2、如何处理重复的消息?

消费者要实现重复消息的判断,需要 2 个方面的要求:

  • 每个消息都有一个全局的 ID。
  • 消费者要记录已经处理过的消息的 ID。当收到一条消息后,消费者程序就可以对比收到的消息 ID 和记录的已处理过的消息 ID,来判断当前收到的消息有没有经过处理。如果已经处理过,那么,消费者程序就不再进行处理了。

但是 List 并不会为每个消息生成 ID 号,所以我们需要自行为每个消息生成一个全局唯一 ID,生成之后,我们在用 LPUSH 命令把消息插入 List 时,需要在消息中包含这个全局唯一 ID。

例如,我们执行以下命令,就把一条全局 ID 为 111000102、库存量为 99 的消息插入了消息队列:

1
2
> LPUSH mq "111000102:stock:99"
(integer) 1

3、如何保证消息可靠性?

当消费者程序从 List 中读取一条消息后,List 就不会再留存这条消息了。所以,如果消费者程序在处理消息的过程出现了故障或宕机,就会导致消息没有处理完成,那么,消费者程序再次启动后,就没法再次从 List 中读取消息了。

为了留存消息,List 类型提供了 BRPOPLPUSH 命令,这个命令的作用是让消费者程序从一个 List 中读取消息,同时,Redis 会把这个消息再插入到另一个 List(可以叫作备份 List)留存

这样一来,如果消费者程序读了消息但没能正常处理,等它重启后,就可以从备份 List 中重新读取消息并进行处理了。

好了,到这里可以知道基于 List 类型的消息队列,满足消息队列的三大需求(消息保序、处理重复的消息和保证消息可靠性)。

  • 消息保序:使用 LPUSH + RPOP;
  • 阻塞读取:使用 BRPOP;
  • 重复消息处理:生产者自行实现全局唯一 ID;
  • 消息的可靠性:使用 BRPOPLPUSH

List 作为消息队列有什么缺陷?

List 不支持多个消费者消费同一条消息,因为一旦消费者拉取一条消息后,这条消息就从 List 中删除了,无法被其它消费者再次消费。

要实现一条消息可以被多个消费者消费,那么就要将多个消费者组成一个消费组,使得多个消费者可以消费同一条消息,但是 List 类型并不支持消费组的实现

这就要说起 Redis 从 5.0 版本开始提供的 Stream 数据类型了,Stream 同样能够满足消息队列的三大需求,而且它还支持“消费组”形式的消息读取。

输入自动补全

【需求场景】

根据用户输入,自动补全信息,如:联系人、商品名等。

  • 典型场景一 - 社交网站后台记录用户最近联系过的 100 个好友,当用户查找好友时,根据输入的关键字自动补全姓名。
  • 典型场景二 - 电商网站后台记录用户最近浏览过的 10 件商品,当用户查找商品是,根据输入的关键字自动补全商品名称。

【解决方案】

使用 Redis 的 List 类型存储一个最近信息列表,然后在需要自动补全信息时展示相应数量的数据。

维护最近信息列表的常见操作如下:

  • 如果指定信息已经存在于最近信息列表里,那么从列表里移除。使用 LREM 命令。
  • 将指定信息添加到最近信息列表的头部。使用 LPUSH 命令。
  • 添加操作完成后,如果最近信息列表中的数量超过上限 N,进行裁剪操作。使用 LTRIM 命令。

Set

Redis 中的 Set 类型就是无序且去重的集合。

Set 简介

Set 类型是一个无序并唯一的键值集合,它的存储顺序不会按照插入的先后顺序进行存储。

一个集合最多可以存储 2^32-1 个元素。概念和数学中个的集合基本类似,可以交集,并集,差集等等,所以 Set 类型除了支持集合内的增删改查,同时还支持多个集合取交集、并集、差集。

Set 类型和 List 类型的区别如下:

  • List 可以存储重复元素,Set 只能存储非重复元素;
  • List 是按照元素的先后顺序存储元素的,而 Set 则是无序方式存储元素的。

Set 实现

集合对象的编码可以是 intset 或者 hashtable

intset 编码的集合对象使用整数集合作为底层实现, 集合对象包含的所有元素都被保存在整数集合里面。

hashtable 编码的集合对象使用字典作为底层实现, 字典的每个键都是一个字符串对象, 每个字符串对象包含了一个集合元素, 而字典的值则全部被设置为 NULL

当集合对象可以同时满足以下两个条件时,集合对象使用 intset 编码;否则,使用 hashtable 编码:

  1. 集合对象保存的所有元素都是整数值;
  2. 集合对象保存的元素数量不超过 512 个;

注意:第二个条件的上限值是可以修改的, 具体请看配置文件中关于 set-max-intset-entries 选项的说明。

Set 命令

命令 行为
SADD 将给定元素添加到集合。
SMEMBERS 返回集合包含的所有元素。
SISMEMBER 检查给定元素是否存在于集合中。
SREM 如果给定的元素存在于集合中,那么移除这个元素。

更多命令请参考:Redis Set 类型官方命令文档

Set 常用操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 往集合 key 中存入元素,元素存在则忽略,若 key 不存在则新建
SADD key member [member ...]
# 从集合 key 中删除元素
SREM key member [member ...]
# 获取集合 key 中所有元素
SMEMBERS key
# 获取集合 key 中的元素个数
SCARD key

# 判断 member 元素是否存在于集合 key 中
SISMEMBER key member

# 从集合 key 中随机选出 count 个元素,元素不从 key 中删除
SRANDMEMBER key [count]
# 从集合 key 中随机选出 count 个元素,元素从 key 中删除
SPOP key [count]

Set 运算操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 交集运算
SINTER key [key ...]
# 将交集结果存入新集合 destination 中
SINTERSTORE destination key [key ...]

# 并集运算
SUNION key [key ...]
# 将并集结果存入新集合 destination 中
SUNIONSTORE destination key [key ...]

# 差集运算
SDIFF key [key ...]
# 将差集结果存入新集合 destination 中
SDIFFSTORE destination key [key ...]

Set 应用

集合的主要几个特性,无序、不可重复、支持并交差等操作。

因此 Set 类型比较适合用来数据去重和保障数据的唯一性,还可以用来统计多个集合的交集、错集和并集等,当我们存储的数据是无序并且需要去重的情况下,比较适合使用集合类型进行存储。

但是要提醒你一下,这里有一个潜在的风险。Set 的差集、并集和交集的计算复杂度较高,在数据量较大的情况下,如果直接执行这些计算,会导致 Redis 实例阻塞

在主从集群中,为了避免主库因为 Set 做聚合计算(交集、差集、并集)时导致主库被阻塞,我们可以选择一个从库完成聚合统计,或者把数据返回给客户端,由客户端来完成聚合统计。

点赞

Set 类型可以保证一个用户只能点一个赞,这里举例子一个场景,key 是文章 id,value 是用户 id。

uid:1uid:2uid:3 三个用户分别对 article:1 文章点赞了。

1
2
3
4
5
6
7
8
9
# uid:1 用户对文章 article:1 点赞
> SADD article:1 uid:1
(integer) 1
# uid:2 用户对文章 article:1 点赞
> SADD article:1 uid:2
(integer) 1
# uid:3 用户对文章 article:1 点赞
> SADD article:1 uid:3
(integer) 1

uid:1 取消了对 article:1 文章点赞。

1
2
> SREM article:1 uid:1
(integer) 1

获取 article:1 文章所有点赞用户 :

1
2
3
> SMEMBERS article:1
1) "uid:3"
2) "uid:2"

获取 article:1 文章的点赞用户数量:

1
2
> SCARD article:1
(integer) 2

判断用户 uid:1 是否对文章 article:1 点赞了:

1
2
> SISMEMBER article:1 uid:1
(integer) 0 # 返回 0 说明没点赞,返回 1 则说明点赞了

共同关注

Set 类型支持交集运算,所以可以用来计算共同关注的好友、公众号等。

key 可以是用户 id,value 则是已关注的公众号的 id。

uid:1 用户关注公众号 id 为 5、6、7、8、9,uid:2 用户关注公众号 id 为 7、8、9、10、11。

1
2
3
4
5
6
# uid:1 用户关注公众号 id 为 5、6、7、8、9
> SADD uid:1 5 6 7 8 9
(integer) 5
# uid:2 用户关注公众号 id 为 7、8、9、10、11
> SADD uid:2 7 8 9 10 11
(integer) 5

uid:1uid:2 共同关注的公众号:

1
2
3
4
5
# 获取共同关注
> SINTER uid:1 uid:2
1) "7"
2) "8"
3) "9"

uid:2 推荐 uid:1 关注的公众号:

1
2
3
> SDIFF uid:1 uid:2
1) "5"
2) "6"

验证某个公众号是否同时被 uid:1uid:2 关注:

1
2
3
4
> SISMEMBER uid:1 5
(integer) 1 # 返回 1,说明关注了
> SISMEMBER uid:2 5
(integer) 0 # 返回 0,说明没关注

抽奖活动

存储某活动中中奖的用户名,Set 类型因为有去重功能,可以保证同一个用户不会中奖两次。

key 为抽奖活动名,value 为员工名称,把所有员工名称放入抽奖箱:

1
2
> SADD lucky Tom Jerry John Sean Marry Lindy Sary Mark
(integer) 5

如果允许重复中奖,可以使用 SRANDMEMBER 命令。

1
2
3
4
5
6
7
8
9
10
11
12
# 抽取 1 个一等奖:
> SRANDMEMBER lucky 1
1) "Tom"
# 抽取 2 个二等奖:
> SRANDMEMBER lucky 2
1) "Mark"
2) "Jerry"
# 抽取 3 个三等奖:
> SRANDMEMBER lucky 3
1) "Sary"
2) "Tom"
3) "Jerry"

如果不允许重复中奖,可以使用 SPOP 命令。

1
2
3
4
5
6
7
8
9
10
11
12
# 抽取一等奖 1 个
> SPOP lucky 1
1) "Sary"
# 抽取二等奖 2 个
> SPOP lucky 2
1) "Jerry"
2) "Mark"
# 抽取三等奖 3 个
> SPOP lucky 3
1) "John"
2) "Sean"
3) "Lindy"

Zset

Redis 中的 Zset 类型就是有序且去重的集合。

Zset 简介

Zset 类型(有序集合类型)相比于 Set 类型多了一个排序属性 score(分值),对于有序集合 ZSet 来说,每个存储元素相当于有两个值组成的,一个是有序结合的元素值,一个是排序值。

有序集合保留了集合不能有重复成员的特性(分值可以重复),但不同的是,有序集合中的元素可以排序。

Zset 实现

有序集合的编码可以是 ziplist 或者 skiplist

ziplist 编码的有序集合对象使用压缩列表作为底层实现, 每个集合元素使用两个紧挨在一起的压缩列表节点来保存, 第一个节点保存元素的成员(member), 而第二个元素则保存元素的分值(score)。压缩列表内的集合元素按分值从小到大进行排序, 分值较小的元素被放置在靠近表头的方向, 而分值较大的元素则被放置在靠近表尾的方向。

skiplist 编码的有序集合对象使用 zset 结构作为底层实现, 一个 zset 结构同时包含一个字典和一个跳跃表

1
2
3
4
5
6
7
typedef struct zset {

zskiplist *zsl;

dict *dict;

} zset;

zset 结构中的 zsl 跳跃表按分值从小到大保存了所有集合元素, 每个跳跃表节点都保存了一个集合元素: 跳跃表节点的 object 属性保存了元素的成员, 而跳跃表节点的 score 属性则保存了元素的分值。 通过这个跳跃表, 程序可以对有序集合进行范围型操作, 比如 ZRANK 、 ZRANGE 等命令就是基于跳跃表 API 来实现的。

除此之外, zset 结构中的 dict 字典为有序集合创建了一个从成员到分值的映射, 字典中的每个键值对都保存了一个集合元素: 字典的键保存了元素的成员, 而字典的值则保存了元素的分值。 通过这个字典, 程序可以用 O(1) 复杂度查找给定成员的分值, ZSCORE 命令就是根据这一特性实现的, 而很多其他有序集合命令都在实现的内部用到了这一特性。

有序集合每个元素的成员都是一个字符串对象, 而每个元素的分值都是一个 double 类型的浮点数。 值得一提的是, 虽然 zset 结构同时使用跳跃表和字典来保存有序集合元素, 但这两种数据结构都会通过指针来共享相同元素的成员和分值, 所以同时使用跳跃表和字典来保存集合元素不会产生任何重复成员或者分值, 也不会因此而浪费额外的内存。

当有序集合对象可以同时满足以下两个条件时,有序集合对象使用 ziplist 编码;否则,使用 skiplist 编码。

  • 有序集合保存的元素数量小于 128 个;
  • 有序集合保存的所有元素成员的长度都小于 64 字节;

注意:以上两个条件的上限值是可以修改的, 具体请看配置文件中关于 zset-max-ziplist-entries 选项和 zset-max-ziplist-value 选项的说明。

在 Redis 7.0 中,压缩列表数据结构已经废弃了,交由 listpack 数据结构来实现了。

Zset 命令

命令 行为
ZADD 将一个带有给定分值的成员添加到有序集合里面
ZRANGE 顺序排序,并返回指定排名区间的成员
ZREVRANGE 反序排序,并返回指定排名区间的成员
ZRANGEBYSCORE 顺序排序,并返回指定排名区间的成员及其分值
ZREVRANGEBYSCORE 反序排序,并返回指定排名区间的成员及其分值
ZREM 移除指定的成员
ZSCORE 返回指定成员的分值
ZCARD 返回所有成员数

更多命令请参考:Redis ZSet 类型官方命令文档

Zset 常用操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# 往有序集合 key 中加入带分值元素
ZADD key score member [[score member]...]
# 往有序集合 key 中删除元素
ZREM key member [member...]
# 返回有序集合 key 中元素 member 的分值
ZSCORE key member
# 返回有序集合 key 中元素个数
ZCARD key

# 为有序集合 key 中元素 member 的分值加上 increment
ZINCRBY key increment member

# 正序获取有序集合 key 从 start 下标到 stop 下标的元素
ZRANGE key start stop [WITHSCORES]
# 倒序获取有序集合 key 从 start 下标到 stop 下标的元素
ZREVRANGE key start stop [WITHSCORES]

# 返回有序集合中指定分数区间内的成员,分数由低到高排序。
ZRANGEBYSCORE key min max [WITHSCORES] [LIMIT offset count]

# 返回指定成员区间内的成员,按字典正序排列,分数必须相同。
ZRANGEBYLEX key min max [LIMIT offset count]
# 返回指定成员区间内的成员,按字典倒序排列,分数必须相同
ZREVRANGEBYLEX key max min [LIMIT offset count]

Zset 运算操作(相比于 Set 类型,ZSet 类型没有支持差集运算):

1
2
3
4
# 并集计算(相同元素分值相加),numberkeys 一共多少个 key,WEIGHTS 每个 key 对应的分值乘积
ZUNIONSTORE destkey numberkeys key [key...]
# 交集计算(相同元素分值相加),numberkeys 一共多少个 key,WEIGHTS 每个 key 对应的分值乘积
ZINTERSTORE destkey numberkeys key [key...]

Zset 应用

Zset 类型(Sorted Set,有序集合)可以根据元素的权重来排序,我们可以自己来决定每个元素的权重值。比如说,我们可以根据元素插入 Sorted Set 的时间确定权重值,先插入的元素权重小,后插入的元素权重大。

在面对需要展示最新列表、排行榜等场景时,如果数据更新频繁或者需要分页显示,可以优先考虑使用 Sorted Set。

排行榜

【需求场景】

各种排行榜,如:内容平台(视频、歌曲、文章)的播放量/收藏量/评分排行榜;电商网站的销售排行榜;

【解决方案】

有序集合比较典型的使用场景就是排行榜。例如学生成绩的排名榜、游戏积分排行榜、视频播放排名、电商系统中商品的销量排名等。

我们以博文点赞排名为例,dunwu 发表了五篇博文,分别获得赞为 200、40、100、50、150。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# article:1 文章获得了 200 个赞
> ZADD user:dunwu:ranking 200 article:1
(integer) 1
# article:2 文章获得了 40 个赞
> ZADD user:dunwu:ranking 40 article:2
(integer) 1
# article:3 文章获得了 100 个赞
> ZADD user:dunwu:ranking 100 article:3
(integer) 1
# article:4 文章获得了 50 个赞
> ZADD user:dunwu:ranking 50 article:4
(integer) 1
# article:5 文章获得了 150 个赞
> ZADD user:dunwu:ranking 150 article:5
(integer) 1

文章 article:4 新增一个赞,可以使用 ZINCRBY 命令(为有序集合 key 中元素 member 的分值加上 increment):

1
2
> ZINCRBY user:dunwu:ranking 1 article:4
"51"

查看某篇文章的赞数,可以使用 ZSCORE 命令(返回有序集合 key 中元素个数):

1
2
> ZSCORE user:dunwu:ranking article:4
"50"

获取 dunwu 文章赞数最多的 3 篇文章,可以使用 ZREVRANGE 命令(倒序获取有序集合 key 从 start 下标到 stop 下标的元素):

1
2
3
4
5
6
7
8
# WITHSCORES 表示把 score 也显示出来
> ZREVRANGE user:dunwu:ranking 0 2 WITHSCORES
1) "article:1"
2) "200"
3) "article:5"
4) "150"
5) "article:3"
6) "100"

获取 dunwu 100 赞到 200 赞的文章,可以使用 ZRANGEBYSCORE 命令(返回有序集合中指定分数区间内的成员,分数由低到高排序):

1
2
3
4
5
6
7
> ZRANGEBYSCORE user:dunwu:ranking 100 200 WITHSCORES
1) "article:3"
2) "100"
3) "article:5"
4) "150"
5) "article:1"
6) "200"

前缀排序

使用有序集合的 ZRANGEBYLEXZREVRANGEBYLEX 可以帮助我们实现电话号码或姓名的排序,我们以 ZRANGEBYLEX (返回指定成员区间内的成员,按 key 正序排列,分数必须相同)为例。

注意:不要在分数不一致的 SortSet 集合中去使用 ZRANGEBYLEX 和 ZREVRANGEBYLEX 指令,因为获取的结果会不准确。

1、电话排序

我们可以将电话号码存储到 SortSet 中,然后根据需要来获取号段:

1
2
3
4
5
6
> ZADD phone 0 13100111100 0 13110114300 0 13132110901
(integer) 3
> ZADD phone 0 13200111100 0 13210414300 0 13252110901
(integer) 3
> ZADD phone 0 13300111100 0 13310414300 0 13352110901
(integer) 3

获取所有号码:

1
2
3
4
5
6
7
8
9
10
> ZRANGEBYLEX phone - +
1) "13100111100"
2) "13110114300"
3) "13132110901"
4) "13200111100"
5) "13210414300"
6) "13252110901"
7) "13300111100"
8) "13310414300"
9) "13352110901"

获取 132 号段的号码:

1
2
3
4
> ZRANGEBYLEX phone [132 (133
1) "13200111100"
2) "13210414300"
3) "13252110901"

获取 132、133 号段的号码:

1
2
3
4
5
6
7
> ZRANGEBYLEX phone [132 (134
1) "13200111100"
2) "13210414300"
3) "13252110901"
4) "13300111100"
5) "13310414300"
6) "13352110901"

2、姓名排序

1
2
> zadd names 0 Toumas 0 Jake 0 Bluetuo 0 Gaodeng 0 Aimini 0 Aidehua
(integer) 6

获取所有人的名字:

1
2
3
4
5
6
7
> ZRANGEBYLEX names - +
1) "Aidehua"
2) "Aimini"
3) "Bluetuo"
4) "Gaodeng"
5) "Jake"
6) "Toumas"

获取名字中大写字母 A 开头的所有人:

1
2
3
> ZRANGEBYLEX names [A (B
1) "Aidehua"
2) "Aimini"

获取名字中大写字母 C 到 Z 的所有人:

1
2
3
4
> ZRANGEBYLEX names [C [Z
1) "Gaodeng"
2) "Jake"
3) "Toumas"

总结

Redis 常见的五种数据类型:String(字符串),Hash(哈希),List(列表),Set(集合)及 Zset(sorted set:有序集合)

这五种数据类型都由多种数据结构实现的,主要是出于时间和空间的考虑,当数据量小的时候使用更简单的数据结构,有利于节省内存,提高性能。

可以看到,Redis 数据类型的底层数据结构随着版本的更新也有所不同,比如:

  • 在 Redis 3.0 版本中 List 对象的底层数据结构由“双向链表”或“压缩表列表”实现,但是在 3.2 版本之后,List 数据类型底层数据结构是由 quicklist 实现的;
  • 在最新的 Redis 代码中,压缩列表数据结构已经废弃了,交由 listpack 数据结构来实现了。

Redis 五种数据类型的应用场景:

  • String 类型的应用场景:缓存对象、分布式锁、共享 session、计数器、限流、分布式 ID 等。
  • List 类型的应用场景:消息队列(有两个问题:1. 生产者需要自行实现全局唯一 ID;2. 不能以消费组形式消费数据)、输入自动补全等。
  • Hash 类型:缓存对象、购物车等。
  • Set 类型:聚合计算(并集、交集、差集)场景,比如点赞、共同关注、抽奖活动等。
  • Zset 类型:排序场景,比如排行榜、电话和姓名排序等。

Redis 后续版本又支持四种数据类型,它们的应用场景如下:

  • BitMap(2.2 版新增):二值状态统计的场景,比如签到、判断用户登陆状态、连续签到用户总数等;
  • HyperLogLog(2.8 版新增):海量数据基数统计的场景,比如百万级网页 UV 计数等;
  • GEO(3.2 版新增):存储地理位置信息的场景,比如滴滴叫车;
  • Stream(5.0 版新增):消息队列,相比于基于 List 类型实现的消息队列,有这两个特有的特性:自动生成全局唯一消息 ID,支持以消费组形式消费数据。

针对 Redis 是否适合做消息队列,关键看你的业务场景:

  • 如果你的业务场景足够简单,对于数据丢失不敏感,而且消息积压概率比较小的情况下,把 Redis 当作队列是完全可以的。
  • 如果你的业务有海量消息,消息积压的概率比较大,并且不能接受数据丢失,那么还是用专业的消息队列中间件吧。

参考资料

Redis 高级数据类型

关键词:BitMapHyperLogLogGeoStream

Redis 支持的高级数据类型:BitMap、HyperLogLog、GEO、Stream

使用 Redis ,不仅要了解其数据类型的特性,还需要根据业务场景,灵活的、高效的使用其数据类型来建模。

BitMap

BitMap 简介

Bitmap,即位图,是一串连续的二进制数组(0 和 1),可以通过偏移量(offset)定位元素。由于 bit 是计算机中最小的单位,使用它进行储存将非常节省空间,特别适合一些数据量大且使用二值统计的场景。例如在一个系统中,不同的用户使用单调递增的用户 ID 表示。40 亿($$2^{32}$$ = $$410241024*1024$$ ≈ 40 亿)用户只需要 512M 内存就能记住某种状态,例如用户是否已登录。

BitMap 实现

实际上,BitMap 不是真实的数据结构,而是基于 String 实现的一组位操作

由于 STRING 是二进制安全的,并且其最大长度是 512 MB,所以 BitMap 能最大设置 $$2^{32}$$ 个不同的 bit。

BitMap 命令

命令 行为
SETBIT 对 key 所储存的字符串值,设置或清除指定偏移量上的位(bit)
GETBIT 对 key 所储存的字符串值,获取指定偏移量上的位(bit)
BITOP 对一个或多个字符串执行位运算

【示例】SETBIT、GETBIT 操作

假设有 1000 个传感器,标记为 0-999。现在,想要快速确定某传感器是否在一小时内对服务器执行了 ping 操作。

1
2
3
4
5
6
7
8
9
# 传感器 123 在 2024 年 1 月 1 日 00:00 内对服务器执行 ping 操作
> SETBIT pings:2024-01-01-00:00 123 1
(integer) 0
# 传感器 123 是否在 2024 年 1 月 1 日 00:00 内对服务器执行 ping 操作
> GETBIT pings:2024-01-01-00:00 123
1
What about sensor 456?
> GETBIT pings:2024-01-01-00:00 456
0

【示例】BITOP 操作

1
2
3
4
5
6
7
8
9
10
11
12
13
# BitMap间的运算
# operations 位移操作符,枚举值
AND 与运算 &
OR 或运算 |
XOR 异或 ^
NOT 取反 ~
# result 计算的结果,会存储在该key中
# key1 … keyn 参与运算的key,可以有多个,空格分割,not运算只能一个key
# 当 BITOP 处理不同长度的字符串时,较短的那个字符串所缺少的部分会被看作 0。返回值是保存到 destkey 的字符串的长度(以字节byte为单位),和输入 key 中最长的字符串长度相等。
BITOP [operations] [result] [key1] [keyn…]

# 返回指定key中第一次出现指定value(0/1)的位置
BITPOS [key] [value]

BitMap 应用

Bitmap 类型非常适合二值状态统计的场景,这里的二值状态就是指集合元素的取值就只有 0 和 1 两种,在记录海量数据时,Bitmap 能够有效地节省内存空间。

签到统计

在签到打卡的场景中,我们只用记录签到(1)或未签到(0),所以它就是非常典型的二值状态。

签到统计时,每个用户一天的签到用 1 个 bit 位就能表示,一个月(假设是 31 天)的签到情况用 31 个 bit 位就可以,而一年的签到也只需要用 365 个 bit 位,根本不用太复杂的集合类型。

假设我们要统计 ID 100 的用户在 2022 年 6 月份的签到情况,就可以按照下面的步骤进行操作。

第一步,执行下面的命令,记录该用户 6 月 3 号已签到。

1
SETBIT uid:sign:100:202206 2 1

第二步,检查该用户 6 月 3 日是否签到。

1
GETBIT uid:sign:100:202206 2

第三步,统计该用户在 6 月份的签到次数。

1
BITCOUNT uid:sign:100:202206

这样,我们就知道该用户在 6 月份的签到情况了。

如何统计这个月首次打卡时间呢?

Redis 提供了 BITPOS key bitValue [start] [end]指令,返回数据表示 Bitmap 中第一个值为 bitValue 的 offset 位置。

在默认情况下,命令将检测整个位图,用户可以通过可选的 start 参数和 end 参数指定要检测的范围。所以我们可以通过执行这条命令来获取 userID = 100 在 2022 年 6 月份首次打卡日期:

1
BITPOS uid:sign:100:202206 1

需要注意的是,因为 offset 从 0 开始的,所以我们需要将返回的 value + 1。

判断用户是否登录

Bitmap 提供了 GETBIT、SETBIT 操作,通过一个偏移值 offset 对 bit 数组的 offset 位置的 bit 位进行读写操作,需要注意的是 offset 从 0 开始。

只需要一个 key = login_status 表示存储用户登陆状态集合数据,将用户 ID 作为 offset,在线就设置为 1,下线设置 0。通过 GETBIT判断对应的用户是否在线。50000 万 用户只需要 6 MB 的空间。

假如我们要判断 ID = 10086 的用户的登陆情况:

第一步,执行以下指令,表示用户已登录。

1
SETBIT login_status 10086 1

第二步,检查该用户是否登陆,返回值 1 表示已登录。

1
GETBIT login_status 10086

第三步,登出,将 offset 对应的 value 设置成 0。

1
SETBIT login_status 10086 0

连续签到用户总数

如何统计出这连续 7 天连续打卡用户总数呢?

我们把每天的日期作为 Bitmap 的 key,userId 作为 offset,若是打卡则将 offset 位置的 bit 设置成 1。

key 对应的集合的每个 bit 位的数据则是一个用户在该日期的打卡记录。

一共有 7 个这样的 Bitmap,如果我们能对这 7 个 Bitmap 的对应的 bit 位做 “与”运算。同样的 UserID offset 都是一样的,当一个 userID 在 7 个 Bitmap 对应对应的 offset 位置的 bit = 1 就说明该用户 7 天连续打卡。

结果保存到一个新 Bitmap 中,我们再通过 BITCOUNT 统计 bit = 1 的个数便得到了连续打卡 7 天的用户总数了。

Redis 提供了 BITOP operation destkey key [key ...]这个指令用于对一个或者多个 key 的 Bitmap 进行位元操作。

  • operation 可以是 andORNOTXOR。当 BITOP 处理不同长度的字符串时,较短的那个字符串所缺少的部分会被看作 0 。空的 key 也被看作是包含 0 的字符串序列。

假设要统计 3 天连续打卡的用户数,则是将三个 bitmap 进行 AND 操作,并将结果保存到 destmap 中,接着对 destmap 执行 BITCOUNT 统计,如下命令:

1
2
3
4
# 与操作
BITOP AND destmap bitmap:01 bitmap:02 bitmap:03
# 统计 bit 位 = 1 的个数
BITCOUNT destmap

即使一天产生一个亿的数据,Bitmap 占用的内存也不大,大约占 12 MB 的内存(10^8/8/1024/1024),7 天的 Bitmap 的内存开销约为 84 MB。同时我们最好给 Bitmap 设置过期时间,让 Redis 删除过期的打卡数据,节省内存。

HyperLogLog

HyperLogLog 简介

Redis HyperLogLog 是 Redis 2.8.9 版本新增的数据类型,是一种用于“统计基数”的数据集合类型,基数统计就是指统计一个集合中不重复的元素个数。但要注意,**HyperLogLog 是统计规则是基于概率完成的,不是非常准确,标准误算率是 0.81%**。

所以,简单来说 HyperLogLog 提供不精确的去重计数

HyperLogLog 的优点是,在输入元素的数量或者体积非常非常大时,计算基数所需的内存空间总是固定的、并且是很小的。

在 Redis 里面,每个 HyperLogLog 键只需要花费 12 KB 内存,就可以计算接近 2^64 个不同元素的基数,和元素越多就越耗费内存的 Set 和 Hash 类型相比,HyperLogLog 就非常节省空间。

这什么概念?举个例子给大家对比一下。

用 Java 语言来说,一般 long 类型占用 8 字节,而 1 字节有 8 位,即:1 byte = 8 bit,即 long 数据类型最大可以表示的数是:2^63-1。对应上面的2^64个数,假设此时有2^63-1这么多个数,从 0 ~ 2^63-1,按照long以及1k = 1024 字节的规则来计算内存总数,就是:((2^63-1) * 8/1024)K,这是很庞大的一个数,存储空间远远超过12K,而 HyperLogLog 却可以用 12K 就能统计完。

HyperLogLog 实现

HyperLogLog 的实现涉及到很多数学问题,太费脑子了,我也没有搞懂,如果你想了解一下,课下可以看看这个:HyperLogLog

HyperLogLog 命令

HyperLogLog 命令很少,就三个。

1
2
3
4
5
6
7
8
# 添加指定元素到 HyperLogLog 中
PFADD key element [element ...]

# 返回给定 HyperLogLog 的基数估算值。
PFCOUNT key [key ...]

# 将多个 HyperLogLog 合并为一个 HyperLogLog
PFMERGE destkey sourcekey [sourcekey ...]

HyperLogLog 应用

百万级网页 UV 计数

Redis HyperLogLog 优势在于只需要花费 12 KB 内存,就可以计算接近 2^64 个元素的基数,和元素越多就越耗费内存的 Set 和 Hash 类型相比,HyperLogLog 就非常节省空间。

所以,非常适合统计百万级以上的网页 UV 的场景。

在统计 UV 时,你可以用 PFADD 命令(用于向 HyperLogLog 中添加新元素)把访问页面的每个用户都添加到 HyperLogLog 中。

1
PFADD page1:uv user1 user2 user3 user4 user5

接下来,就可以用 PFCOUNT 命令直接获得 page1 的 UV 值了,这个命令的作用就是返回 HyperLogLog 的统计结果。

1
PFCOUNT page1:uv

不过,有一点需要你注意一下,HyperLogLog 的统计规则是基于概率完成的,所以它给出的统计结果是有一定误差的,标准误算率是 0.81%。

这也就意味着,你使用 HyperLogLog 统计的 UV 是 100 万,但实际的 UV 可能是 101 万。虽然误差率不算大,但是,如果你需要精确统计结果的话,最好还是继续用 Set 或 Hash 类型。

GEO

GEO 简介

Redis GEO 是 Redis 3.2 版本新增的数据类型,主要用于存储地理位置信息,并对存储的信息进行操作。

在日常生活中,我们越来越依赖搜索“附近的餐馆”、在打车软件上叫车,这些都离不开基于位置信息服务(Location-Based Service,LBS)的应用。LBS 应用访问的数据是和人或物关联的一组经纬度信息,而且要能查询相邻的经纬度范围,GEO 就非常适合应用在 LBS 服务的场景中。

GEO 实现

GEO 本身并没有设计新的底层数据结构,而是直接使用了 Zset 类型。

GEO 类型使用 GeoHash 编码方法实现了经纬度到 Sorted Set 中元素权重分数的转换,这其中的两个关键机制就是“对二维地图做区间划分”和“对区间进行编码”。一组经纬度落在某个区间后,就用区间的编码值来表示,并把编码值作为 Sorted Set 元素的权重分数。

这样一来,我们就可以把经纬度保存到 Sorted Set 中,利用 Sorted Set 提供的“按权重进行有序范围查找”的特性,实现 LBS 服务中频繁使用的“搜索附近”的需求。

GEO 命令

1
2
3
4
5
6
7
8
9
10
11
# 存储指定的地理空间位置,可以将一个或多个经度(longitude)、纬度(latitude)、位置名称(member)添加到指定的 key 中。
GEOADD key longitude latitude member [longitude latitude member ...]

# 从给定的 key 里返回所有指定名称(member)的位置(经度和纬度),不存在的返回 nil。
GEOPOS key member [member ...]

# 返回两个给定位置之间的距离。
GEODIST key member1 member2 [m|km|ft|mi]

# 根据用户给定的经纬度坐标来获取指定范围内的地理位置集合。
GEORADIUS key longitude latitude radius m|km|ft|mi [WITHCOORD] [WITHDIST] [WITHHASH] [COUNT count] [ASC|DESC] [STORE key] [STOREDIST key]

GEO 应用

滴滴叫车

这里以滴滴叫车的场景为例,介绍下具体如何使用 GEO 命令:GEOADD 和 GEORADIUS 这两个命令。

假设车辆 ID 是 33,经纬度位置是(116.034579,39.030452),我们可以用一个 GEO 集合保存所有车辆的经纬度,集合 key 是 cars:locations。

执行下面的这个命令,就可以把 ID 号为 33 的车辆的当前经纬度位置存入 GEO 集合中:

1
GEOADD cars:locations 116.034579 39.030452 33

当用户想要寻找自己附近的网约车时,LBS 应用就可以使用 GEORADIUS 命令。

例如,LBS 应用执行下面的命令时,Redis 会根据输入的用户的经纬度信息(116.054579,39.030452),查找以这个经纬度为中心的 5 公里内的车辆信息,并返回给 LBS 应用。

1
GEORADIUS cars:locations 116.054579 39.030452 5 km ASC COUNT 10

Stream

Stream 简介

Redis Stream 是 Redis 5.0 版本新增加的数据类型,Redis 专门为消息队列设计的数据类型。

在 Redis 5.0 Stream 没出来之前,消息队列的实现方式都有着各自的缺陷,例如:

  • 发布订阅模式,不能持久化也就无法可靠的保存消息,并且对于离线重连的客户端不能读取历史消息的缺陷;
  • List 实现消息队列的方式不能重复消费,一个消息消费完就会被删除,而且生产者需要自行实现全局唯一 ID。

基于以上问题,Redis 5.0 便推出了 Stream 类型也是此版本最重要的功能,用于完美地实现消息队列,它支持消息的持久化、支持自动生成全局唯一 ID、支持 ack 确认消息的模式、支持消费组模式等,让消息队列更加的稳定和可靠。

Stream 命令

Stream 消息队列操作命令:

  • XADD:插入消息,保证有序,可以自动生成全局唯一 ID;
  • XLEN:查询消息长度;
  • XREAD:用于读取消息,可以按 ID 读取数据;
  • XDEL:根据消息 ID 删除消息;
  • DEL:删除整个 Stream;
  • XRANGE:读取区间消息
  • XREADGROUP:按消费组形式读取消息;
  • XPENDING 和 XACK:
    • XPENDING 命令可以用来查询每个消费组内所有消费者“已读取、但尚未确认”的消息;
    • XACK 命令用于向消息队列确认消息处理已完成;

Stream 应用

消息队列

生产者通过 XADD 命令插入一条消息:

1
2
3
4
# * 表示让 Redis 为插入的数据自动生成一个全局唯一的 ID
# 往名称为 mymq 的消息队列中插入一条消息,消息的键是 name,值是 xiaolin
> XADD mymq * name xiaolin
"1654254953808-0"

插入成功后会返回全局唯一的 ID:”1654254953808-0”。消息的全局唯一 ID 由两部分组成:

  • 第一部分“1654254953808”是数据插入时,以毫秒为单位计算的当前服务器时间;
  • 第二部分表示插入消息在当前毫秒内的消息序号,这是从 0 开始编号的。例如,“1654254953808-0”就表示在“1654254953808”毫秒内的第 1 条消息。

消费者通过 XREAD 命令从消息队列中读取消息时,可以指定一个消息 ID,并从这个消息 ID 的下一条消息开始进行读取(注意是输入消息 ID 的下一条信息开始读取,不是查询输入 ID 的消息)。

1
2
3
4
5
6
# 从 ID 号为 1654254953807-0 的消息开始,读取后续的所有消息(示例中一共 1 条)。
> XREAD STREAMS mymq 1654254953807-0
1) 1) "mymq"
2) 1) 1) "1654254953808-0"
2) 1) "name"
2) "xiaolin"

如果想要实现阻塞读(当没有数据时,阻塞住),可以调用 XRAED 时设定 BLOCK 配置项,实现类似于 BRPOP 的阻塞读取操作。

比如,下面这命令,设置了 BLOCK 10000 的配置项,10000 的单位是毫秒,表明 XREAD 在读取最新消息时,如果没有消息到来,XREAD 将阻塞 10000 毫秒(即 10 秒),然后再返回。

1
2
3
4
# 命令最后的“$”符号表示读取最新的消息
> XREAD BLOCK 10000 STREAMS mymq $
(nil)
(10.00s)

Stream 的基础方法,使用 xadd 存入消息和 xread 循环阻塞读取消息的方式可以实现简易版的消息队列,交互流程如下图所示:

前面介绍的这些操作 List 也支持的,接下来看看 Stream 特有的功能。

Stream 可以以使用 XGROUP 创建消费组,创建消费组之后,Stream 可以使用 XREADGROUP 命令让消费组内的消费者读取消息。

创建两个消费组,这两个消费组消费的消息队列是 mymq,都指定从第一条消息开始读取:

1
2
3
4
5
6
# 创建一个名为 group1 的消费组,0-0 表示从第一条消息开始读取。
> XGROUP CREATE mymq group1 0-0
OK
# 创建一个名为 group2 的消费组,0-0 表示从第一条消息开始读取。
> XGROUP CREATE mymq group2 0-0
OK

消费组 group1 内的消费者 consumer1 从 mymq 消息队列中读取所有消息的命令如下:

1
2
3
4
5
6
# 命令最后的参数“>”,表示从第一条尚未被消费的消息开始读取。
> XREADGROUP GROUP group1 consumer1 STREAMS mymq >
1) 1) "mymq"
2) 1) 1) "1654254953808-0"
2) 1) "name"
2) "xiaolin"

消息队列中的消息一旦被消费组里的一个消费者读取了,就不能再被该消费组内的其他消费者读取了,即同一个消费组里的消费者不能消费同一条消息

比如说,我们执行完刚才的 XREADGROUP 命令后,再执行一次同样的命令,此时读到的就是空值了:

1
2
> XREADGROUP GROUP group1 consumer1 STREAMS mymq >
(nil)

但是,不同消费组的消费者可以消费同一条消息(但是有前提条件,创建消息组的时候,不同消费组指定了相同位置开始读取消息)

比如说,刚才 group1 消费组里的 consumer1 消费者消费了一条 id 为 1654254953808-0 的消息,现在用 group2 消费组里的 consumer1 消费者消费消息:

1
2
3
4
5
> XREADGROUP GROUP group2 consumer1 STREAMS mymq >
1) 1) "mymq"
2) 1) 1) "1654254953808-0"
2) 1) "name"
2) "xiaolin"

因为我创建两组的消费组都是从第一条消息开始读取,所以可以看到第二组的消费者依然可以消费 id 为 1654254953808-0 的这一条消息。因此,不同的消费组的消费者可以消费同一条消息。

使用消费组的目的是让组内的多个消费者共同分担读取消息,所以,我们通常会让每个消费者读取部分消息,从而实现消息读取负载在多个消费者间是均衡分布的。

例如,我们执行下列命令,让 group2 中的 consumer1、2、3 各自读取一条消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 让 group2 中的 consumer1 从 mymq 消息队列中消费一条消息
> XREADGROUP GROUP group2 consumer1 COUNT 1 STREAMS mymq >
1) 1) "mymq"
2) 1) 1) "1654254953808-0"
2) 1) "name"
2) "xiaolin"
# 让 group2 中的 consumer2 从 mymq 消息队列中消费一条消息
> XREADGROUP GROUP group2 consumer2 COUNT 1 STREAMS mymq >
1) 1) "mymq"
2) 1) 1) "1654256265584-0"
2) 1) "name"
2) "xiaolincoding"
# 让 group2 中的 consumer3 从 mymq 消息队列中消费一条消息
> XREADGROUP GROUP group2 consumer3 COUNT 1 STREAMS mymq >
1) 1) "mymq"
2) 1) 1) "1654256271337-0"
2) 1) "name"
2) "Tom"

基于 Stream 实现的消息队列,如何保证消费者在发生故障或宕机再次重启后,仍然可以读取未处理完的消息?

Streams 会自动使用内部队列(也称为 PENDING List)留存消费组里每个消费者读取的消息,直到消费者使用 XACK 命令通知 Streams“消息已经处理完成”。

消费确认增加了消息的可靠性,一般在业务处理完成之后,需要执行 XACK 命令确认消息已经被消费完成,整个流程的执行如下图所示:

如果消费者没有成功处理消息,它就不会给 Streams 发送 XACK 命令,消息仍然会留存。此时,消费者可以在重启后,用 XPENDING 命令查看已读取、但尚未确认处理完成的消息

例如,我们来查看一下 group2 中各个消费者已读取、但尚未确认的消息个数,命令如下:

1
2
3
4
5
6
7
8
9
10
127.0.0.1:6379> XPENDING mymq group2
1) (integer) 3
2) "1654254953808-0" # 表示 group2 中所有消费者读取的消息最小 ID
3) "1654256271337-0" # 表示 group2 中所有消费者读取的消息最大 ID
4) 1) 1) "consumer1"
2) "1"
2) 1) "consumer2"
2) "1"
3) 1) "consumer3"
2) "1"

如果想查看某个消费者具体读取了哪些数据,可以执行下面的命令:

1
2
3
4
5
6
# 查看 group2 里 consumer2 已从 mymq 消息队列中读取了哪些消息
> XPENDING mymq group2 - + 10 consumer2
1) 1) "1654256265584-0"
2) "consumer2"
3) (integer) 410700
4) (integer) 1

可以看到,consumer2 已读取的消息的 ID 是 1654256265584-0。

一旦消息 1654256265584-0 被 consumer2 处理了,consumer2 就可以使用 XACK 命令通知 Streams,然后这条消息就会被删除

1
2
> XACK mymq group2 1654256265584-0
(integer) 1

当我们再使用 XPENDING 命令查看时,就可以看到,consumer2 已经没有已读取、但尚未确认处理的消息了。

1
2
> XPENDING mymq group2 - + 10 consumer2
(empty array)

好了,基于 Stream 实现的消息队列就说到这里了,小结一下:

  • 消息保序:XADD/XREAD
  • 阻塞读取:XREAD block
  • 重复消息处理:Stream 在使用 XADD 命令,会自动生成全局唯一 ID;
  • 消息可靠性:内部使用 PENDING List 自动保存消息,使用 XPENDING 命令查看消费组已经读取但是未被确认的消息,消费者使用 XACK 确认消息;
  • 支持消费组形式消费数据

Redis 基于 Stream 消息队列与专业的消息队列有哪些差距?

一个专业的消息队列,必须要做到两大块:

  • 消息不丢。
  • 消息可堆积。

1、Redis Stream 消息会丢失吗?

使用一个消息队列,其实就分为三大块:生产者、队列中间件、消费者,所以要保证消息就是保证三个环节都不能丢失数据。

Redis Stream 消息队列能不能保证三个环节都不丢失数据?

  • Redis 生产者会不会丢消息?生产者会不会丢消息,取决于生产者对于异常情况的处理是否合理。从消息被生产出来,然后提交给 MQ 的过程中,只要能正常收到(MQ 中间件)的 ack 确认响应,就表示发送成功,所以只要处理好返回值和异常,如果返回异常则进行消息重发,那么这个阶段是不会出现消息丢失的。
  • Redis 消费者会不会丢消息?不会,因为 Stream(MQ 中间件)会自动使用内部队列(也称为 PENDING List)留存消费组里每个消费者读取的消息,但是未被确认的消息。消费者可以在重启后,用 XPENDING 命令查看已读取、但尚未确认处理完成的消息。等到消费者执行完业务逻辑后,再发送消费确认 XACK 命令,也能保证消息的不丢失。
  • Redis 消息中间件会不会丢消息?,Redis 在以下 2 个场景下,都会导致数据丢失:

可以看到,Redis 在队列中间件环节无法保证消息不丢。像 RabbitMQ 或 Kafka 这类专业的队列中间件,在使用时是部署一个集群,生产者在发布消息时,队列中间件通常会写“多个节点”,也就是有多个副本,这样一来,即便其中一个节点挂了,也能保证集群的数据不丢失。

2、Redis Stream 消息可堆积吗?

Redis 的数据都存储在内存中,这就意味着一旦发生消息积压,则会导致 Redis 的内存持续增长,如果超过机器内存上限,就会面临被 OOM 的风险。

所以 Redis 的 Stream 提供了可以指定队列最大长度的功能,就是为了避免这种情况发生。

当指定队列最大长度时,队列长度超过上限后,旧消息会被删除,只保留固定长度的新消息。这么来看,Stream 在消息积压时,如果指定了最大长度,还是有可能丢失消息的。

但 Kafka、RabbitMQ 专业的消息队列它们的数据都是存储在磁盘上,当消息积压时,无非就是多占用一些磁盘空间。

因此,把 Redis 当作队列来使用时,会面临的 2 个问题:

  • Redis 本身可能会丢数据;
  • 面对消息挤压,内存资源会紧张;

所以,能不能将 Redis 作为消息队列来使用,关键看你的业务场景:

  • 如果你的业务场景足够简单,对于数据丢失不敏感,而且消息积压概率比较小的情况下,把 Redis 当作队列是完全可以的。
  • 如果你的业务有海量消息,消息积压的概率比较大,并且不能接受数据丢失,那么还是用专业的消息队列中间件吧。

补充:Redis 发布/订阅机制为什么不可以作为消息队列?

发布订阅机制存在以下缺点,都是跟丢失数据有关:

  1. 发布/订阅机制没有基于任何数据类型实现,所以不具备“数据持久化”的能力,也就是发布/订阅机制的相关操作,不会写入到 RDB 和 AOF 中,当 Redis 宕机重启,发布/订阅机制的数据也会全部丢失。
  2. 发布订阅模式是“发后既忘”的工作模式,如果有订阅者离线重连之后不能消费之前的历史消息。
  3. 当消费端有一定的消息积压时,也就是生产者发送的消息,消费者消费不过来时,如果超过 32M 或者是 60s 内持续保持在 8M 以上,消费端会被强行断开,这个参数是在配置文件中设置的,默认值是 client-output-buffer-limit pubsub 32mb 8mb 60

所以,发布/订阅机制只适合即时通讯的场景,比如构建哨兵集群的场景采用了发布/订阅机制。

总结

Redis 后续版本又支持四种数据类型,它们的应用场景如下:

  • BitMap(2.2 版新增):二值状态统计的场景,比如签到、判断用户登陆状态、连续签到用户总数等;
  • HyperLogLog(2.8 版新增):海量数据基数统计的场景,比如百万级网页 UV 计数等;
  • GEO(3.2 版新增):存储地理位置信息的场景,比如滴滴叫车;
  • Stream(5.0 版新增):消息队列,相比于基于 List 类型实现的消息队列,有这两个特有的特性:自动生成全局唯一消息 ID,支持以消费组形式消费数据。

针对 Redis 是否适合做消息队列,关键看你的业务场景:

  • 如果你的业务场景足够简单,对于数据丢失不敏感,而且消息积压概率比较小的情况下,把 Redis 当作队列是完全可以的。
  • 如果你的业务有海量消息,消息积压的概率比较大,并且不能接受数据丢失,那么还是用专业的消息队列中间件吧。

参考资料

Redis 集群

Redis 集群(Redis Cluster) 是 Redis 官方提供的“分布式数据库”方案

Redis Cluster 既然被设计分布式系统,自然需要具备分布式系统的基本特性:伸缩性、高可用、一致性。

  • 伸缩性 - Redis Cluster 通过划分虚拟 hash 槽来进行“分区”,以实现集群的伸缩性。
  • 高可用 - Redis Cluster 采用主从架构,支持“复制”和“自动故障转移”,以保证 Redis Cluster 的高可用。
  • 一致性 - 根据 CAP 理论,Consistency、Availability、Partition tolerance 三者不可兼得。而 Redis Cluster 的选择是 AP,即不保证“强一致性”,尽力达到“最终一致性”。

Redis Cluster 应用了 Raft 协议 协议和 Gossip 协议。

关键词:高可用监控选主故障转移分区RaftGossip

Redis Cluster 分区

集群节点

Redis Cluster 由多个节点组成,节点刚启动时,彼此是相互独立的。节点通过握手( CLUSTER MEET 命令)来将其他节点添加到自己所处的集群中

向一个节点发送 CLUSTER MEET 命令,可以让当前节点与指定 IP、PORT 的节点进行三次握手,握手成功时,当前节点会将指定节点加入所在集群。

集群节点保存键值对以及过期时间的方式与单机 Redis 服务完全相同

Redis Cluster 节点分为主节点(master)和从节点(slave):

  • 主节点用于处理槽。
  • 从节点用于复制主节点, 并在主节点下线时, 代替主节点继续处理命令请求。

分配 Hash 槽

分布式存储需要解决的首要问题是把整个数据集按照“分区规则” 到多个节点,即每个节点负责整体数据的一个 子集

Redis Cluster 将整个数据库规划为 “16384” 个虚拟的哈希槽,数据库中的每个键都属于其中一个槽。每个节点都会记录哪些槽指派给了自己, 而哪些槽又被指派给了其他节点

如果数据库中有任何一个槽没有得到分配,那么集群处于“下线”状态

通过向节点发送 CLUSTER ADDSLOTS 命令,可以将一个或多个槽指派给节点负责。

1
2
> CLUSTER ADDSLOTS 1 2 3
OK

集群中的每个节点负责一部分哈希槽,比如集群中有3个节点,则:

  • 节点A存储的哈希槽范围是:0 – 5500
  • 节点B存储的哈希槽范围是:5501 – 11000
  • 节点C存储的哈希槽范围是:11001 – 16384

路由

当客户端向节点发送与数据库键有关的命令时,接受命令的节点会计算出命令要处理的数据库属于哪个槽,并检查这个槽是否指派给了自己

  • 如果键所在的槽正好指派给了当前节点,那么当前节点直接执行命令。
  • 如果键所在的槽没有指派给当前节点,那么节点会向客户端返回一个 MOVED 错误,指引客户端重定向至正确的节点。

计算键属于哪个槽

决定一个 key 应该分配到那个槽的算法是:计算该 key 的 CRC16 结果再模 16834

1
HASH_SLOT = CRC16(KEY) mod 16384

当节点计算出 key 所属的槽为 i 之后,节点会根据以下条件判断槽是否由自己负责:

1
clusterState.slots[i] == clusterState.myself

MOVED 错误

节点在接到一个命令请求时,会先检查这个命令请求要处理的键所在的槽是否由自己负责, 如果不是的话, 节点将向客户端返回一个 MOVED 错误, MOVED 错误携带的信息可以指引客户端转向至正在负责相关槽的节点。

MOVED 错误的格式为:

1
MOVED <slot> <ip>:<port>

提示:MOVED 命令的作用有点类似 HTTP 协议中的重定向。

重新分区

对 Redis Cluster 的重新分片工作是由客户端(redis-trib)执行的, 重新分片的关键是将属于某个槽的所有键值对从一个节点转移至另一个节点

重新分区操作可以“在线”进行,在重新分区的过程中,集群不需要下线,并且源节点和目标节点都可以继续处理命令请求。

重新分区的实现原理如下图所示:

img

ASK 错误

如果节点 A 正在迁移槽 i 至节点 B , 那么当节点 A 没能在自己的数据库中找到命令指定的数据库键时, 节点 A 会向客户端返回一个 ASK 错误, 指引客户端到节点 B 继续查找指定的数据库键。

ASK 错误与 MOVED 的区别在于:

  • MOVED 错误表示槽的负责权已经从一个节点转移到了另一个节点;
  • ASK 错误只是两个节点在迁移槽的过程中使用的一种临时措施。

判断 ASK 错误的过程如下图所示:

img

Redis Cluster 复制

Redis Cluster 中的节点分为主节点和从节点,其中主节点用于处理槽,而从节点则用于复制某个主节点,并在被复制的主节点下线时,代替下线主节点继续处理命令请求。

向一个节点发送命令 CLUSTER REPLICATE <node_id> 可以让接收命令的节点成为 node_id 所指定节点的从节点,并开始对主节点进行复制。

Redis Cluster 节点间的复制是“异步”的。

Redis Cluster 故障转移

故障检测

集群中每个节点都会定期向集群中的其他节点发送 PING 消息,以此来检测对方是否在线

节点的状态信息可以分为:

  • 在线状态;
  • 疑似下线状态(PFAIL) - 即在规定的时间内,没有应答 PING 消息
  • 已下线状态(FAIL) - 半数以上负责处理槽的主节点都将某个主节点视为“疑似下线”,则这个主节点将被标记为“已下线”

故障转移

  1. 下线主节点的所有从节点中,会有一个从节点被选中。
  2. 被选中的从节点会执行 SLAVEOF no one 命令,成为新的主节点。
  3. 新的主节点会撤销所有对已下线主节点的槽指派,并将这些槽全部指派给自己。
  4. 新的主节点向集群广播一条 PONG 消息,告知其他节点这个从节点已变成主节点。
  5. 新的主节点开始接收和自己负责处理的槽有关的命令请求,故障转移完成。

选主

Redis Sentinel 和 Redis Cluster 的选主流程非常相似,二者都基于Raft 协议 实现。

  1. 从节点发现自己的主节点状态为 FAIL
  2. 从节点将自己记录的纪元(epoch)加 1,并广播消息,要求所有收到消息且有投票权的主节点都为自己投票。——这里的纪元(epoch),相当于 Raft 协议中的选期(term)。因个人习惯,后面统一将纪元描述为选期。
  3. 如果某主节点具有投票权(它正在负责处理槽),并且这个主节点尚未投票,那么主节点就返回一条确认消息,表示支持该从节点成为新的主节点。
  4. 每个参与选举的从节点都会根据收到的确认消息,统计自己所得的选票。
  5. 假设集群中存在 N 个具有投票权的主节点,那么当某从节点得到“半数以上”(N / 2 + 1)的选票,则该从节点当选为新的主节点
  6. 由于每个选期中,任意具有投票权的主节点“只能投一票”,所以获得“半数以上”选票的从节点只能有一个。
  7. 如果在一个选期中,没有从节点能获得“半数以上”投票,则本次选期作废,开始进入下一个选期,直到选出新的主节点为止。

Redis Cluster 通信

集群中的节点通过发送和接收消息来进行通信。Redis Cluster 各实例之间的通信方式采用 Gossip 协议来实现。

Redis Cluster 采用 Gossip 协议基于两个主要目标:去中心化以及失败检测

Redis Cluster 中,每个节点之间都会同步信息,但是每个节点的信息不保证实时的,即无法保证数据强一致性,但是保证“数据最终一致性”——当集群中发生节点增减、故障、主从关系变化、槽信息变更等事件时,通过不断的通信,在经过一段时间后,所有的节点都会同步集群全部节点的最新状态。

Redis Cluster 节点发送的消息主要有以下五种:

  • MEET - 请求接收方加入发送方所在的集群。
  • PING - 集群中每个节点每隔一段时间(默认为一秒)从已知节点列表中随机选出五个节点,然后对这五个节点中最久没联系的节点发送 PING 消息,以此检测被选中的节点是否在线。
  • PONG - 当接收方收到发送方发来的 MEET 消息或 PING 消息时,会返回一条 PONG 消息作为应答。
  • FAIL - 当一个主节点 A 判断另一个主节点 B 已经进入 FAIL 状态时,节点 A 会向集群广播一条关于节点 B 的 FAIL 消息,所有收到这条消息的节点都会立即将节点 B 标记为已下线。
  • PUBLISH - 当节点收到一个 PUBLISH 命令时,节点会执行这个命令,并向集群广播一条 PUBLISH 消息,所有接受到这条消息的节点都会执行相同的 PUBLISH 命令。

Redis Cluster 应用

集群功能限制

Redis Cluster 相对 单机,存在一些功能限制,需要 开发人员 提前了解,在使用时做好规避。

  • key 批量操作 支持有限:类似 msetmget 操作,目前只支持对具有相同 slot 值的 key 执行 批量操作。对于 映射为不同 slot 值的 key 由于执行 mgetmget 等操作可能存在于多个节点上,因此不被支持。
  • key 事务操作 支持有限:只支持 key同一节点上事务操作,当多个 key 分布在 不同 的节点上时 无法 使用事务功能。
  • key 作为 数据分区 的最小粒度,不能将一个 大的键值 对象如 hashlist 等映射到 不同的节点
  • 不支持 多数据库空间单机 下的 Redis 可以支持 16 个数据库(db0 ~ db15),集群模式 下只能使用 一个 数据库空间,即 db0
  • 复制结构 只支持一层:从节点 只能复制 主节点,不支持 嵌套树状复制 结构。

集群规模限制

Redis Cluster 的优点是易于使用。分区、主从复制、弹性扩容这些功能都可以做到自动化,通过简单的部署就可以获得一个大容量、高可靠、高可用的 Redis 集群,并且对于应用来说,近乎于是透明的。

所以,Redis Cluster 非常适合构建中小规模 Redis 集群,这里的中小规模指的是,大概几个到几十个节点这样规模的 Redis 集群。

但是 Redis Cluster 不太适合构建超大规模集群,主要原因是,它采用了去中心化的设计。

Redis 的每个节点上,都保存了所有槽和节点的映射关系表,客户端可以访问任意一个节点,再通过重定向命令,找到数据所在的那个节点。那么,这个映射关系表是如何更新的呢?Redis Cluster 采用了一种去中心化的流言 (Gossip) 协议来传播集群配置的变化。

Gossip 协议的优点是去中心化;缺点是传播速度慢,并且是集群规模越大,传播的越慢。

集群配置

我们后面会部署一个 Redis Cluster 作为例子,在那之前,先介绍一下集群在 redis.conf 中的参数。

  • cluster-enabled <yes/no> - 如果配置”yes”则开启集群功能,此 redis 实例作为集群的一个节点,否则,它是一个普通的单一的 redis 实例。
  • cluster-config-file <filename> - 注意:虽然此配置的名字叫“集群配置文件”,但是此配置文件不能人工编辑,它是集群节点自动维护的文件,主要用于记录集群中有哪些节点、他们的状态以及一些持久化参数等,方便在重启时恢复这些状态。通常是在收到请求之后这个文件就会被更新。
  • cluster-node-timeout <milliseconds> - 这是集群中的节点能够失联的最大时间,超过这个时间,该节点就会被认为故障。如果主节点超过这个时间还是不可达,则用它的从节点将启动故障迁移,升级成主节点。注意,任何一个节点在这个时间之内如果还是没有连上大部分的主节点,则此节点将停止接收任何请求。
  • cluster-slave-validity-factor <factor> - 如果设置成0,则无论从节点与主节点失联多久,从节点都会尝试升级成主节点。如果设置成正数,则 cluster-node-timeout 乘以 cluster-slave-validity-factor 得到的时间,是从节点与主节点失联后,此从节点数据有效的最长时间,超过这个时间,从节点不会启动故障迁移。假设 cluster-node-timeout=5,cluster-slave-validity-factor=10,则如果从节点跟主节点失联超过 50 秒,此从节点不能成为主节点。注意,如果此参数配置为非 0,将可能出现由于某主节点失联却没有从节点能顶上的情况,从而导致集群不能正常工作,在这种情况下,只有等到原来的主节点重新回归到集群,集群才恢复运作。
  • cluster-migration-barrier <count> - 主节点需要的最小从节点数,只有达到这个数,主节点失败时,它从节点才会进行迁移。更详细介绍可以看本教程后面关于副本迁移到部分。
  • cluster-require-full-coverage <yes/no> - 在部分 key 所在的节点不可用时,如果此参数设置为”yes”(默认值), 则整个集群停止接受操作;如果此参数设置为”no”,则集群依然为可达节点上的 key 提供读操作。

其他 Redis 集群方案

Redis Cluster 不太适合用于大规模集群,所以,如果要构建超大 Redis 集群,需要选择替代方案。一般有三种方案类型:

  • 客户端分区方案
  • 代理分区方案
  • 查询路由方案

客户端分区方案

客户端 就已经决定数据会被 存储 到哪个 Redis 节点或者从哪个 Redis 节点 读取数据。其主要思想是采用 哈希算法 将 Redis 数据的 key 进行散列,通过 hash 函数,特定的 key映射 到特定的 Redis 节点上。

客户端分区方案 的代表为 Redis Sharding,Redis Sharding 是 Redis Cluster 出来之前,业界普遍使用的 Redis 多实例集群 方法。Java 的 Redis 客户端驱动库 Jedis,支持 Redis Sharding 功能,即 ShardedJedis 以及 结合缓存池 的 ShardedJedisPool。

  • 优点:不使用 第三方中间件分区逻辑 可控,配置 简单,节点之间无关联,容易 线性扩展,灵活性强。
  • 缺点客户端 无法 动态增删 服务节点,客户端需要自行维护 分发逻辑,客户端之间 无连接共享,会造成 连接浪费

代理分区方案

客户端 发送请求到一个 代理组件代理 解析 客户端 的数据,并将请求转发至正确的节点,最后将结果回复给客户端。

  • 优点:简化 客户端 的分布式逻辑,客户端 透明接入,切换成本低,代理的 转发存储 分离。
  • 缺点:多了一层 代理层,加重了 架构部署复杂度性能损耗

代理分区 主流实现的有方案有 TwemproxyCodis

Twemproxy

Twemproxy 也叫 nutcraker,是 Twitter 开源的一个 Redis 和 Memcache 的 中间代理服务器 程序。

Twemproxy 作为 代理,可接受来自多个程序的访问,按照 路由规则,转发给后台的各个 Redis 服务器,再原路返回。**Twemproxy** 存在 单点故障 问题,需要结合 Lvs 和 Keepalived 做 高可用方案

  • 优点:应用范围广,稳定性较高,中间代理层 高可用
  • 缺点:无法平滑地 水平扩容/缩容,无 可视化管理界面,运维不友好,出现故障,不能 自动转移

Codis

Codis 是一个 分布式 Redis 解决方案,对于上层应用来说,连接 Codis-Proxy 和直接连接 原生的 Redis-Server 没有的区别。Codis 底层会 处理请求的转发,不停机的进行 数据迁移 等工作。Codis 采用了无状态的 代理层,对于 客户端 来说,一切都是透明的。

  • 优点:实现了上层 Proxy 和底层 Redis 的 高可用数据分区自动平衡,提供 命令行接口 和 RESTful API,提供 监控管理 界面,可以动态 添加删除 Redis 节点。
  • 缺点部署架构配置 复杂,不支持 跨机房多租户,不支持 鉴权管理

查询路由方案

客户端随机地 请求任意一个 Redis 实例,然后由 Redis 将请求 转发正确 的 Redis 节点。Redis Cluster 实现了一种 混合形式查询路由,但并不是 直接 将请求从一个 Redis 节点 转发 到另一个 Redis 节点,而是在 客户端 的帮助下直接 重定向redirected)到正确的 Redis 节点。

  • 优点去中心化,数据按照 存储分布在多个 Redis 实例上,可以平滑的进行节点 扩容/缩容,支持 高可用自动故障转移,运维成本低。
  • 缺点:重度依赖 Redis-trib 工具,缺乏 监控管理,需要依赖 Smart Client (维护连接缓存路由表MultiOpPipeline 支持)。Failover 节点的 检测过慢,不如有 中心节点 的集群及时(如 ZooKeeper)。Gossip 消息采用广播方式,集群规模越大,开销越大。无法根据统计区分 冷热数据

参考资料

Redis 运维

Redis 是一个高性能的 key-value 数据库。

SET 操作每秒钟 110000 次;GET 操作每秒钟 81000 次。

Redis 安装

Window 下安装

下载地址:https://github.com/MSOpenTech/redis/releases

Redis 支持 32 位和 64 位。这个需要根据你系统平台的实际情况选择,这里我们下载 Redis-x64-xxx.zip压缩包到 C 盘,解压后,将文件夹重新命名为 redis

打开一个 cmd 窗口 使用 cd 命令切换目录到 C:\redis 运行 redis-server.exe redis.windows.conf

如果想方便的话,可以把 redis 的路径加到系统的环境变量里,这样就省得再输路径了,后面的那个 redis.windows.conf 可以省略,如果省略,会启用默认的。

这时候另启一个 cmd 窗口,原来的不要关闭,不然就无法访问服务端了。

切换到 redis 目录下运行 redis-cli.exe -h 127.0.0.1 -p 6379

Linux 下安装

下载地址: http://redis.io/download,下载最新文档版本。

下载、解压、编译 Redis

1
2
3
4
wget http://download.redis.io/releases/redis-5.0.4.tar.gz
tar xzf redis-5.0.4.tar.gz
cd redis-5.0.4
make

为了编译 Redis 源码,你需要 gcc-c++和 tcl。如果你的系统是 CentOS,可以直接执行命令:yum install -y gcc-c++ tcl 来安装。

进入到解压后的 src 目录,通过如下命令启动 Redis:

1
src/redis-server

您可以使用内置的客户端与 Redis 进行交互:

1
2
3
4
5
$ src/redis-cli
redis> set foo bar
OK
redis> get foo
"bar"

Ubuntu 下安装

在 Ubuntu 系统安装 Redis 可以使用以下命令:

1
2
sudo apt-get update
sudo apt-get install redis-server

开机启动

  • 开机启动配置:echo "/usr/local/bin/redis-server /etc/redis.conf" >> /etc/rc.local

开放防火墙端口

  • 添加规则:iptables -I INPUT -p tcp -m tcp --dport 6379 -j ACCEPT
  • 保存规则:service iptables save
  • 重启 iptables:service iptables restart

Redis 安装脚本

CentOS7 环境安装脚本:软件运维配置脚本集合

安装说明

  • 采用编译方式安装 Redis, 并将其注册为 systemd 服务
  • 安装路径为:/usr/local/redis
  • 默认下载安装 5.0.4 版本,端口号为:6379,密码为空

使用方法

  • 默认安装 - 执行以下任意命令即可:
1
2
curl -o- https://gitee.com/turnon/linux-tutorial/raw/master/codes/linux/soft/redis-install.sh | bash
wget -qO- https://gitee.com/turnon/linux-tutorial/raw/master/codes/linux/soft/redis-install.sh | bash
  • 自定义安装 - 下载脚本到本地,并按照以下格式执行:
1
sh redis-install.sh [version] [port] [password]

参数说明:

  • version - redis 版本号
  • port - redis 服务端口号
  • password - 访问密码

Redis 单机使用和配置

启动 Redis

启动 redis 服务

1
2
cd /usr/local/redis/src
./redis-server

启动 redis 客户端

1
2
cd /usr/local/redis/src
./redis-cli

查看 redis 是否启动

1
redis-cli

以上命令将打开以下终端:

1
redis 127.0.0.1:6379>

127.0.0.1 是本机 IP ,6379 是 redis 服务端口。现在我们输入 PING 命令。

1
2
redis 127.0.0.1:6379> ping
PONG

以上说明我们已经成功启动了 redis。

Redis 常见配置

Redis 默认的配置文件是根目录下的 redis.conf 文件。

如果需要指定特定文件作为配置文件,需要使用命令: ./redis-server -c xxx.conf

每次修改配置后,需要重启才能生效。

Redis 官方默认配置:

自 Redis2.6 起就可以直接通过命令行传递 Redis 配置参数。这种方法可以用于测试。自 Redis2.6 起就可以直接通过命令行传递 Redis 配置参数。这种方法可以用于测试。

设为守护进程

Redis 默认以非守护进程方式启动,而通常我们会将 Redis 设为守护进程启动方式,配置:daemonize yes

远程访问

Redis 默认绑定 127.0.0.1,这样就只能本机才能访问,若要 Redis 允许远程访问,需要配置:bind 0.0.0.0

设置密码

Redis 默认访问不需要密码,如果需要设置密码,需要如下配置:

  • protected-mode yes
  • requirepass <密码>

配置参数表

配置项 说明
daemonize no Redis 默认不是以守护进程的方式运行,可以通过该配置项修改,使用 yes 启用守护进程(Windows 不支持守护线程的配置为 no )
pidfile /var/run/redis.pid 当 Redis 以守护进程方式运行时,Redis 默认会把 pid 写入 /var/run/redis.pid 文件,可以通过 pidfile 指定
port 6379 指定 Redis 监听端口,默认端口为 6379,作者在自己的一篇博文中解释了为什么选用 6379 作为默认端口,因为 6379 在手机按键上 MERZ 对应的号码,而 MERZ 取自意大利歌女 Alessia Merz 的名字
bind 127.0.0.1 绑定的主机地址
timeout 300 当客户端闲置多长时间后关闭连接,如果指定为 0,表示关闭该功能
loglevel notice 指定日志记录级别,Redis 总共支持四个级别:debug、verbose、notice、warning,默认为 notice
logfile stdout 日志记录方式,默认为标准输出,如果配置 Redis 为守护进程方式运行,而这里又配置为日志记录方式为标准输出,则日志将会发送给 /dev/null
databases 16 设置数据库的数量,默认数据库为 0,可以使用 SELECT 命令在连接上指定数据库 id
save <seconds> <changes> Redis 默认配置文件中提供了三个条件:save 900 1save 300 10save 60 10000 分别表示 900 秒(15 分钟)内有 1 个更改,300 秒(5 分钟)内有 10 个更改以及 60 秒内有 10000 个更改。 指定在多长时间内,有多少次更新操作,就将数据同步到数据文件,可以多个条件配合
rdbcompression yes 指定存储至本地数据库时是否压缩数据,默认为 yes,Redis 采用 LZF 压缩,如果为了节省 CPU 时间,可以关闭该选项,但会导致数据库文件变的巨大
dbfilename dump.rdb 指定本地数据库文件名,默认值为 dump.rdb
dir ./ 指定本地数据库存放目录
slaveof <masterip> <masterport> 设置当本机为 slav 服务时,设置 master 服务的 IP 地址及端口,在 Redis 启动时,它会自动从 master 进行数据同步
masterauth <master-password> 当 master 服务设置了密码保护时,slav 服务连接 master 的密码
requirepass foobared 设置 Redis 连接密码,如果配置了连接密码,客户端在连接 Redis 时需要通过 AUTH <password> 命令提供密码,默认关闭
maxclients 128 设置同一时间最大客户端连接数,默认无限制,Redis 可以同时打开的客户端连接数为 Redis 进程可以打开的最大文件描述符数,如果设置 maxclients 0,表示不作限制。当客户端连接数到达限制时,Redis 会关闭新的连接并向客户端返回 max number of clients reached 错误信息
maxmemory <bytes> 指定 Redis 最大内存限制,Redis 在启动时会把数据加载到内存中,达到最大内存后,Redis 会先尝试清除已到期或即将到期的 Key,当此方法处理 后,仍然到达最大内存设置,将无法再进行写入操作,但仍然可以进行读取操作。Redis 新的 vm 机制,会把 Key 存放内存,Value 会存放在 swap 区
appendonly no 指定是否在每次更新操作后进行日志记录,Redis 在默认情况下是异步的把数据写入磁盘,如果不开启,可能会在断电时导致一段时间内的数据丢失。因为 redis 本身同步数据文件是按上面 save 条件来同步的,所以有的数据会在一段时间内只存在于内存中。默认为 no
appendfilename appendonly.aof 指定更新日志文件名,默认为 appendonly.aof
appendfsync everysec 指定更新日志条件,共有 3 个可选值:no:表示等操作系统进行数据缓存同步到磁盘(快)always:表示每次更新操作后手动调用 fsync() 将数据写到磁盘(慢,安全)everysec:表示每秒同步一次(折中,默认值)
vm-enabled no 指定是否启用虚拟内存机制,默认值为 no,简单的介绍一下,VM 机制将数据分页存放,由 Redis 将访问量较少的页即冷数据 swap 到磁盘上,访问多的页面由磁盘自动换出到内存中(在后面的文章我会仔细分析 Redis 的 VM 机制)
vm-swap-file /tmp/redis.swap 虚拟内存文件路径,默认值为 /tmp/redis.swap,不可多个 Redis 实例共享
vm-max-memory 0 将所有大于 vm-max-memory 的数据存入虚拟内存,无论 vm-max-memory 设置多小,所有索引数据都是内存存储的(Redis 的索引数据 就是 keys),也就是说,当 vm-max-memory 设置为 0 的时候,其实是所有 value 都存在于磁盘。默认值为 0
vm-page-size 32 Redis swap 文件分成了很多的 page,一个对象可以保存在多个 page 上面,但一个 page 上不能被多个对象共享,vm-page-size 是要根据存储的 数据大小来设定的,作者建议如果存储很多小对象,page 大小最好设置为 32 或者 64bytes;如果存储很大大对象,则可以使用更大的 page,如果不确定,就使用默认值
vm-pages 134217728 设置 swap 文件中的 page 数量,由于页表(一种表示页面空闲或使用的 bitmap)是在放在内存中的,,在磁盘上每 8 个 pages 将消耗 1byte 的内存。
vm-max-threads 4 设置访问 swap 文件的线程数,最好不要超过机器的核数,如果设置为 0,那么所有对 swap 文件的操作都是串行的,可能会造成比较长时间的延迟。默认值为 4
glueoutputbuf yes 设置在向客户端应答时,是否把较小的包合并为一个包发送,默认为开启
hash-max-zipmap-entries 64 hash-max-zipmap-value 512 指定在超过一定的数量或者最大的元素超过某一临界值时,采用一种特殊的哈希算法
activerehashing yes 指定是否激活重置哈希,默认为开启(后面在介绍 Redis 的哈希算法时具体介绍)
include /path/to/local.conf 指定包含其它的配置文件,可以在同一主机上多个 Redis 实例之间使用同一份配置文件,而同时各个实例又拥有自己的特定配置文件

压力测试

参考官方文档:How fast is Redis?

Redis 自带了一个性能测试工具:redis-benchmark

(1)基本测试

1
redis-benchmark -q -n 100000
  • -q 表示静默(quiet)执行
  • -n 100000 请求 10 万次

(2)测试指定读写指令

1
2
3
$ redis-benchmark -t set,lpush -n 100000 -q
SET: 74239.05 requests per second
LPUSH: 79239.30 requests per second

(3)测试 pipeline 模式下指定读写指令

1
2
3
redis-benchmark -n 1000000 -t set,get -P 16 -q
SET: 403063.28 requests per second
GET: 508388.41 requests per second

Redis 集群使用和配置

Redis 3.0 后支持集群模式。

集群规划

Redis 集群一般由 多个节点 组成,节点数量至少为 6 个,才能保证组成 完整高可用 的集群。

理想情况当然是所有节点各自在不同的机器上,首先于资源,本人在部署 Redis 集群时,只得到 3 台服务器。所以,我计划每台服务器部署 2 个 Redis 节点。

【示例】最简高可用 Redis 集群规划

机器配置:16G 内存 + 8 核 CPU + 1T 磁盘

Redis 进程分配 10 G 内存。一般线上生产环境,Redis 的内存尽量不要超过 10g,超过 10g 可能会有问题。

集群拓扑:三主三从;三哨兵,每个哨兵监听所有主节点。

估算性能:

  • 容量:三主,占用 30 G 内存,所以最大存储容量为 30 G。假设每条数据记录平均 大小为 10 K,则最大能存储 300 万条数据。
  • 吞吐量:单机一般 TPS/QPS 为 五万到八万左右。假设为五万,那么三主三从架构理论上能达到 TPS 15 万,QPS 30 万。

部署集群

Redis 集群节点的安装与单节点服务相同,差异仅在于部署方式。

注意:为了演示方便,本示例将所有 Redis 集群节点都部署在一台机器上,实际生产环境中,基本都会将节点部署在不同机器上。要求更高的,可能还要考虑多机房部署。

(1)创建节点目录

我个人偏好将软件放在 /opt 目录下,在我的机器中,Redis 都安装在 /usr/local/redis 目录下。所以,下面的命令和配置都假设 Redis 安装目录为 /usr/local/redis

确保机器上已经安装了 Redis 后,执行以下命令,创建 Redis 集群节点实例目录:

1
2
3
4
5
6
sudo mkdir -p /usr/local/redis/conf/7001
sudo mkdir -p /usr/local/redis/conf/7002
sudo mkdir -p /usr/local/redis/conf/7003
sudo mkdir -p /usr/local/redis/conf/7004
sudo mkdir -p /usr/local/redis/conf/7005
sudo mkdir -p /usr/local/redis/conf/7006

(2)配置集群节点

每个实例目录下,新建 redis.conf 配置文件。

实例配置模板以 7001 节点为例(其他节点,完全替换配置中的端口号 7001 即可),如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# 端口号
port 7001
# 绑定的主机端口(0.0.0.0 表示允许远程访问)
bind 0.0.0.0
# 以守护进程方式启动
daemonize yes

# 开启集群模式
cluster-enabled yes
# 集群的配置,配置文件首次启动自动生成
cluster-config-file /usr/local/redis/conf/7001/7001.conf
# 请求超时时间,设置 10 秒
cluster-node-timeout 10000

# 开启 AOF 持久化
appendonly yes
# 数据存放目录
dir /usr/local/redis/conf/7001
# 进程文件
pidfile /usr/local/redis/conf/7001/7001.pid
# 日志文件
logfile /usr/local/redis/conf/7001/7001.log

(3)批量启动 Redis 节点

Redis 的 utils/create-cluster 目录下自带了一个名为 create-cluster 的脚本工具,可以利用它来新建、启动、停止、重启 Redis 节点。

脚本中有几个关键参数:

  • PORT=30000 - 初始端口号
  • TIMEOUT=2000 - 超时时间
  • NODES=6 - 节点数
  • REPLICAS=1 - 备份数

脚本中的每个命令项会根据初始端口号,以及设置的节点数,遍历的去执行操作。

由于前面的规划中,节点端口是从 7001 ~ 7006,所以需要将 PORT 变量设为 7000。

脚本中启动每个 Redis 节点是通过指定命令行参数来配置属性。所以,我们需要改一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
PORT=7000
TIMEOUT=2000
NODES=6
ENDPORT=$((PORT+NODES))

# ...

if [ "$1" == "start" ]
then
while [ $((PORT < ENDPORT)) != "0" ]; do
PORT=$((PORT+1))
echo "Starting $PORT"
/usr/local/redis/src/redis-server /usr/local/redis/conf/${PORT}/redis.conf
done
exit 0
fi

好了,在每台服务器上,都执行 ./create-cluster start 来启动节点。

然后,通过 ps 命令来确认 Redis 进程是否已经工作:

1
2
3
4
5
6
7
8
# root @ dbClusterDev01 in /usr/local/redis/conf [11:07:55]
$ ps -ef | grep redis
root 4604 1 0 11:07 ? 00:00:00 /opt/redis/src/redis-server 0.0.0.0:7001 [cluster]
root 4609 1 0 11:07 ? 00:00:00 /opt/redis/src/redis-server 0.0.0.0:7002 [cluster]
root 4614 1 0 11:07 ? 00:00:00 /opt/redis/src/redis-server 0.0.0.0:7003 [cluster]
root 4619 1 0 11:07 ? 00:00:00 /opt/redis/src/redis-server 0.0.0.0:7004 [cluster]
root 4624 1 0 11:07 ? 00:00:00 /opt/redis/src/redis-server 0.0.0.0:7005 [cluster]
root 4629 1 0 11:07 ? 00:00:00 /opt/redis/src/redis-server 0.0.0.0:7006 [cluster]

(4)启动集群

通过 redis-cli --cluster create 命令可以自动配置集群,如下:

1
./redis-cli --cluster create 127.0.0.1:7001 127.0.0.1:7002 127.0.0.2:7003 127.0.0.2:7004 127.0.0.3:7005 127.0.0.3:7006 --cluster-replicas 1

redis-cluster 会根据设置的节点数和副本数自动分片(分配 Hash 虚拟槽 slot),如果满意,输入 yes ,直接开始分片。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
>>> Performing hash slots allocation on 6 nodes...
Master[0] -> Slots 0 - 5460
Master[1] -> Slots 5461 - 10922
Master[2] -> Slots 10923 - 16383
Adding replica 127.0.0.2:7004 to 127.0.0.1:7001
Adding replica 127.0.0.3:7006 to 127.0.0.2:7003
Adding replica 127.0.0.1:7002 to 127.0.0.3:7005
M: b721235997deb6b9a7a2be690b5b9663db8057c6 127.0.0.1:7001
slots:[0-5460] (5461 slots) master
S: bda9b7036df0bbefe601bda4ce45d3787a2e9bd9 127.0.0.1:7002
replicates 3623fff69b5243ed18c02a2fbb6f53069b0f1505
M: 91523c0391a044da6cc9f53bb965aabe89502187 127.0.0.2:7003
slots:[5461-10922] (5462 slots) master
S: 9d899cbe49dead7b8c4f769920cdb75714a441ae 127.0.0.2:7004
replicates b721235997deb6b9a7a2be690b5b9663db8057c6
M: 3623fff69b5243ed18c02a2fbb6f53069b0f1505 127.0.0.3:7005
slots:[10923-16383] (5461 slots) master
S: a2869dc153ea4977ca790b76483574a5d56cb40e 127.0.0.3:7006
replicates 91523c0391a044da6cc9f53bb965aabe89502187
Can I set the above configuration? (type 'yes' to accept): yes
>>> Nodes configuration updated
>>> Assign a different config epoch to each node
>>> Sending CLUSTER MEET messages to join the cluster
Waiting for the cluster to join
....
>>> Performing Cluster Check (using node 127.0.0.1:7001)
M: b721235997deb6b9a7a2be690b5b9663db8057c6 127.0.0.1:7001
slots:[0-5460] (5461 slots) master
1 additional replica(s)
S: a2869dc153ea4977ca790b76483574a5d56cb40e 127.0.0.1:7006
slots: (0 slots) slave
replicates 91523c0391a044da6cc9f53bb965aabe89502187
M: 91523c0391a044da6cc9f53bb965aabe89502187 127.0.0.1:7003
slots:[5461-10922] (5462 slots) master
1 additional replica(s)
M: 3623fff69b5243ed18c02a2fbb6f53069b0f1505 127.0.0.1:7005
slots:[10923-16383] (5461 slots) master
1 additional replica(s)
S: 9d899cbe49dead7b8c4f769920cdb75714a441ae 127.0.0.1:7004
slots: (0 slots) slave
replicates b721235997deb6b9a7a2be690b5b9663db8057c6
S: bda9b7036df0bbefe601bda4ce45d3787a2e9bd9 127.0.0.1:7002
slots: (0 slots) slave
replicates 3623fff69b5243ed18c02a2fbb6f53069b0f1505
[OK] All nodes agree about slots configuration.
>>> Check for open slots...
>>> Check slots coverage...
[OK] All 16384 slots covered.

(5)日常维护操作

  • 关闭集群 - ./create-cluster stop
  • 检查集群是否健康(指定任意节点即可):./redis-cli --cluster check <ip:port>
  • 尝试修复集群节点:./redis-cli --cluster fix <ip:port>

部署哨兵

redis-cluster 实现了 Redis 的分片、复制。

但 redis-cluster 没有解决故障转移问题,一旦任意分片的 Master 节点宕机、网络不通,就会导致 redis-cluster 的集群不能工作。为了解决高可用的问题,Redis 提供了 Redis 哨兵来监控 Redis 节点状态,并且会在 Master 宕机时,发起选举,将这个 Master 的一个 Slave 节点选举为 Master。

(1)创建节点目录

我个人偏好将软件放在 /opt 目录下,在我的机器中,Redis 都安装在 /usr/local/redis 目录下。所以,下面的命令和配置都假设 Redis 安装目录为 /usr/local/redis

确保机器上已经安装了 Redis 后,执行以下命令,创建 Redis 集群节点实例目录:

1
2
3
sudo mkdir -p /usr/local/redis/conf/27001
sudo mkdir -p /usr/local/redis/conf/27002
sudo mkdir -p /usr/local/redis/conf/27003

(2)配置集群节点

每个实例目录下,新建 redis.conf 配置文件。

实例配置模板以 7001 节点为例(其他节点,完全替换配置中的端口号 7001 即可),如下:

1
2
3
4
5
6
7
8
port 27001
daemonize yes
sentinel monitor redis-master 172.22.6.3 7001 2
sentinel down-after-milliseconds redis-master 5000
sentinel failover-timeout redis-master 900000
sentinel parallel-syncs redis-master 1
#sentinel auth-pass redis-master 123456
logfile /usr/local/redis/conf/27001/27001.log

(3)批量启动哨兵节点

1
2
3
/opt/redis/src/redis-sentinel /usr/local/redis/conf/27001/sentinel.conf
/opt/redis/src/redis-sentinel /usr/local/redis/conf/27002/sentinel.conf
/opt/redis/src/redis-sentinel /usr/local/redis/conf/27003/sentinel.conf

扩容

(1)查看信息

进入任意节点

1
./redis-cli -h 172.22.6.3 -p 7001

cluster info 查看集群节点状态

1
2
3
4
5
6
7
172.22.6.3:7001> cluster nodes
f158bf70bb2767cac271ce4efcfc14ba0b7ca98b 172.22.6.3:7006@17006 slave e7aa182e756b76ec85b471797db9b66e4b2da725 0 1594528179000 6 connected
f348e67648460c7a800120d69b4977bf2e4524cb 172.22.6.3:7001@17001 myself,master - 0 1594528179000 1 connected 0-5460
52601e2d4af0e64b83f4cc6d20e8316d0ac38b99 172.22.6.3:7004@17004 slave 4802fafe897160c46392c6e569d6f5e466cca696 0 1594528178000 4 connected
c6c6a68674ae8aac3c6ec792c8af4dc1228c6c31 172.22.6.3:7005@17005 slave f348e67648460c7a800120d69b4977bf2e4524cb 0 1594528179852 5 connected
e7aa182e756b76ec85b471797db9b66e4b2da725 172.22.6.3:7002@17002 master - 0 1594528178000 2 connected 5461-10922
4802fafe897160c46392c6e569d6f5e466cca696 172.22.6.3:7003@17003 master - 0 1594528178000 3 connected 10923-16383

cluster info 查看集群信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
172.22.6.3:7001> cluster info
cluster_state:ok
cluster_slots_assigned:16384
cluster_slots_ok:16384
cluster_slots_pfail:0
cluster_slots_fail:0
cluster_known_nodes:6
cluster_size:3
cluster_current_epoch:6
cluster_my_epoch:1
cluster_stats_messages_ping_sent:3406
cluster_stats_messages_pong_sent:3569
cluster_stats_messages_publish_sent:5035
cluster_stats_messages_sent:12010
cluster_stats_messages_ping_received:3564
cluster_stats_messages_pong_received:3406
cluster_stats_messages_meet_received:5
cluster_stats_messages_publish_received:5033
cluster_stats_messages_received:12008

(2)添加节点到集群

将已启动的节点实例添加到集群中

1
redis-cli --cluster add-node 127.0.0.1:7007 127.0.0.1:7008

添加主节点

添加一组主节点

1
2
3
./redis-cli --cluster add-node 172.22.6.3:7007 172.22.6.3:7001
./redis-cli --cluster add-node 172.22.6.3:7008 172.22.6.3:7001
./redis-cli --cluster add-node 172.22.6.3:7009 172.22.6.3:7001

查看节点状态

1
2
3
4
5
6
7
8
9
10
172.22.6.3:7001> cluster nodes
f158bf70bb2767cac271ce4efcfc14ba0b7ca98b 172.22.6.3:7006@17006 slave e7aa182e756b76ec85b471797db9b66e4b2da725 0 1594529342575 6 connected
f348e67648460c7a800120d69b4977bf2e4524cb 172.22.6.3:7001@17001 myself,master - 0 1594529340000 1 connected 0-5460
55cacf121662833a4a19dbeb4a5df712cfedf77f 172.22.6.3:7009@17009 master - 0 1594529342000 0 connected
c6c6a68674ae8aac3c6ec792c8af4dc1228c6c31 172.22.6.3:7005@17005 slave f348e67648460c7a800120d69b4977bf2e4524cb 0 1594529341573 5 connected
4802fafe897160c46392c6e569d6f5e466cca696 172.22.6.3:7003@17003 master - 0 1594529343577 3 connected 10923-16383
e7aa182e756b76ec85b471797db9b66e4b2da725 172.22.6.3:7002@17002 master - 0 1594529342000 2 connected 5461-10922
e5ba78fe629115977a74fbbe1478caf8868d6d55 172.22.6.3:7007@17007 master - 0 1594529341000 0 connected
52601e2d4af0e64b83f4cc6d20e8316d0ac38b99 172.22.6.3:7004@17004 slave 4802fafe897160c46392c6e569d6f5e466cca696 0 1594529340000 4 connected
79d4fffc2cec210556c3b4c44e63ab506e87eda3 172.22.6.3:7008@17008 master - 0 1594529340000 7 connected

可以发现,新加入的三个主节点,还没有分配哈希槽,所以,暂时还无法访问。

添加从节点

–slave:设置该参数,则新节点以 slave 的角色加入集群
–master-id:这个参数需要设置了–slave 才能生效,–master-id 用来指定新节点的 master 节点。如果不设置该参数,则会随机为节点选择 master 节点。

语法

1
2
redis-cli --cluster add-node  新节点IP地址:端口    存在节点IP:端口 --cluster-slave (从节点) --cluster-master-id (master节点的ID)
redis-cli --cluster add-node 10.42.141.119:6379 10.42.166.105:6379 --cluster-slave --cluster-master-id dfa238fff8a7a49230cff7eb74f573f5645c8ec5

示例

1
2
3
./redis-cli --cluster add-node 172.22.6.3:7010 172.22.6.3:7007 --cluster-slave
./redis-cli --cluster add-node 172.22.6.3:7011 172.22.6.3:7008 --cluster-slave
./redis-cli --cluster add-node 172.22.6.3:7012 172.22.6.3:7009 --cluster-slave

查看状态

1
2
3
4
5
6
7
8
9
10
11
12
13
172.22.6.3:7001> cluster nodes
ef5c1b9ce4cc795dc12b2c1e8736a572647b4c3e 172.22.6.3:7011@17011 slave 79d4fffc2cec210556c3b4c44e63ab506e87eda3 0 1594529492043 7 connected
f158bf70bb2767cac271ce4efcfc14ba0b7ca98b 172.22.6.3:7006@17006 slave e7aa182e756b76ec85b471797db9b66e4b2da725 0 1594529491943 6 connected
f348e67648460c7a800120d69b4977bf2e4524cb 172.22.6.3:7001@17001 myself,master - 0 1594529488000 1 connected 0-5460
5140d1129ed850df59c51cf818c4eb74545d9959 172.22.6.3:7010@17010 slave e5ba78fe629115977a74fbbe1478caf8868d6d55 0 1594529488000 0 connected
55cacf121662833a4a19dbeb4a5df712cfedf77f 172.22.6.3:7009@17009 master - 0 1594529488000 8 connected
c6c6a68674ae8aac3c6ec792c8af4dc1228c6c31 172.22.6.3:7005@17005 slave f348e67648460c7a800120d69b4977bf2e4524cb 0 1594529490000 5 connected
4802fafe897160c46392c6e569d6f5e466cca696 172.22.6.3:7003@17003 master - 0 1594529489939 3 connected 10923-16383
e7aa182e756b76ec85b471797db9b66e4b2da725 172.22.6.3:7002@17002 master - 0 1594529491000 2 connected 5461-10922
e5ba78fe629115977a74fbbe1478caf8868d6d55 172.22.6.3:7007@17007 master - 0 1594529490942 0 connected
52601e2d4af0e64b83f4cc6d20e8316d0ac38b99 172.22.6.3:7004@17004 slave 4802fafe897160c46392c6e569d6f5e466cca696 0 1594529491000 4 connected
02e9f57b5b45c350dc57acf1c8efa8db136db7b7 172.22.6.3:7012@17012 master - 0 1594529489000 0 connected
79d4fffc2cec210556c3b4c44e63ab506e87eda3 172.22.6.3:7008@17008 master - 0 1594529489000 7 connected

分配哈希槽

执行 ./redis-cli --cluster rebalance 172.22.6.3:7001 --cluster-threshold 1 --cluster-use-empty-masters

参数说明:

rebalance:表明让 Redis 自动根据节点数进行均衡哈希槽分配。

–cluster-use-empty-masters:表明

img

执行结束后,查看状态:

img

Redis 命令

通用命令

命令详细用法,请参考 Redis 命令官方文档

搬迁两张 cheat sheet 图,原址:https://www.cheatography.com/tasjaevan/cheat-sheets/redis/

集群命令

  • 集群
    • cluster info - 打印集群的信息
    • cluster nodes - 列出集群当前已知的所有节点( node),以及这些节点的相关信息。
  • 节点
    • cluster meet <ip> <port> - 将 ip 和 port 所指定的节点添加到集群当中,让它成为集群的一份子。
    • cluster forget <node_id> - 从集群中移除 node_id 指定的节点。
    • cluster replicate <node_id> - 将当前节点设置为 node_id 指定的节点的从节点。
    • cluster saveconfig - 将节点的配置文件保存到硬盘里面。
  • 槽(slot)
    • cluster addslots <slot> [slot ...] - 将一个或多个槽( slot)指派( assign)给当前节点。
    • cluster delslots <slot> [slot ...] - 移除一个或多个槽对当前节点的指派。
    • cluster flushslots - 移除指派给当前节点的所有槽,让当前节点变成一个没有指派任何槽的节点。
    • cluster setslot <slot> node <node_id> - 将槽 slot 指派给 node_id 指定的节点,如果槽已经指派给另一个节点,那么先让另一个节点删除该槽>,然后再进行指派。
    • cluster setslot <slot> migrating <node_id> - 将本节点的槽 slot 迁移到 node_id 指定的节点中。
    • cluster setslot <slot> importing <node_id> - 从 node_id 指定的节点中导入槽 slot 到本节点。
    • cluster setslot <slot> stable - 取消对槽 slot 的导入( import)或者迁移( migrate)。
    • cluster keyslot <key> - 计算键 key 应该被放置在哪个槽上。
    • cluster countkeysinslot <slot> - 返回槽 slot 目前包含的键值对数量。
    • cluster getkeysinslot <slot> <count> - 返回 count 个 slot 槽中的键。

重新分片

添加节点:./redis-cli –cluster add-node 192.168.1.136:7007 192.168.1.136:7001 –cluster-slave

redis-cli –cluster reshard 172.22.6.3 7001

客户端

推荐使用 RedisDesktopManager

参考资料