Dunwu Blog

大道至简,知易行难

Kibana 运维

通过 Kibana,您可以对自己的 Elasticsearch 进行可视化,还可以在 Elastic Stack 中进行导航,这样您便可以进行各种操作了,从跟踪查询负载,到理解请求如何流经您的整个应用,都能轻松完成。

1. 安装

1.1. 环境要求

版本:Elastic Stack 7.4

1.2. 安装步骤

安装步骤如下:

  1. kibana 官方下载地址下载所需版本包并解压到本地。
  2. 修改 config/kibana.yml 配置文件,设置 elasticsearch.url 指向 Elasticsearch 实例。
  3. 运行 bin/kibana (Windows 上运行 bin\kibana.bat
  4. 在浏览器上访问 http://localhost:5601

2. 使用

2.1. 检索

单击侧面导航栏中的 检索(Discover) ,可以显示 Kibana 的数据查询功能功能。

img

在搜索栏中,您可以输入 Elasticsearch 查询条件来搜索您的数据。您可以在 Discover 页面中浏览结果并在 Visualize 页面中创建已保存搜索条件的可视化。

当前索引模式显示在查询栏下方。索引模式确定提交查询时搜索哪些索引。要搜索一组不同的索引,请从下拉菜单中选择不同的模式。要添加索引模式(index pattern),请转至 Management/Kibana/Index Patterns 并单击 Add New

您可以使用字段名称和您感兴趣的值构建搜索。对于数字字段,可以使用比较运算符,如大于(>),小于(<)或等于(=)。您可以将元素与逻辑运算符 ANDORNOT 链接,全部使用大写。

默认情况下,每个匹配文档都显示所有字段。要选择要显示的文档字段,请将鼠标悬停在“可用字段”列表上,然后单击要包含的每个字段旁边的添加按钮。例如,如果只添加 account_number,则显示将更改为包含五个帐号的简单列表:

img

kibana 的搜索栏遵循 query-string-syntax 文档中所说明的查询语义。

这里说明一些最基本的查询语义。

查询字符串会被解析为一系列的术语和运算符。一个术语可以是一个单词(如:quick、brown)或用双引号包围的短语(如”quick brown”)。

查询操作允许您自定义搜索 - 下面介绍了可用的选项。

2.1.1. 字段名称

正如查询字符串查询中所述,将在搜索条件中搜索 default_field,但可以在查询语法中指定其他字段:

例如:

  • 查询 status 字段中包含 active 关键字
1
status:active
  • title 字段包含 quickbrown 关键字。如果您省略 OR 运算符,则将使用默认运算符
1
2
title:(quick OR brown)
title:(quick brown)
  • author 字段查找精确的短语 “john smith”,即精确查找。
1
author:"John Smith"
  • 任意字段 book.titlebook.contentbook.date 都包含 quickbrown(注意我们需要如何使用 \* 表示通配符)
1
book.\*:(quick brown)
  • title 字段包含任意非 null 值
1
_exists_:title

2.1.2. 通配符

ELK 提供了 ? 和 * 两个通配符。

  • ? 表示任意单个字符;
  • * 表示任意零个或多个字符。
1
qu?ck bro*

注意:通配符查询会使用大量的内存并且执行性能较为糟糕,所以请慎用。 > 提示:纯通配符 * 被写入 exsits 查询,从而提高了查询效率。因此,通配符 field:* 将匹配包含空值的文档,如:{“field”:“”},但是如果字段丢失或显示将值置为 null 则不匹配,如:“field”:null} > 提示:在一个单词的开头(例如:*ing)使用通配符这种方式的查询量特别大,因为索引中的所有术语都需要检查,以防万一匹配。通过将 allow_leading_wildcard 设置为 false,可以禁用。

2.1.3. 正则表达式

可以通过 / 将正则表达式包裹在查询字符串中进行查询

例:

1
name:/joh?n(ath[oa]n)/

支持的正则表达式语义可以参考:Regular expression syntax

2.1.4. 模糊查询

我们可以使用 ~ 运算符来进行模糊查询。

例:

假设我们实际想查询

1
quick brown forks

但是,由于拼写错误,我们的查询关键字变成如下情况,依然可以查到想要的结果。

1
quikc\~ brwn\~ foks\~

这种模糊查询使用 Damerau-Levenshtein 距离来查找所有匹配最多两个更改的项。所谓的更改是指单个字符的插入,删除或替换,或者两个相邻字符的换位。

默认编辑距离为 2,但编辑距离为 1 应足以捕捉所有人类拼写错误的 80%。它可以被指定为:

1
quikc\~1

2.1.5. 近似检索

尽管短语查询(例如,john smith)期望所有的词条都是完全相同的顺序,但是近似查询允许指定的单词进一步分开或以不同的顺序排列。与模糊查询可以为单词中的字符指定最大编辑距离一样,近似搜索也允许我们指定短语中单词的最大编辑距离:

1
"fox quick"\~5

字段中的文本越接近查询字符串中指定的原始顺序,该文档就越被认为是相关的。当与上面的示例查询相比时,短语 "quick fox" 将被认为比 "quick brown fox" 更近似查询条件。

2.1.6. 范围

可以为日期,数字或字符串字段指定范围。闭区间范围用方括号 [min TO max] 和开区间范围用花括号 {min TO max} 来指定。

我们不妨来看一些示例。

  • 2012 年的所有日子
1
date:[2012-01-01 TO 2012-12-31]
  • 数字 1 到 5
1
count:[1 TO 5]
  • alphaomega 之间的标签,不包括 alphaomega
1
tags:{alpha TO omega}
  • 10 以上的数字
1
count:[10 TO *]
  • 2012 年以前的所有日期
1
date:{* TO 2012-01-01}

此外,开区间和闭区间也可以组合使用

  • 数组 1 到 5,但不包括 5
1
count:[1 TO 5}

一边无界的范围也可以使用以下语法:

1
2
3
4
age:>10
age:>=10
age:<10
age:<=10

当然,你也可以使用 AND 运算符来得到连个查询结果的交集

1
2
age:(>=10 AND <20)
age:(+>=10 +<20)

2.1.7. Boosting

使用操作符 ^ 使一个术语比另一个术语更相关。例如,如果我们想查找所有有关狐狸的文档,但我们对狐狸特别感兴趣:

1
quick^2 fox

默认提升值是 1,但可以是任何正浮点数。 0 到 1 之间的提升减少了相关性。

增强也可以应用于短语或组:

1
"john smith"^2   (foo bar)^4

2.1.8. 布尔操作

默认情况下,只要一个词匹配,所有词都是可选的。搜索 foo bar baz 将查找包含 foobarbaz 中的一个或多个的任何文档。我们已经讨论了上面的default_operator,它允许你强制要求所有的项,但也有布尔运算符可以在查询字符串本身中使用,以提供更多的控制。

首选的操作符是 +(此术语必须存在)和 - (此术语不得存在)。所有其他条款是可选的。例如,这个查询:

1
quick brown +fox -news

这条查询意味着:

  • fox 必须存在
  • news 必须不存在
  • quick 和 brown 是可有可无的

熟悉的运算符 ANDORNOT(也写成 &&||!)也被支持。然而,这些操作符有一定的优先级:NOT 优先于 ANDAND 优先于 OR。虽然 +- 仅影响运算符右侧的术语,但 ANDOR 会影响左侧和右侧的术语。

2.1.9. 分组

多个术语或子句可以用圆括号组合在一起,形成子查询

1
(quick OR brown) AND fox

可以使用组来定位特定的字段,或者增强子查询的结果:

1
status:(active OR pending) title:(full text search)^2

2.1.10. 保留字

如果你需要使用任何在你的查询本身中作为操作符的字符(而不是作为操作符),那么你应该用一个反斜杠来转义它们。例如,要搜索(1 + 1)= 2,您需要将查询写为 \(1\+1\)\=2

保留字符是:+ - = && || > < ! ( ) { } [ ] ^ " ~ * ? : \ /

无法正确地转义这些特殊字符可能会导致语法错误,从而阻止您的查询运行。

2.1.11. 空查询

如果查询字符串为空或仅包含空格,则查询将生成一个空的结果集。

2.2. 可视化

要想使用可视化的方式展示您的数据,请单击侧面导航栏中的 可视化(Visualize)

Visualize 工具使您能够以多种方式(如饼图、柱状图、曲线图、分布图等)查看数据。要开始使用,请点击蓝色的 Create a visualization+ 按钮。

https://www.elastic.co/guide/en/kibana/6.1/images/tutorial-visualize-landing.png

有许多可视化类型可供选择。

https://www.elastic.co/guide/en/kibana/6.1/images/tutorial-visualize-wizard-step-1.png

下面,我们来看创建几个图标示例:

2.2.1. Pie

您可以从保存的搜索中构建可视化文件,也可以输入新的搜索条件。要输入新的搜索条件,首先需要选择一个索引模式来指定要搜索的索引。

默认搜索匹配所有文档。最初,一个“切片”包含整个饼图:

https://www.elastic.co/guide/en/kibana/6.1/images/tutorial-visualize-pie-1.png

要指定在图表中展示哪些数据,请使用 Elasticsearch 存储桶聚合。分组汇总只是将与您的搜索条件相匹配的文档分类到不同的分类中,也称为分组。

为每个范围定义一个存储桶:

  1. 单击 Split Slices
  2. Aggregation 列表中选择 Terms。_注意:这里的 Terms 是 Elk 采集数据时定义好的字段或标签_。
  3. Field 列表中选择 level.keyword
  4. 点击 images/apply-changes-button.png 按钮来更新图表。

image.png

完成后,如果想要保存这个图表,可以点击页面最上方一栏中的 Save 按钮。

2.2.2. Vertical Bar

我们在展示一下如何创建柱状图。

  1. 点击蓝色的 Create a visualization+ 按钮。选择 Vertical Bar
  2. 选择索引模式。由于您尚未定义任何 bucket ,因此您会看到一个大栏,显示与默认通配符查询匹配的文档总数。
  3. 指定 Y 轴所代表的字段
  4. 指定 X 轴所代表的字段
  5. 点击 images/apply-changes-button.png 按钮来更新图表。

image.png

完成后,如果想要保存这个图表,可以点击页面最上方一栏中的 Save 按钮。

2.3. 报表

报表(Dashboard) 可以整合和共享 Visualize 集合。

  1. 点击侧面导航栏中的 Dashboard。
  2. 点击添加显示保存的可视化列表。
  3. 点击之前保存的 Visualize,然后点击列表底部的小向上箭头关闭可视化列表。
  4. 将鼠标悬停在可视化对象上会显示允许您编辑,移动,删除和调整可视化对象大小的容器控件。

3. FAQ

3.1. Kibana No Default Index Pattern Warning

问题:安装 ELK 后,访问 kibana 页面时,提示以下错误信息:

1
2
3
Warning No default index pattern. You must select or create one to continue.
...
Unable to fetch mapping. Do you have indices matching the pattern?

这就说明 logstash 没有把日志写入到 elasticsearch。

解决方法:

检查 logstash 与 elasticsearch 之间的通讯是否有问题,一般问题就出在这。

4. 参考资料

Elastic 技术栈之 Logstash

本文是 Elastic 技术栈(ELK)的 Logstash 应用。

如果不了解 Elastic 的安装、配置、部署,可以参考:Elastic 技术栈之快速入门

简介

Logstash 可以传输和处理你的日志、事务或其他数据。

功能

Logstash 是 Elasticsearch 的最佳数据管道。

Logstash 是插件式管理模式,在输入、过滤、输出以及编码过程中都可以使用插件进行定制。Logstash 社区有超过 200 种可用插件。

工作原理

Logstash 有两个必要元素:inputoutput ,一个可选元素:filter

这三个元素,分别代表 Logstash 事件处理的三个阶段:输入 > 过滤器 > 输出。

img

  • input 负责从数据源采集数据。
  • filter 将数据修改为你指定的格式或内容。
  • output 将数据传输到目的地。

在实际应用场景中,通常输入、输出、过滤器不止一个。Logstash 的这三个元素都使用插件式管理方式,用户可以根据应用需要,灵活的选用各阶段需要的插件,并组合使用。

后面将对插件展开讲解,暂且不表。

设置

设置文件

  • **logstash.yml**:logstash 的默认启动配置文件
  • **jvm.options**:logstash 的 JVM 配置文件。
  • startup.options (Linux):包含系统安装脚本在 /usr/share/logstash/bin 中使用的选项为您的系统构建适当的启动脚本。安装 Logstash 软件包时,系统安装脚本将在安装过程结束时执行,并使用 startup.options 中指定的设置来设置用户,组,服务名称和服务描述等选项。

logstash.yml 设置项

节选部分设置项,更多项请参考:https://www.elastic.co/guide/en/logstash/current/logstash-settings-file.html

参数 描述 默认值
node.name 节点名 机器的主机名
path.data Logstash 及其插件用于任何持久性需求的目录。 LOGSTASH_HOME/data
pipeline.workers 同时执行管道的过滤器和输出阶段的工作任务数量。如果发现事件正在备份,或 CPU 未饱和,请考虑增加此数字以更好地利用机器处理能力。 Number of the host’s CPU cores
pipeline.batch.size 尝试执行过滤器和输出之前,单个工作线程从输入收集的最大事件数量。较大的批量处理大小一般来说效率更高,但是以增加的内存开销为代价。您可能必须通过设置 LS_HEAP_SIZE 变量来有效使用该选项来增加 JVM 堆大小。 125
pipeline.batch.delay 创建管道事件批处理时,在将一个尺寸过小的批次发送给管道工作任务之前,等待每个事件需要多长时间(毫秒)。 5
pipeline.unsafe_shutdown 如果设置为 true,则即使在内存中仍存在 inflight 事件时,也会强制 Logstash 在关闭期间退出。默认情况下,Logstash 将拒绝退出,直到所有接收到的事件都被推送到输出。启用此选项可能会导致关机期间数据丢失。 false
path.config 主管道的 Logstash 配置路径。如果您指定一个目录或通配符,配置文件将按字母顺序从目录中读取。 Platform-specific. See [dir-layout].
config.string 包含用于主管道的管道配置的字符串。使用与配置文件相同的语法。 None
config.test_and_exit 设置为 true 时,检查配置是否有效,然后退出。请注意,使用此设置不会检查 grok 模式的正确性。 Logstash 可以从目录中读取多个配置文件。如果将此设置与 log.level:debug 结合使用,则 Logstash 将记录组合的配置文件,并注掉其源文件的配置块。 false
config.reload.automatic 设置为 true 时,定期检查配置是否已更改,并在配置更改时重新加载配置。这也可以通过 SIGHUP 信号手动触发。 false
config.reload.interval Logstash 检查配置文件更改的时间间隔。 3s
config.debug 设置为 true 时,将完全编译的配置显示为调试日志消息。您还必须设置log.level:debug。警告:日志消息将包括任何传递给插件配置作为明文的“密码”选项,并可能导致明文密码出现在您的日志! false
config.support_escapes 当设置为 true 时,带引号的字符串将处理转义字符。 false
modules 配置时,模块必须处于上表所述的嵌套 YAML 结构中。 None
http.host 绑定地址 "127.0.0.1"
http.port 绑定端口 9600
log.level 日志级别。有效选项:fatal > error > warn > info > debug > trace info
log.format 日志格式。json (JSON 格式)或 plain (原对象) plain
path.logs Logstash 自身日志的存储路径 LOGSTASH_HOME/logs
path.plugins 在哪里可以找到自定义的插件。您可以多次指定此设置以包含多个路径。

启动

命令行

通过命令行启动 logstash 的方式如下:

1
bin/logstash [options]

其中 options 是您可以指定用于控制 Logstash 执行的命令行标志。

在命令行上设置的任何标志都会覆盖 Logstash 设置文件(logstash.yml)中的相应设置,但设置文件本身不会更改。

虽然可以通过指定命令行参数的方式,来控制 logstash 的运行方式,但显然这么做很麻烦。

建议通过指定配置文件的方式,来控制 logstash 运行,启动命令如下:

1
bin/logstash -f logstash.conf

若想了解更多的命令行参数细节,请参考:https://www.elastic.co/guide/en/logstash/current/running-logstash-command-line.html

配置文件

上节,我们了解到,logstash 可以执行 bin/logstash -f logstash.conf ,按照配置文件中的参数去覆盖默认设置文件(logstash.yml)中的设置。

这节,我们就来学习一下这个配置文件如何配置参数。

配置文件结构

在工作原理一节中,我们已经知道了 Logstash 主要有三个工作阶段 input 、filter、output。而 logstash 配置文件文件结构也与之相对应:

1
2
3
4
5
input {}

filter {}

output {}

每个部分都包含一个或多个插件的配置选项。如果指定了多个过滤器,则会按照它们在配置文件中的显示顺序应用它们。

插件配置

插件的配置由插件名称和插件的一个设置块组成。

下面的例子中配置了两个输入文件配置:

1
2
3
4
5
6
7
8
9
10
11
input {
file {
path => "/var/log/messages"
type => "syslog"
}

file {
path => "/var/log/apache/access.log"
type => "apache"
}
}

您可以配置的设置因插件类型而异。你可以参考: Input Plugins, Output Plugins, Filter Plugins, 和 Codec Plugins

值类型

一个插件可以要求设置的值是一个特定的类型,比如布尔值,列表或哈希值。以下值类型受支持。

  • Array
1
users => [ {id => 1, name => bob}, {id => 2, name => jane} ]
  • Lists
1
2
path => ['/var/log/messages', '/var/log/*.log']
uris => ['http://elastic.co', 'http://example.net']
  • Boolean
1
ssl_enable => true
  • Bytes
1
2
3
4
my_bytes => "1113"   # 1113 bytes
my_bytes => "10MiB" # 10485760 bytes
my_bytes => "100kib" # 102400 bytes
my_bytes => "180 mb" # 180000000 bytes
  • Codec
1
codec => 'json'
  • Hash
1
2
3
4
5
match => {
"field1" => "value1"
"field2" => "value2"
...
}
  • Number
1
port => 33
  • Password
1
my_password => 'password'
  • URI
1
my_uri => 'http://foo:bar@example.net'
  • Path
1
my_path => '/tmp/logstash'
  • String

  • 转义字符

插件

input

Logstash 支持各种输入选择 ,可以在同一时间从众多常用来源捕捉事件。能够以连续的流式传输方式,轻松地从您的日志、指标、Web 应用、数据存储以及各种 AWS 服务采集数据。

常用 input 插件

  • file:从文件系统上的文件读取,就像 UNIX 命令 tail -0F 一样
  • syslog:在众所周知的端口 514 上侦听系统日志消息,并根据 RFC3164 格式进行解析
  • redis:从 redis 服务器读取,使用 redis 通道和 redis 列表。 Redis 经常用作集中式 Logstash 安装中的“代理”,它将来自远程 Logstash“托运人”的 Logstash 事件排队。
  • beats:处理由 Filebeat 发送的事件。

更多详情请见:Input Plugins

filter

过滤器是 Logstash 管道中的中间处理设备。如果符合特定条件,您可以将条件过滤器组合在一起,对事件执行操作。

常用 filter 插件

  • grok:解析和结构任意文本。 Grok 目前是 Logstash 中将非结构化日志数据解析为结构化和可查询的最佳方法。
  • mutate:对事件字段执行一般转换。您可以重命名,删除,替换和修改事件中的字段。
  • drop:完全放弃一个事件,例如调试事件。
  • clone:制作一个事件的副本,可能会添加或删除字段。
  • geoip:添加有关 IP 地址的地理位置的信息(也可以在 Kibana 中显示惊人的图表!)

更多详情请见:Filter Plugins

output

输出是 Logstash 管道的最后阶段。一个事件可以通过多个输出,但是一旦所有输出处理完成,事件就完成了执行。

常用 output 插件

  • elasticsearch:将事件数据发送给 Elasticsearch(推荐模式)。
  • file:将事件数据写入文件或磁盘。
  • graphite:将事件数据发送给 graphite(一个流行的开源工具,存储和绘制指标。 http://graphite.readthedocs.io/en/latest/)。
  • statsd:将事件数据发送到 statsd (这是一种侦听统计数据的服务,如计数器和定时器,通过 UDP 发送并将聚合发送到一个或多个可插入的后端服务)。

更多详情请见:Output Plugins

codec

用于格式化对应的内容。

常用 codec 插件

  • json:以 JSON 格式对数据进行编码或解码。
  • multiline:将多行文本事件(如 java 异常和堆栈跟踪消息)合并为单个事件。

更多插件请见:Codec Plugins

实战

前面的内容都是对 Logstash 的介绍和原理说明。接下来,我们来实战一些常见的应用场景。

传输控制台数据

stdin input 插件从标准输入读取事件。这是最简单的 input 插件,一般用于测试场景。

应用

(1)创建 logstash-input-stdin.conf

1
2
3
4
5
input { stdin { } }
output {
elasticsearch { hosts => ["localhost:9200"] }
stdout { codec => rubydebug }
}

更多配置项可以参考:https://www.elastic.co/guide/en/logstash/current/plugins-inputs-stdin.html

(2)执行 logstash,使用 -f 来指定你的配置文件:

1
bin/logstash -f logstash-input-stdin.conf

传输 logback 日志

elk 默认使用的 Java 日志工具是 log4j2 ,并不支持 logback 和 log4j。

想使用 logback + logstash ,可以使用 logstash-logback-encoderlogstash-logback-encoder 提供了 UDP / TCP / 异步方式来传输日志数据到 logstash。

如果你使用的是 log4j ,也不是不可以用这种方式,只要引入桥接 jar 包即可。如果你对 log4j 、logback ,或是桥接 jar 包不太了解,可以参考我的这篇博文:细说 Java 主流日志工具库

TCP 应用

logstash 配置

(1)创建 logstash-input-tcp.conf

1
2
3
4
5
6
7
8
9
10
11
input {
tcp {
port => 9251
codec => json_lines
mode => server
}
}
output {
elasticsearch { hosts => ["localhost:9200"] }
stdout { codec => rubydebug }
}

更多配置项可以参考:https://www.elastic.co/guide/en/logstash/current/plugins-inputs-tcp.html

(2)执行 logstash,使用 -f 来指定你的配置文件:bin/logstash -f logstash-input-udp.conf

java 应用配置

(1)在 Java 应用的 pom.xml 中引入 jar 包:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
<dependency>
<groupId>net.logstash.logback</groupId>
<artifactId>logstash-logback-encoder</artifactId>
<version>4.11</version>
</dependency>

<!-- logback 依赖包 -->
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<version>1.2.3</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.3</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-access</artifactId>
<version>1.2.3</version>
</dependency>

(2)接着,在 logback.xml 中添加 appender

1
2
3
4
5
6
7
8
9
10
11
<appender name="ELK-TCP" class="net.logstash.logback.appender.LogstashTcpSocketAppender">
<!--
destination 是 logstash 服务的 host:port,
相当于和 logstash 建立了管道,将日志数据定向传输到 logstash
-->
<destination>192.168.28.32:9251</destination>
<encoder charset="UTF-8" class="net.logstash.logback.encoder.LogstashEncoder"/>
</appender>
<logger name="io.github.dunwu.spring" level="TRACE" additivity="false">
<appender-ref ref="ELK-TCP" />
</logger>

(3)接下来,就是 logback 的具体使用 ,如果对此不了解,不妨参考一下我的这篇博文:细说 Java 主流日志工具库

实例:我的 logback.xml

UDP 应用

UDP 和 TCP 的使用方式大同小异。

logstash 配置

(1)创建 logstash-input-udp.conf

1
2
3
4
5
6
7
8
9
10
input {
udp {
port => 9250
codec => json
}
}
output {
elasticsearch { hosts => ["localhost:9200"] }
stdout { codec => rubydebug }
}

更多配置项可以参考:https://www.elastic.co/guide/en/logstash/current/plugins-inputs-udp.html

(2)执行 logstash,使用 -f 来指定你的配置文件:bin/logstash -f logstash-input-udp.conf

java 应用配置

(1)在 Java 应用的 pom.xml 中引入 jar 包:

TCP 应用 一节中的引入依赖包完全相同。

(2)接着,在 logback.xml 中添加 appender

1
2
3
4
5
6
7
<appender name="ELK-UDP" class="net.logstash.logback.appender.LogstashSocketAppender">
<host>192.168.28.32</host>
<port>9250</port>
</appender>
<logger name="io.github.dunwu.spring" level="TRACE" additivity="false">
<appender-ref ref="ELK-UDP" />
</logger>

(3)接下来,就是 logback 的具体使用 ,如果对此不了解,不妨参考一下我的这篇博文:细说 Java 主流日志工具库

实例:我的 logback.xml

传输文件

在 Java Web 领域,需要用到一些重要的工具,例如 Tomcat 、Nginx 、Mysql 等。这些不属于业务应用,但是它们的日志数据对于定位问题、分析统计同样很重要。这时无法使用 logback 方式将它们的日志传输到 logstash。

如何采集这些日志文件呢?别急,你可以使用 logstash 的 file input 插件。

需要注意的是,传输文件这种方式,必须在日志所在的机器上部署 logstash 。

应用

logstash 配置

(1)创建 logstash-input-file.conf

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
input {
file {
path => ["/var/log/nginx/access.log"]
type => "nginx-access-log"
start_position => "beginning"
}
}

output {
if [type] == "nginx-access-log" {
elasticsearch {
hosts => ["localhost:9200"]
index => "nginx-access-log"
}
}
}

(2)执行 logstash,使用 -f 来指定你的配置文件:bin/logstash -f logstash-input-file.conf

更多配置项可以参考:https://www.elastic.co/guide/en/logstash/current/plugins-inputs-file.html

小技巧

启动、终止应用

如果你的 logstash 每次都是通过指定配置文件方式启动。不妨建立一个启动脚本。

1
2
# cd xxx 进入 logstash 安装目录下的 bin 目录
logstash -f logstash.conf

如果你的 logstash 运行在 linux 系统下,不妨使用 nohup 来启动一个守护进程。这样做的好处在于,即使关闭终端,应用仍会运行。

创建 startup.sh

1
nohup ./logstash -f logstash.conf >> nohup.out 2>&1 &

终止应用没有什么好方法,你只能使用 ps -ef | grep logstash ,查出进程,将其 kill 。不过,我们可以写一个脚本来干这件事:

创建 shutdown.sh

脚本不多解释,请自行领会作用。

1
2
PID=`ps -ef | grep logstash | awk '{ print $2}' | head -n 1`
kill -9 ${PID}

资料

推荐阅读

Logstash 运维

Logstash 是开源的服务器端数据处理管道,能够同时从多个来源采集数据,转换数据,然后将数据发送到您最喜欢的“存储库”中。

1. 安装

1.1. 安装步骤

安装步骤如下:

(1)在 logstash 官方下载地址下载所需版本包并解压到本地。

(2)添加一个 logstash.conf 文件,指定要使用的插件以及每个插件的设置。举个简单的例子:

1
2
3
4
5
input { stdin { } }
output {
elasticsearch { hosts => ["localhost:9200"] }
stdout { codec => rubydebug }
}

(3)运行 bin/logstash -f logstash.conf (Windows 上运行bin/logstash.bat -f logstash.conf

2. 配置

2.1. 设置文件

  • **logstash.yml**:logstash 的默认启动配置文件
  • **jvm.options**:logstash 的 JVM 配置文件。
  • startup.options (Linux):包含系统安装脚本在 /usr/share/logstash/bin 中使用的选项为您的系统构建适当的启动脚本。安装 Logstash 软件包时,系统安装脚本将在安装过程结束时执行,并使用 startup.options 中指定的设置来设置用户,组,服务名称和服务描述等选项。

2.2. logstash.yml 设置项

节选部分设置项,更多项请参考:https://www.elastic.co/guide/en/logstash/current/logstash-settings-file.html

参数 描述 默认值
node.name 节点名 机器的主机名
path.data Logstash 及其插件用于任何持久性需求的目录。 LOGSTASH_HOME/data
pipeline.workers 同时执行管道的过滤器和输出阶段的工作任务数量。如果发现事件正在备份,或 CPU 未饱和,请考虑增加此数字以更好地利用机器处理能力。 Number of the host’s CPU cores
pipeline.batch.size 尝试执行过滤器和输出之前,单个工作线程从输入收集的最大事件数量。较大的批量处理大小一般来说效率更高,但是以增加的内存开销为代价。您可能必须通过设置 LS_HEAP_SIZE 变量来有效使用该选项来增加 JVM 堆大小。 125
pipeline.batch.delay 创建管道事件批处理时,在将一个尺寸过小的批次发送给管道工作任务之前,等待每个事件需要多长时间(毫秒)。 5
pipeline.unsafe_shutdown 如果设置为 true,则即使在内存中仍存在 inflight 事件时,也会强制 Logstash 在关闭期间退出。默认情况下,Logstash 将拒绝退出,直到所有接收到的事件都被推送到输出。启用此选项可能会导致关机期间数据丢失。 false
path.config 主管道的 Logstash 配置路径。如果您指定一个目录或通配符,配置文件将按字母顺序从目录中读取。 Platform-specific. See [dir-layout].
config.string 包含用于主管道的管道配置的字符串。使用与配置文件相同的语法。 None
config.test_and_exit 设置为 true 时,检查配置是否有效,然后退出。请注意,使用此设置不会检查 grok 模式的正确性。 Logstash 可以从目录中读取多个配置文件。如果将此设置与 log.level:debug 结合使用,则 Logstash 将记录组合的配置文件,并注掉其源文件的配置块。 false
config.reload.automatic 设置为 true 时,定期检查配置是否已更改,并在配置更改时重新加载配置。这也可以通过 SIGHUP 信号手动触发。 false
config.reload.interval Logstash 检查配置文件更改的时间间隔。 3s
config.debug 设置为 true 时,将完全编译的配置显示为调试日志消息。您还必须设置log.level:debug。警告:日志消息将包括任何传递给插件配置作为明文的“密码”选项,并可能导致明文密码出现在您的日志! false
config.support_escapes 当设置为 true 时,带引号的字符串将处理转义字符。 false
modules 配置时,模块必须处于上表所述的嵌套 YAML 结构中。 None
http.host 绑定地址 "127.0.0.1"
http.port 绑定端口 9600
log.level 日志级别。有效选项:fatal > error > warn > info > debug > trace info
log.format 日志格式。json (JSON 格式)或 plain (原对象) plain
path.logs Logstash 自身日志的存储路径 LOGSTASH_HOME/logs
path.plugins 在哪里可以找到自定义的插件。您可以多次指定此设置以包含多个路径。

3. 启动

3.1. 命令行

通过命令行启动 logstash 的方式如下:

1
bin/logstash [options]

其中 options 是您可以指定用于控制 Logstash 执行的命令行标志。

在命令行上设置的任何标志都会覆盖 Logstash 设置文件(logstash.yml)中的相应设置,但设置文件本身不会更改。

虽然可以通过指定命令行参数的方式,来控制 logstash 的运行方式,但显然这么做很麻烦。

建议通过指定配置文件的方式,来控制 logstash 运行,启动命令如下:

1
bin/logstash -f logstash.conf

若想了解更多的命令行参数细节,请参考:https://www.elastic.co/guide/en/logstash/current/running-logstash-command-line.html

3.2. 配置文件

上节,我们了解到,logstash 可以执行 bin/logstash -f logstash.conf ,按照配置文件中的参数去覆盖默认设置文件(logstash.yml)中的设置。

这节,我们就来学习一下这个配置文件如何配置参数。

3.2.1. 配置文件结构

在工作原理一节中,我们已经知道了 Logstash 主要有三个工作阶段 input 、filter、output。而 logstash 配置文件文件结构也与之相对应:

1
2
3
4
5
input {}

filter {}

output {}

每个部分都包含一个或多个插件的配置选项。如果指定了多个过滤器,则会按照它们在配置文件中的显示顺序应用它们。

3.2.2. 插件配置

插件的配置由插件名称和插件的一个设置块组成。

下面的例子中配置了两个输入文件配置:

1
2
3
4
5
6
7
8
9
10
11
input {
file {
path => "/var/log/messages"
type => "syslog"
}

file {
path => "/var/log/apache/access.log"
type => "apache"
}
}

您可以配置的设置因插件类型而异。你可以参考: Input Plugins, Output Plugins, Filter Plugins, 和 Codec Plugins

3.2.3. 值类型

一个插件可以要求设置的值是一个特定的类型,比如布尔值,列表或哈希值。以下值类型受支持。

  • Array
1
users => [ {id => 1, name => bob}, {id => 2, name => jane} ]
  • Lists
1
2
path => [ "/var/log/messages", "/var/log/*.log" ]
uris => [ "http://elastic.co", "http://example.net" ]
  • Boolean
1
ssl_enable => true
  • Bytes
1
2
3
4
my_bytes => "1113"   # 1113 bytes
my_bytes => "10MiB" # 10485760 bytes
my_bytes => "100kib" # 102400 bytes
my_bytes => "180 mb" # 180000000 bytes
  • Codec
1
codec => "json"
  • Hash
1
2
3
4
5
match => {
"field1" => "value1"
"field2" => "value2"
...
}
  • Number
1
port => 33
  • Password
1
my_password => "password"
  • URI
1
my_uri => "http://foo:bar@example.net"
  • Path
1
my_path => "/tmp/logstash"
  • String

  • 转义字符

4. 插件

4.1. input

Logstash 支持各种输入选择 ,可以在同一时间从众多常用来源捕捉事件。能够以连续的流式传输方式,轻松地从您的日志、指标、Web 应用、数据存储以及各种 AWS 服务采集数据。

4.1.1. 常用 input 插件

  • file:从文件系统上的文件读取,就像 UNIX 命令 tail -0F 一样
  • syslog:在众所周知的端口 514 上侦听系统日志消息,并根据 RFC3164 格式进行解析
  • redis:从 redis 服务器读取,使用 redis 通道和 redis 列表。 Redis 经常用作集中式 Logstash 安装中的“代理”,它将来自远程 Logstash“托运人”的 Logstash 事件排队。
  • beats:处理由 Filebeat 发送的事件。

更多详情请见:Input Plugins

4.2. filter

过滤器是 Logstash 管道中的中间处理设备。如果符合特定条件,您可以将条件过滤器组合在一起,对事件执行操作。

4.2.1. 常用 filter 插件

  • grok:解析和结构任意文本。 Grok 目前是 Logstash 中将非结构化日志数据解析为结构化和可查询的最佳方法。

  • mutate:对事件字段执行一般转换。您可以重命名,删除,替换和修改事件中的字段。

  • drop:完全放弃一个事件,例如调试事件。

  • clone:制作一个事件的副本,可能会添加或删除字段。

  • geoip:添加有关 IP 地址的地理位置的信息(也可以在 Kibana 中显示惊人的图表!)

更多详情请见:Filter Plugins

4.3. output

输出是 Logstash 管道的最后阶段。一个事件可以通过多个输出,但是一旦所有输出处理完成,事件就完成了执行。

4.3.1. 常用 output 插件

  • elasticsearch:将事件数据发送给 Elasticsearch(推荐模式)。
  • file:将事件数据写入文件或磁盘。
  • graphite:将事件数据发送给 graphite(一个流行的开源工具,存储和绘制指标。 http://graphite.readthedocs.io/en/latest/)。
  • statsd:将事件数据发送到 statsd (这是一种侦听统计数据的服务,如计数器和定时器,通过 UDP 发送并将聚合发送到一个或多个可插入的后端服务)。

更多详情请见:Output Plugins

4.4. codec

用于格式化对应的内容。

4.4.1. 常用 codec 插件

  • json:以 JSON 格式对数据进行编码或解码。
  • multiline:将多行文本事件(如 java 异常和堆栈跟踪消息)合并为单个事件。

更多插件请见:Codec Plugins

5. 实战

前面的内容都是对 Logstash 的介绍和原理说明。接下来,我们来实战一些常见的应用场景。

5.1. 传输控制台数据

stdin input 插件从标准输入读取事件。这是最简单的 input 插件,一般用于测试场景。

应用

(1)创建 logstash-input-stdin.conf

1
2
3
4
5
input { stdin { } }
output {
elasticsearch { hosts => ["localhost:9200"] }
stdout { codec => rubydebug }
}

更多配置项可以参考:https://www.elastic.co/guide/en/logstash/current/plugins-inputs-stdin.html

(2)执行 logstash,使用 -f 来指定你的配置文件:

1
bin/logstash -f logstash-input-stdin.conf

5.2. 传输 logback 日志

elk 默认使用的 Java 日志工具是 log4j2 ,并不支持 logback 和 log4j。

想使用 logback + logstash ,可以使用 logstash-logback-encoderlogstash-logback-encoder 提供了 UDP / TCP / 异步方式来传输日志数据到 logstash。

如果你使用的是 log4j ,也不是不可以用这种方式,只要引入桥接 jar 包即可。如果你对 log4j 、logback ,或是桥接 jar 包不太了解,可以参考我的这篇博文:细说 Java 主流日志工具库

5.2.1. TCP 应用

logstash 配置:

(1)创建 logstash-input-tcp.conf

1
2
3
4
5
6
7
8
9
10
11
12
input {
# stdin { }
tcp {
# host:port就是上面appender中的 destination,
# 这里其实把logstash作为服务,开启9250端口接收logback发出的消息
host => "127.0.0.1" port => 9250 mode => "server" tags => ["tags"] codec => json_lines
}
}
output {
elasticsearch { hosts => ["localhost:9200"] }
stdout { codec => rubydebug }
}

更多配置项可以参考:https://www.elastic.co/guide/en/logstash/current/plugins-inputs-tcp.html

(2)执行 logstash,使用 -f 来指定你的配置文件:bin/logstash -f logstash-input-udp.conf

java 应用配置:

(1)在 Java 应用的 pom.xml 中引入 jar 包:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
<dependency>
<groupId>net.logstash.logback</groupId>
<artifactId>logstash-logback-encoder</artifactId>
<version>4.11</version>
</dependency>

<!-- logback 依赖包 -->
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<version>1.2.3</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.3</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-access</artifactId>
<version>1.2.3</version>
</dependency>

(2)接着,在 logback.xml 中添加 appender

1
2
3
4
5
6
7
8
9
10
11
<appender name="ELK-TCP" class="net.logstash.logback.appender.LogstashTcpSocketAppender">
<!--
destination 是 logstash 服务的 host:port,
相当于和 logstash 建立了管道,将日志数据定向传输到 logstash
-->
<destination>192.168.28.32:9251</destination>
<encoder charset="UTF-8" class="net.logstash.logback.encoder.LogstashEncoder"/>
</appender>
<logger name="io.github.dunwu.spring" level="TRACE" additivity="false">
<appender-ref ref="ELK-TCP" />
</logger>

大功告成,此后,io.github.dunwu.spring 包中的 TRACE 及以上级别的日志信息都会被定向输出到 logstash 服务。

img

接下来,就是 logback 的具体使用 ,如果对此不了解,不妨参考一下我的这篇博文:细说 Java 主流日志工具库

实例:我的 logback.xml

5.2.2. UDP 应用

UDP 和 TCP 的使用方式大同小异。

logstash 配置:

(1)创建 logstash-input-udp.conf

1
2
3
4
5
6
7
8
9
10
input {
udp {
port => 9250
codec => json
}
}
output {
elasticsearch { hosts => ["localhost:9200"] }
stdout { codec => rubydebug }
}

更多配置项可以参考:https://www.elastic.co/guide/en/logstash/current/plugins-inputs-udp.html

(2)执行 logstash,使用 -f 来指定你的配置文件:bin/logstash -f logstash-input-udp.conf

java 应用配置:

(1)在 Java 应用的 pom.xml 中引入 jar 包:

TCP 应用 一节中的引入依赖包完全相同。

(2)接着,在 logback.xml 中添加 appender

1
2
3
4
5
6
7
<appender name="ELK-UDP" class="net.logstash.logback.appender.LogstashSocketAppender">
<host>192.168.28.32</host>
<port>9250</port>
</appender>
<logger name="io.github.dunwu.spring" level="TRACE" additivity="false">
<appender-ref ref="ELK-UDP" />
</logger>

(3)接下来,就是 logback 的具体使用 ,如果对此不了解,不妨参考一下我的这篇博文:细说 Java 主流日志工具库

实例:我的 logback.xml

5.3. 传输文件

在 Java Web 领域,需要用到一些重要的工具,例如 Tomcat 、Nginx 、Mysql 等。这些不属于业务应用,但是它们的日志数据对于定位问题、分析统计同样很重要。这时无法使用 logback 方式将它们的日志传输到 logstash。

如何采集这些日志文件呢?别急,你可以使用 logstash 的 file input 插件。

需要注意的是,传输文件这种方式,必须在日志所在的机器上部署 logstash 。

应用

logstash 配置

(1)创建 logstash-input-file.conf

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
input {
file {
path => ["/var/log/nginx/access.log"]
type => "nginx-access-log"
start_position => "beginning"
}
}

output {
if [type] == "nginx-access-log" {
elasticsearch {
hosts => ["localhost:9200"]
index => "nginx-access-log"
}
}
}

(2)执行 logstash,使用 -f 来指定你的配置文件:bin/logstash -f logstash-input-file.conf

更多配置项可以参考:https://www.elastic.co/guide/en/logstash/current/plugins-inputs-file.html

6. 小技巧

6.1. 启动、终止应用

如果你的 logstash 每次都是通过指定配置文件方式启动。不妨建立一个启动脚本。

1
2
# cd xxx 进入 logstash 安装目录下的 bin 目录
logstash -f logstash.conf

如果你的 logstash 运行在 linux 系统下,不妨使用 nohup 来启动一个守护进程。这样做的好处在于,即使关闭终端,应用仍会运行。

创建 startup.sh:

1
nohup ./logstash -f logstash.conf >> nohup.out 2>&1 &

终止应用没有什么好方法,你只能使用 ps -ef | grep logstash ,查出进程,将其 kill 。不过,我们可以写一个脚本来干这件事:

创建 shutdown.sh:

脚本不多解释,请自行领会作用。

1
2
PID=`ps -ef | grep logstash | awk '{ print $2}' | head -n 1`
kill -9 ${PID}

7. 参考资料

RPC 基础篇

RPC 简介

什么是 RPC

RPC 的全称是 Remote Procedure Call,即远程过程调用

RPC 的主要作用是:

  • 屏蔽远程调用跟本地调用的差异,让用户像调用本地一样去调用远程方法。
  • 隐藏底层网络通信的复杂性,让用户更聚焦于业务逻辑。

RPC 的架构定位

RPC 是微服务架构的基石,它提供了一种应用间通信的方式。

RPC 核心原理

RPC 是一种应用间通信的方式,它的通信流程中需要注意以下环节:

  • 传输方式:RPC 是一个远程调用,因此必然需要通过网络传输数据,且 RPC 常用于业务系统之间的数据交互,需要保证其可靠性,所以 RPC 一般默认采用 TCP 来传输。
  • 序列化:在网络中传输的数据只能是二进制数据,而 RPC 请求时,发送的都是对象。因此,请求方需要将请求参数转为二进制数据,即序列化。
  • 反序列化:RPC 响应方接受到请求,要将二进制数据转换为请求参数,需要反序列化
  • 协议:请求方和响应方要互相识别彼此的信息,需要约定好彼此数据的格式,即协议。大多数的协议至少分成两部分,分别是数据头和消息体。数据头一般用于身份识别,包括协议标识、数据大小、请求类型、序列化类型等信息;消息体主要是请求的业务参数信息和扩展属性等。
  • 动态代理:为了屏蔽底层通信细节,使用户聚焦自身业务,因此 RPC 框架一般引入了动态代理,通过依赖注入等技术,拦截方法调用,完成远程调用的通信逻辑。

下图诠释了以上环节是如何串联起来的:

RPC 协议

协议的作用

只有二进制才能在网络中传输,所以 RPC 请求在发送到网络中之前,他需要把方法调用的请求参数转成二进制;转成二进制后,写入本地 Socket 中,然后被网卡发送到网络设备中。

在传输过程中,RPC 并不会把请求参数的所有二进制数据整体一下子发送到对端机器上,中间可能会拆分成好几个数据包,也可能会合并其他请求的数据包(合并的前提是同一个 TCP 连接上的数据),至于怎么拆分合并,这其中的细节会涉及到系统参数配置和 TCP 窗口大小。对于服务提供方应用来说,他会从 TCP 通道里面收到很多的二进制数据,那这时候怎么识别出哪些二进制是第一个请求的呢?

这就好比让你读一篇没有标点符号的文章,你要怎么识别出每一句话到哪里结束呢?很简单啊,我们加上标点,完成断句就好了。

为了避免语义不一致的事情发生,我们就需要在发送请求的时候设定一个边界,然后在收到请求的时候按照这个设定的边界进行数据分割。这个边界语义的表达,就是我们所说的协议。

为何需要设计 RPC 协议

既然有了现成的 HTTP 协议,还有必要设计 RPC 协议吗?

有必要。因为 HTTP 这些通信标准协议,数据包中的实际请求数据相对于数据包本身要小很多,有很多无用的内容;并且 HTTP 属于无状态协议,无法将请求和响应关联,每次请求要重新建立连接。这对于高性能的 RPC 来说,HTTP 协议难以满足需求,所以有必要设计一个紧凑的私有协议

如何?

首先,必须先明确消息的边界,即确定消息的长度。因此,至少要分为:消息长度+消息内容两部分。

接下来,我们会发现,在使用过程中,仅消息长度,不足以明确通信中的很多细节:如序列化方式是怎样的?是否消息压缩?压缩格式是怎样的?如果协议发生变化,需要明确协议版本等等。

综上,一个 RPC 协议大概会由下图中的这些参数组成:

可扩展的协议

前面所述的协议属于定长协议头,那也就是说往后就不能再往协议头里加新参数了,如果加参
数就会导致线上兼容问题。

为了保证能平滑地升级改造前后的协议,我们有必要设计一种支持可扩展的协议。其关键在于让协议头支持可扩展,扩展后协议头的长度就不能定长了。那要实现读取不定长的协议头里面的内容,在这之前肯定需要一个固定的地方读取长度,所以我们需要一个固定的写入协议头的长度。整体协议就变成了三部分内容:固定部分、协议头内容、协议体内容。

RPC 序列化

有兴趣深入了解 JDK 序列化方式,可以参考:Java 序列化

由于,网络传输的数据必须是二进制数据,而调用方请求的出参、入参都是对象。因此,必须将对象转换可传输的二进制,并且要求转换算法是可逆的。

  • 序列化(serialize):序列化是将对象转换为二进制数据。
  • 反序列化(deserialize):反序列化是将二进制数据转换为对象。

序列化就是将对象转换成二进制数据的过程,而反序列就是反过来将二进制转换为对象的过程。

序列化技术

Java 领域,常见的序列化技术如下

序列化技术选型

市面上有如此多的序列化技术,那么我们在应用时如何选择呢?

序列化技术选型,需要考量的维度,根据重要性从高到低,依次有:

  • 安全性:是否存在漏洞。如果存在漏洞,就有被攻击的可能性。
  • 兼容性:版本升级后的兼容性是否很好,是否支持更多的对象类型,是否是跨平台、跨语言的。服务调用的稳定性与可靠性,要比服务的性能更加重要。
  • 性能
    • 时间开销:序列化、反序列化的耗时性能自然越小越好。
    • 空间开销:序列化后的数据越小越好,这样网络传输效率就高。
  • 易用性:类库是否轻量化,API 是否简单易懂。

鉴于以上的考量,序列化技术的选型建议如下:

  • JDK 序列化:性能较差,且有很多使用限制,不建议使用。
  • ThriftProtobuf:适用于对性能敏感,对开发体验要求不高
  • Hessian:适用于对开发体验敏感,性能有要求
  • JacksonGsonFastjson:适用于对序列化后的数据要求有良好的可读性(转为 json 、xml 形式)。

序列化问题

由于 RPC 每次通信,都要经过序列化、反序列化的过程,所以序列化方式,会直接影响 RPC 通信的性能。除了选择合适的序列化技术,如何合理使用序列化也非常重要。

RPC 序列化常见的使用不当的情况如下:

  • 对象过于复杂、庞大 - 对象过于复杂、庞大,会降低序列化、反序列化的效率,并增加传输开销,从而导致响应时延增大。

    • 过于复杂:存在多层的嵌套,比如 A 对象关联 B 对象,B 对象又聚合 C 对象,C 对象又关联聚合很多其他对象
    • 过于庞大:比如一个大 List 或者大 Map
  • 对象有复杂的继承关系 - 对象关系越复杂,就越浪费性能,同时又很容易出现序列化上的问题。大多数序列化框架在进行序列化时,如果发现类有继承关系,会不停地寻找父类,遍历属性。

  • 使用序列化框架不支持的类作为入参类 - 比如 Hessian 框架,他天然是不支持 LinkHashMap、LinkedHashSet 等,而且大多数情况下最好不要使用第三方集合类,如 Guava 中的集合类,很多开源的序列化框架都是优先支持编程语言原生的对象。因此如果入参是集合类,应尽量选用原生的、最为常用的集合类,如 HashMap、ArrayList。

序列化要点

前面已经列举了常见的序列化问题,既然明确了问题,就要针对性预防。RPC 序列化时要注意以下几点:

  1. 对象要尽量简单,没有太多的依赖关系,属性不要太多,尽量高内聚;
  2. 入参对象与返回值对象体积不要太大,更不要传太大的集合;
  3. 尽量使用简单的、常用的、开发语言原生的对象,尤其是集合类;
  4. 对象不要有复杂的继承关系,最好不要有父子类的情况。

RPC 通信

一次 RPC 调用,本质就是服务消费者与服务提供者间的一次网络信息交换的过程。可见,通信时 RPC 实现的核心。

常见的网络 IO 模型有:同步阻塞(BIO)、同步非阻塞(NIO)、异步非阻塞(AIO)。

IO 多路复用

IO 多路复用(Reactor 模式)在高并发场景下使用最为广泛,很多知名软件都应用了这一技术,如:Netty、Redis、Nginx 等。

IO 多路复用分为 select,poll 和 epoll。

什么是 IO 多路复用?字面上的理解,多路就是指多个通道,也就是多个网络连接的 IO,而复用就是指多个通道复用在一个复用器上。

零拷贝

系统内核处理 IO 操作分为两个阶段——等待数据和拷贝数据。等待数据,就是系统内核在等待网卡接收到数据后,把数据写到内核中;而拷贝数据,就是系统内核在获取到数据后,将数据拷贝到用户进程的空间中。

img

应用进程的每一次写操作,都会把数据写到用户空间的缓冲区中,再由 CPU 将数据拷贝到系统内核的缓冲区中,之后再由 DMA 将这份数据拷贝到网卡中,最后由网卡发送出去。这里我们可以看到,一次写操作数据要拷贝两次才能通过网卡发送出去,而用户进程的读操作则是将整个流程反过来,数据同样会拷贝两次才能让应用程序读取到数据。

应用进程的一次完整的读写操作,都需要在用户空间与内核空间中来回拷贝,并且每一次拷贝,都需要 CPU 进行一次上下文切换(由用户进程切换到系统内核,或由系统内核切换到用户进程),这样很浪费 CPU 和性能。

所谓的零拷贝,就是取消用户空间与内核空间之间的数据拷贝操作,应用进程每一次的读写操作,可以通过一种方式,直接将数据写入内核或从内核中读取数据,再通过 DMA 将内核中的数据拷贝到网卡,或将网卡中的数据 copy 到内核。

img

Netty 的零拷贝偏向于用户空间中对数据操作的优化,这对处理 TCP 传输中的拆包粘包问题有着重要的意义,对应用程序处理请求数据与返回数据也有重要的意义。

Netty 框架中很多内部的 ChannelHandler 实现类,都是通过 CompositeByteBuf、slice、wrap 操作来处理 TCP 传输中的拆包与粘包问题的。

Netty 的 ByteBuffer 可以采用 Direct Buffers,使用堆外直接内存进行 Socketd 的读写
操作,最终的效果与我刚才讲解的虚拟内存所实现的效果是一样的。

Netty 还提供 FileRegion 中包装 NIO 的 FileChannel.transferTo() 方法实现了零拷
贝,这与 Linux 中的 sendfile 方式在原理上也是一样的。

RPC 动态代理

RPC 的远程过程调用是通过反射+动态代理实现的。

img

RPC 框架会自动为要调用的接口生成一个代理类。当在项目中注入接口的时候,运行过程中实际绑定的就是这个接口生成的代理类。在接口方法被调用时,会被代理类拦截,这样,就可以在生成的代理类中,加入远程调用逻辑。

除了 JDK 默认的 InvocationHandler 能完成代理功能,还有很多其他的第三方框架也可以,比如像 Javassist、Byte Buddy 这样的框架。

反射+动态代理更多详情可以参考:深入理解 Java 反射和动态代理

参考资料

JVM 垃圾收集

程序计数器、虚拟机栈和本地方法栈这三个区域属于线程私有的,只存在于线程的生命周期内,线程结束之后也会消失,因此不需要对这三个区域进行垃圾回收。垃圾回收主要是针对 Java 堆和方法区进行

对象活着吗

引用计数算法

给对象添加一个引用计数器,当对象增加一个引用时计数器加 1,引用失效时计数器减 1。引用计数为 0 的对象可被回收。

两个对象出现循环引用的情况下,此时引用计数器永远不为 0,导致无法对它们进行回收。

1
2
3
4
5
6
7
8
9
10
public class ReferenceCountingGC {
public Object instance = null;

public static void main(String[] args) {
ReferenceCountingGC objectA = new ReferenceCountingGC();
ReferenceCountingGC objectB = new ReferenceCountingGC();
objectA.instance = objectB;
objectB.instance = objectA;
}
}

因为循环引用的存在,所以 Java 虚拟机不适用引用计数算法

可达性分析算法

通过 GC Roots 作为起始点进行搜索,JVM 将能够到达到的对象视为存活,不可达的对象视为死亡

可达性分析算法

可作为 GC Roots 的对象包括下面几种:

  • 虚拟机栈中引用的对象
  • 本地方法栈中引用的对象(Native 方法)
  • 方法区中,类静态属性引用的对象
  • 方法区中,常量引用的对象

引用类型

无论是通过引用计算算法判断对象的引用数量,还是通过可达性分析算法判断对象的引用链是否可达,判定对象是否可被回收都与引用有关。

Java 具有四种强度不同的引用类型。

强引用

被强引用(Strong Reference)关联的对象不会被垃圾收集器回收。

强引用:使用 new 一个新对象的方式来创建强引用。

1
Object obj = new Object();

软引用

被软引用(Soft Reference)关联的对象,只有在内存不够的情况下才会被回收。

软引用:使用 SoftReference 类来创建软引用。

1
2
3
Object obj = new Object();
SoftReference<Object> sf = new SoftReference<Object>(obj);
obj = null; // 使对象只被软引用关联

弱引用

被弱引用(Weak Reference)关联的对象一定会被垃圾收集器回收,也就是说它只能存活到下一次垃圾收集发生之前。

使用 WeakReference 类来实现弱引用。

1
2
3
Object obj = new Object();
WeakReference<Object> wf = new WeakReference<Object>(obj);
obj = null;

WeakHashMapEntry 继承自 WeakReference,主要用来实现缓存。

1
private static class Entry<K,V> extends WeakReference<Object> implements Map.Entry<K,V>

Tomcat 中的 ConcurrentCache 就使用了 WeakHashMap 来实现缓存功能。ConcurrentCache 采取的是分代缓存,经常使用的对象放入 eden 中,而不常用的对象放入 longterm。eden 使用 ConcurrentHashMap 实现,longterm 使用 WeakHashMap,保证了不常使用的对象容易被回收。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public final class ConcurrentCache<K, V> {

private final int size;

private final Map<K, V> eden;

private final Map<K, V> longterm;

public ConcurrentCache(int size) {
this.size = size;
this.eden = new ConcurrentHashMap<>(size);
this.longterm = new WeakHashMap<>(size);
}

public V get(K k) {
V v = this.eden.get(k);
if (v == null) {
v = this.longterm.get(k);
if (v != null)
this.eden.put(k, v);
}
return v;
}

public void put(K k, V v) {
if (this.eden.size() >= size) {
this.longterm.putAll(this.eden);
this.eden.clear();
}
this.eden.put(k, v);
}
}

虚引用

又称为幽灵引用或者幻影引用。一个对象是否有虚引用的存在,完全不会对其生存时间构成影响,也无法通过虚引用取得一个对象实例。

为一个对象设置虚引用关联的唯一目的就是能在这个对象被收集器回收时收到一个系统通知。

使用 PhantomReference 来实现虚引用。

1
2
3
Object obj = new Object();
PhantomReference<Object> pf = new PhantomReference<Object>(obj);
obj = null;

方法区的回收

因为方法区主要存放永久代对象,而永久代对象的回收率比年轻代差很多,因此在方法区上进行回收性价比不高。

主要是对常量池的回收和对类的卸载。

类的卸载条件很多,需要满足以下三个条件,并且满足了也不一定会被卸载:

  • 该类所有的实例都已经被回收,也就是 Java 堆中不存在该类的任何实例。
  • 加载该类的 ClassLoader 已经被回收。
  • 该类对应的 java.lang.Class 对象没有在任何地方被引用,也就无法在任何地方通过反射访问该类方法。

可以通过 -Xnoclassgc 参数来控制是否对类进行卸载。

在大量使用反射、动态代理、CGLib 等字节码框架、动态生成 JSP 以及 OSGi 这类频繁自定义 ClassLoader 的场景都需要虚拟机具备类卸载功能,以保证不会出现内存溢出。

finalize()

finalize() 类似 C++ 的析构函数,用来做关闭外部资源等工作。但是 try-finally 等方式可以做的更好,并且该方法运行代价高昂,不确定性大,无法保证各个对象的调用顺序,因此**最好不要使用 finalize()**。

当一个对象可被回收时,如果需要执行该对象的 finalize() 方法,那么就有可能通过在该方法中让对象重新被引用,从而实现自救。

垃圾收集算法

垃圾收集性能

垃圾收集器的性能指标主要有两点:

  • 停顿时间 - 停顿时间是因为 GC 而导致程序不能工作的时间长度。
  • 吞吐量 - 吞吐量关注在特定的时间周期内一个应用的工作量的最大值。对关注吞吐量的应用来说长暂停时间是可以接受的。由于高吞吐量的应用关注的基准在更长周期时间上,所以快速响应时间不在考虑之内。

标记 - 清除(Mark-Sweep)

将需要回收的对象进行标记,然后清理掉被标记的对象。

不足:

  • 标记和清除过程效率都不高;
  • 会产生大量不连续的内存碎片,导致无法给大对象分配内存。

标记 - 整理(Mark-Compact)

让所有存活的对象都向一端移动,然后直接清理掉端边界以外的内存。

这种做法能够解决内存碎片化的问题,但代价是压缩算法的性能开销。

复制(Copying)

将内存划分为大小相等的两块,每次只使用其中一块,当这一块内存用完了就将还存活的对象复制到另一块上面,然后再把使用过的内存空间进行一次清理。

主要不足是只使用了内存的一半。

现在的商业虚拟机都采用这种收集算法来回收年轻代,但是并不是将内存划分为大小相等的两块,而是分为一块较大的 Eden 空间和两块较小的 Survior 空间,每次使用 Eden 空间和其中一块 Survivor。在回收时,将 Eden 和 Survivor 中还存活着的对象一次性复制到另一块 Survivor 空间上,最后清理 Eden 和使用过的那一块 Survivor。HotSpot 虚拟机的 Eden 和 Survivor 的大小比例默认为 8:1(可以通过参数 -XX:SurvivorRatio 来调整比例),保证了内存的利用率达到 90 %。如果每次回收有多于 10% 的对象存活,那么一块 Survivor 空间就不够用了,此时需要依赖于老年代进行分配担保,也就是借用老年代的空间存储放不下的对象。

分代收集

现在的商业虚拟机采用分代收集算法,它根据对象存活周期将内存划分为几块,不同块采用适当的收集算法。

一般将 Java 堆分为年轻代和老年代。

  • 年轻代使用:复制 算法
  • 老年代使用:标记 - 清理 或者 标记 - 整理 算法

新生代

新生代是大部分对象创建和销毁的区域,在通常的 Java 应用中,绝大部分对象生命周期都是很短暂的。其内部又分为 Eden 区域,作为对象初始分配的区域;两个 Survivor,有时候也叫 fromto 区域,被用来放置从 Minor GC 中保留下来的对象。

JVM 会随意选取一个 Survivor 区域作为 to,然后会在 GC 过程中进行区域间拷贝,也就是将 Eden 中存活下来的对象和 from 区域的对象,拷贝到这个to区域。这种设计主要是为了防止内存的碎片化,并进一步清理无用对象。

Java 虚拟机会记录 Survivor 区中的对象一共被来回复制了几次。如果一个对象被复制的次数为 15(对应虚拟机参数 -XX:+MaxTenuringThreshold),那么该对象将被晋升(promote)至老年代。另外,如果单个 Survivor 区已经被占用了 50%(对应虚拟机参数 -XX:TargetSurvivorRatio),那么较高复制次数的对象也会被晋升至老年代。

老年代

放置长生命周期的对象,通常都是从 Survivor 区域拷贝过来的对象。当然,也有特殊情况,如果对象较大,JVM 会试图直接分配在 Eden 其他位置上;如果对象太大,完全无法在新生代找到足够长的连续空闲空间,JVM 就会直接分配到老年代。

永久代

这部分就是早期 Hotspot JVM 的方法区实现方式了,储存 Java 类元数据、常量池、Intern 字符串缓存。在 JDK 8 之后就不存在永久代这块儿了。

JVM 参数

这里顺便提一下,JVM 允许对堆空间大小、各代空间大小进行设置,以调整 JVM GC。

配置 描述
-Xss 虚拟机栈大小。
-Xms 堆空间初始值。
-Xmx 堆空间最大值。
-Xmn 新生代空间大小。
-XX:NewSize 新生代空间初始值。
-XX:MaxNewSize 新生代空间最大值。
-XX:NewRatio 新生代与年老代的比例。默认为 2,意味着老年代是新生代的 2 倍。
-XX:SurvivorRatio 新生代中调整 eden 区与 survivor 区的比例,默认为 8。即 eden 区为 80% 的大小,两个 survivor 分别为 10% 的大小。
-XX:PermSize 永久代空间的初始值。
-XX:MaxPermSize 永久代空间的最大值。

垃圾收集器

以上是 HotSpot 虚拟机中的 7 个垃圾收集器,连线表示垃圾收集器可以配合使用。

注:G1 垃圾收集器既可以回收年轻代内存,也可以回收老年代内存。而其他垃圾收集器只能针对特定代的内存进行回收。

串行收集器

串行收集器(Serial)是最基本、发展历史最悠久的收集器。

串行收集器是 client 模式下的默认收集器配置。因为在客户端模式下,分配给虚拟机管理的内存一般来说不会很大。Serial 收集器收集几十兆甚至一两百兆的年轻代停顿时间可以控制在一百多毫秒以内,只要不是太频繁,这点停顿是可以接受的。

串行收集器采用单线程 stop-the-world 的方式进行收集。当内存不足时,串行 GC 设置停顿标识,待所有线程都进入安全点(Safepoint)时,应用线程暂停,串行 GC 开始工作,采用单线程方式回收空间并整理内存

Serial / Serial Old 收集器运行示意图

单线程意味着复杂度更低、占用内存更少,垃圾回收效率高;但同时也意味着不能有效利用多核优势。事实上,串行收集器特别适合堆内存不高、单核甚至双核 CPU 的场合。

Serial 收集器

开启选项:-XX:+UseSerialGC

打开此开关后,使用 Serial + Serial Old 收集器组合来进行内存回收。

Serial Old 收集器

Serial Old 是 Serial 收集器的老年代版本,也是给 Client 模式下的虚拟机使用。如果用在 Server 模式下,它有两大用途:

  • 在 JDK 1.5 以及之前版本(Parallel Old 诞生以前)中与 Parallel Scavenge 收集器搭配使用。
  • 作为 CMS 收集器的后备预案,在并发收集发生 Concurrent Mode Failure 时使用。

并行收集器

开启选项:-XX:+UseParallelGC

打开此开关后,使用 Parallel Scavenge + Serial Old 收集器组合来进行内存回收。

开启选项:-XX:+UseParallelOldGC

打开此开关后,使用 Parallel Scavenge + Parallel Old 收集器组合来进行内存回收。

其他收集器都是以关注停顿时间为目标,而并行收集器是以关注吞吐量(Throughput)为目标的垃圾收集器

  • 停顿时间越短就越适合需要与用户交互的程序,良好的响应速度能提升用户体验;
  • 而高吞吐量则可以高效率地利用 CPU 时间,尽快完成程序的运算任务,主要适合在后台运算而不需要太多交互的任务。
1
吞吐量 = 运行用户代码时间 / (运行用户代码时间 + 垃圾收集时间)

并行收集器是 server 模式下的默认收集器。

并行收集器与串行收集器工作模式相似,都是 stop-the-world 方式,只是暂停时并行地进行垃圾收集。并行收集器年轻代采用复制算法,老年代采用标记-整理,在回收的同时还会对内存进行压缩。并行收集器适合对吞吐量要求远远高于延迟要求的场景,并且在满足最差延时的情况下,并行收集器将提供最佳的吞吐量。

在注重吞吐量以及 CPU 资源敏感的场合,都可以优先考虑 Parallel Scavenge 收集器 + Parallel Old 收集器。

Parallel / Parallel Old 收集器运行示意图

Parallel Scavenge 收集器

Parallel Scavenge 收集器提供了两个参数用于精确控制吞吐量,分别是:

  • -XX:MaxGCPauseMillis - 控制最大垃圾收集停顿时间,收集器将尽可能保证内存回收时间不超过设定值。
  • -XX:GCTimeRatio - 直接设置吞吐量大小的(值为大于 0 且小于 100 的整数)。

缩短停顿时间是以牺牲吞吐量和年轻代空间来换取的:年轻代空间变小,垃圾回收变得频繁,导致吞吐量下降。

Parallel Scavenge 收集器还提供了一个参数 -XX:+UseAdaptiveSizePolicy,这是一个开关参数,打开参数后,就不需要手工指定年轻代的大小(-Xmn)、Eden 和 Survivor 区的比例(-XX:SurvivorRatio)、晋升老年代对象年龄(-XX:PretenureSizeThreshold)等细节参数了,虚拟机会根据当前系统的运行情况收集性能监控信息,动态调整这些参数以提供最合适的停顿时间或者最大的吞吐量,这种方式称为 GC 自适应的调节策略(GC Ergonomics)。

Parallel Old 收集器

是 Parallel Scavenge 收集器的老年代版本,使用多线程和 “标记-整理” 算法

并发标记清除收集器

开启选项:-XX:+UseConcMarkSweepGC

打开此开关后,使用 CMS + ParNew + Serial Old 收集器组合来进行内存回收。

并发标记清除收集器是以获取最短停顿时间为目标。

开启后,年轻代使用 ParNew 收集器;老年代使用 CMS 收集器,如果 CMS 产生的碎片过多,导致无法存放浮动垃圾,JVM 会出现 Concurrent Mode Failure ,此时使用 Serial Old 收集器来替代 CMS 收集器清理碎片。

CMS 收集器

CMS 收集器是一种以获取最短停顿时间为目标的收集器。

CMS(Concurrent Mark Sweep),Mark Sweep 指的是标记 - 清除算法。

CMS 回收机制

CMS 收集器运行步骤如下:

  1. 初始标记:仅仅只是标记一下 GC Roots 能直接关联到的对象,速度很快,需要停顿。
  2. 并发标记:进行 GC Roots Tracing 的过程,它在整个回收过程中耗时最长,不需要停顿。
  3. 重新标记:为了修正并发标记期间因用户程序继续运作而导致标记产生变动的那一部分对象的标记记录,需要停顿。
  4. 并发清除:回收在标记阶段被鉴定为不可达的对象。不需要停顿。

在整个过程中耗时最长的并发标记和并发清除过程中,收集器线程都可以与用户线程一起工作,不需要进行停顿。

CMS 收集器运行示意图

CMS 回收年轻代详细步骤

(1)堆空间被分割为三块空间

img
年轻代分割成一个 Eden 区和两个 Survivor 区。年老代一个连续的空间。就地完成对象收集。除非有 FullGC 否则不会压缩。

(2)CMS 年轻代垃圾收集如何工作

年轻代被标为浅绿色,年老代被标记为蓝色。如果你的应用已经运行了一段时间,CMS 的堆看起来应该是这个样子。对象分散在年老代区域里。

img

使用 CMS,年老代对象就地释放。它们不会被来回移动。这个空间不会被压缩除非发生 FullGC。

(3)年轻代收集

从 Eden 和 Survivor 区复制活跃对象到另一个 Survivor 区。所有达到他们的年龄阈值的对象会晋升到年老代。

img
(4)年轻代回收之后

一次年轻代垃圾收集之后,Eden 区和其中一个 Survivor 区被清空。

img
最近晋升的对象以深蓝色显示在上图中,绿色的对象是年轻代幸免的还没有晋升到老年代对象。

CMS 回收年老代详细步骤

(1)CMS 的年老代收集

发生两次 stop the world 事件:初始标记和重新标记。当年老代达到特定的占用比例时,CMS 开始执行。

img

  • 初始标记是一个短暂暂停的、可达对象被标记的阶段。
  • 并发标记寻找活跃对象在应用连续执行时。
  • 最后,在重新标记阶段,寻找在之前并发标记阶段中丢失的对象。

(2)年老代收集-并发清除

在之前阶段没有被标记的对象会被就地释放。不进行压缩操作。

img
注意:未被标记的对象等于死亡对象

(3)年老代收集-清除之后

清除阶段之后,你可以看到大量内存被释放。你还可以注意到没有进行压缩操作。

img
最后,CMS 收集器会再次进入重新设置阶段,等待下一次垃圾收集时机的到来。

CMS 特点

CMS 收集器具有以下缺点:

  • 并发收集 - 并发指的是用户线程和 GC 线程同时运行。
  • 吞吐量低 - 低停顿时间是以牺牲吞吐量为代价的,导致 CPU 利用率不够高。
  • 无法处理浮动垃圾 - 可能出现 Concurrent Mode Failure。浮动垃圾是指并发清除阶段由于用户线程继续运行而产生的垃圾,这部分垃圾只能到下一次 GC 时才能进行回收。由于浮动垃圾的存在,因此需要预留出一部分内存,意味着 CMS 收集不能像其它收集器那样等待老年代快满的时候再回收。
    • 可以使用 -XX:CMSInitiatingOccupancyFraction 来改变触发 CMS 收集器工作的内存占用百分,如果这个值设置的太大,导致预留的内存不够存放浮动垃圾,就会出现 Concurrent Mode Failure,这时虚拟机将临时启用 Serial Old 收集器来替代 CMS 收集器。
  • 标记 - 清除算法导致的空间碎片,往往出现老年代空间剩余,但无法找到足够大连续空间来分配当前对象,不得不提前触发一次 Full GC。
    • 可以使用 -XX:+UseCMSCompactAtFullCollection ,用于在 CMS 收集器要进行 Full GC 时开启内存碎片的合并整理,内存整理的过程是无法并发的,空间碎片问题没有了,但是停顿时间不得不变长了。
    • 可以使用 -XX:CMSFullGCsBeforeCompaction ,用于设置执行多少次不压缩的 Full GC 后,来一次带压缩的(默认为 0,表示每次进入 Full GC 时都要进行碎片整理)。

ParNew 收集器

开启选项:-XX:+UseParNewGC

ParNew 收集器其实是 Serial 收集器的多线程版本。

ParNew 收集器运行示意图

是 Server 模式下的虚拟机首选年轻代收集器,除了性能原因外,主要是因为除了 Serial 收集器,只有它能与 CMS 收集器配合工作。

ParNew 收集器也是使用 -XX:+UseConcMarkSweepGC 后的默认年轻代收集器。

ParNew 收集器默认开启的线程数量与 CPU 数量相同,可以使用 -XX:ParallelGCThreads 参数来设置线程数。

G1 收集器

开启选项:-XX:+UseG1GC

前面提到的垃圾收集器一般策略是关注吞吐量或停顿时间。而 G1 是一种兼顾吞吐量和停顿时间的 GC 收集器。G1 是 Oracle JDK9 以后的默认 GC 收集器。G1 可以直观的设定停顿时间的目标,相比于 CMS GC,G1 未必能做到 CMS 在最好情况下的延时停顿,但是最差情况要好很多。

G1 最大的特点是引入分区的思路,弱化了分代的概念,合理利用垃圾收集各个周期的资源,解决了其他收集器甚至 CMS 的众多缺陷。

分代和分区

旧的垃圾收集器一般采取分代收集,Java 堆被分为年轻代、老年代和永久代。收集的范围都是整个年轻代或者整个老年代。

G1 取消了永久代,并把年轻代和老年代划分成多个大小相等的独立区域(Region),年轻代和老年代不再物理隔离。G1 可以直接对年轻代和老年代一起回收。

通过引入 Region 的概念,从而将原来的一整块内存空间划分成多个的小空间,使得每个小空间可以单独进行垃圾回收。这种划分方法带来了很大的灵活性,使得可预测的停顿时间模型成为可能。通过记录每个 Region 垃圾回收时间以及回收所获得的空间(这两个值是通过过去回收的经验获得),并维护一个优先列表,每次根据允许的收集时间,优先回收价值最大的 Region。

每个 Region 都有一个 Remembered Set,用来记录该 Region 对象的引用对象所在的 Region。通过使用 Remembered Set,在做可达性分析的时候就可以避免全堆扫描。

G1 回收机制

G1 收集器运行示意图

如果不计算维护 Remembered Set 的操作,G1 收集器的运作大致可划分为以下几个步骤:

  1. 初始标记
  2. 并发标记
  3. 最终标记 - 为了修正在并发标记期间因用户程序继续运作而导致标记产生变动的那一部分标记记录,虚拟机将这段时间对象变化记录在线程的 Remembered Set Logs 里面,最终标记阶段需要把 Remembered Set Logs 的数据合并到 Remembered Set 中。这阶段需要停顿线程,但是可并行执行。
  4. 筛选回收 - 首先对各个 Region 中的回收价值和成本进行排序,根据用户所期望的 GC 停顿是时间来制定回收计划。此阶段其实也可以做到与用户程序一起并发执行,但是因为只回收一部分 Region,时间是用户可控制的,而且停顿用户线程将大幅度提高收集效率。

具备如下特点:

  • 空间整合:整体来看是基于“标记 - 整理”算法实现的收集器,从局部(两个 Region 之间)上来看是基于“复制”算法实现的,这意味着运行期间不会产生内存空间碎片。
  • 可预测的停顿:能让使用者明确指定在一个长度为 M 毫秒的时间片段内,消耗在 GC 上的时间不得超过 N 毫秒。

G1 回收年轻代详细步骤

(1)G1 初始堆空间

堆空间是一个被分成许多固定大小区域的内存块。

img
Java 虚拟机启动时选定区域大小。Java 虚拟机通常会指定 2000 个左右的大小相等、每个大小范围在 1 到 32M 的区域。

(2)G1 堆空间分配

实际上,这些区域被映射成 Eden、Survivor、年老代空间的逻辑表述形式。

img
图片中的颜色表明了哪个区域被关联上什么角色。活跃对象从一个区域疏散(复制、移动)到另一个区域。区域被设计为并行的方式收集,可以暂停或者不暂停所有的其它用户线程。

明显的区域可以被分配成 Eden、Survivor、Old 区域。另外,有第四种类型的区域叫做*极大区域(Humongous regions)*。这些区域被设计成保持标准区域大小的 50%或者更大的对象。它们被保存在一个连续的区域集合里。最后,最后一个类型的区域就是堆空间里没有使用的区域。

注意:写作此文章时,收集极大对象时还没有被优化。因此,你应该避免创建这个大小的对象。

(3)G1 的年轻代

堆空间被分割成大约 2000 个区域。最小 1M,最大 32M,蓝色区域保持年老代对象,绿色区域保持年轻代对象。

img
注意:区域没有必要像旧的收集器一样是保持连续的。

(4)G1 的年轻代收集

活跃对象会被疏散(复制、移动)到一个或多个 survivor 区域。如果达到晋升总阈值,对象会晋升到年老代区域。

img
这是一个 stop the world 暂停。为下一次年轻代垃圾回收计算 Eden 和 Survivor 的大小。保留审计信息有助于计算大小。类似目标暂停时间的事情会被考虑在内。

这个方法使重调区域大小变得很容易,按需把它们调大或调小。

(5)G1 年轻代回收的尾声

活跃对象被疏散到 Survivor 或者年老代区域。

img
最近晋升的对象显示为深蓝色。Survivor 区域显示为绿色。

关于 G1 的年轻代回收做以下总结:

  • 堆空间是一块单独的内存空间被分割成多个区域。
  • 年轻代内存是由一组非连续的区域组成。这使得需要重调大小变得容易。
  • 年轻代垃圾回收是 stop the world 事件,所有应用线程都会因此操作暂停。
  • 年轻代垃圾收集使用多线程并行回收。
  • 活跃对象被复制到新的 Survivor 区或者年老代区域。

G1 回收年老代详细步骤

(1)初始标记阶段

年轻代垃圾收集肩负着活跃对象初始标记的任务。在日志文件中被标为GC pause (young)(inital-mark)

img
(2)并发标记阶段

如果发现空区域(“X”标示的),在重新标记阶段它们会被马上清除掉。当然,决定活性的审计信息也在此时被计算。

img
(3)重新标记阶段

空的区域被清除和回收掉。所有区域的活性在此时计算。

img
(4)复制/清理阶段

G1 选择活性最低的区域,这些区域能够以最快的速度回收。然后这些区域会在年轻代垃圾回收过程中被回收。在日志中被指示为*[GC pause (mixed)]*。所以年轻代和年老代在同一时间被回收。

img
(5)复制/清理阶段之后

被选择的区域已经被回收和压缩到图中显示的深蓝色区和深绿色区中。

img

总结

收集器 串行/并行/并发 年轻代/老年代 收集算法 目标 适用场景
Serial 串行 年轻代 复制 响应速度优先 单 CPU 环境下的 Client 模式
Serial Old 串行 老年代 标记-整理 响应速度优先 单 CPU 环境下的 Client 模式、CMS 的后备预案
ParNew 串行 + 并行 年轻代 复制算法 响应速度优先 多 CPU 环境时在 Server 模式下与 CMS 配合
Parallel Scavenge 串行 + 并行 年轻代 复制算法 吞吐量优先 在后台运算而不需要太多交互的任务
Parallel Old 串行 + 并行 老年代 标记-整理 吞吐量优先 在后台运算而不需要太多交互的任务
CMS 并行 + 并发 老年代 标记-清除 响应速度优先 集中在互联网站或 B/S 系统服务端上的 Java 应用
G1 并行 + 并发 年轻代 + 老年代 标记-整理 + 复制算法 响应速度优先 面向服务端应用,将来替换 CMS

内存分配与回收策略

对象的内存分配,也就是在堆上分配。主要分配在年轻代的 Eden 区上,少数情况下也可能直接分配在老年代中。

Minor GC

Eden 区空间不足时,触发 Minor GC

Minor GC 发生在年轻代上,因为年轻代对象存活时间很短,因此 Minor GC 会频繁执行,执行的速度一般也会比较快。

Minor GC 工作流程:

  1. Java 应用不断创建对象,通常都是分配在 Eden 区域,当其空间不足时(达到设定的阈值),触发 minor GC。仍然被引用的对象(绿色方块)存活下来,被复制到 JVM 选择的 Survivor 区域,而没有被引用的对象(黄色方块)则被回收。

  2. 经过一次 Minor GC,Eden 就会空闲下来,直到再次达到 Minor GC 触发条件。这时候,另外一个 Survivor 区域则会成为 To 区域,Eden 区域的存活对象和 From 区域对象,都会被复制到 To 区域,并且存活的年龄计数会被加 1。

  3. 类似第二步的过程会发生很多次,直到有对象年龄计数达到阈值,这时候就会发生所谓的晋升(Promotion)过程,如下图所示,超过阈值的对象会被晋升到老年代。这个阈值是可以通过 -XX:MaxTenuringThreshold 参数指定。

Full GC

Full GC 发生在老年代上,老年代对象和年轻代的相反,其存活时间长,因此 Full GC 很少执行,而且执行速度会比 Minor GC 慢很多。

内存分配策略

(一)对象优先在 Eden 分配

大多数情况下,对象在年轻代 Eden 区分配,当 Eden 区空间不够时,发起 Minor GC。

(二)大对象直接进入老年代

大对象是指需要连续内存空间的对象,最典型的大对象是那种很长的字符串以及数组。

经常出现大对象会提前触发垃圾收集以获取足够的连续空间分配给大对象。

-XX:PretenureSizeThreshold,大于此值的对象直接在老年代分配,避免在 Eden 区和 Survivor 区之间的大量内存复制。

(三)长期存活的对象进入老年代

为对象定义年龄计数器,对象在 Eden 出生并经过 Minor GC 依然存活,将移动到 Survivor 中,年龄就增加 1 岁,增加到一定年龄则移动到老年代中。

-XX:MaxTenuringThreshold 用来定义年龄的阈值。

(四)动态对象年龄判定

虚拟机并不是永远地要求对象的年龄必须达到 MaxTenuringThreshold 才能晋升老年代,如果在 Survivor 区中相同年龄所有对象大小的总和大于 Survivor 空间的一半,则年龄大于或等于该年龄的对象可以直接进入老年代,无需等到 MaxTenuringThreshold 中要求的年龄。

(五)空间分配担保

在发生 Minor GC 之前,虚拟机先检查老年代最大可用的连续空间是否大于年轻代所有对象总空间,如果条件成立的话,那么 Minor GC 可以确认是安全的;如果不成立的话虚拟机会查看 HandlePromotionFailure 设置值是否允许担保失败,如果允许那么就会继续检查老年代最大可用的连续空间是否大于历次晋升到老年代对象的平均大小,如果大于,将尝试着进行一次 Minor GC,尽管这次 Minor GC 是有风险的;如果小于,或者 HandlePromotionFailure 设置不允许冒险,那这时也要改为进行一次 Full GC。

Full GC 的触发条件

对于 Minor GC,其触发条件非常简单,当 Eden 区空间满时,就将触发一次 Minor GC。而 Full GC 则相对复杂,有以下条件:

(1)调用 System.gc()

此方法的调用是建议虚拟机进行 Full GC,虽然只是建议而非一定,但很多情况下它会触发 Full GC,从而增加 Full GC 的频率,也即增加了间歇性停顿的次数。因此强烈建议能不使用此方法就不要使用,让虚拟机自己去管理它的内存。可通过 -XX:DisableExplicitGC 来禁止 RMI 调用 System.gc()

(2)老年代空间不足

老年代空间不足的常见场景为前文所讲的大对象直接进入老年代、长期存活的对象进入老年代等,当执行 Full GC 后空间仍然不足,则抛出 java.lang.OutOfMemoryError: Java heap space。为避免以上原因引起的 Full GC,调优时应尽量做到让对象在 Minor GC 阶段被回收、让对象在年轻代多存活一段时间以及不要创建过大的对象及数组。

(3)方法区空间不足

JVM 规范中运行时数据区域中的方法区,在 HotSpot 虚拟机中又被习惯称为永久代,永久代中存放的是类的描述信息、常量、静态变量等数据,当系统中要加载的类、反射的类和调用的方法较多时,永久代可能会被占满,在未配置为采用 CMS GC 的情况下也会执行 Full GC。如果经过 Full GC 仍然回收不了,那么 JVM 会抛出 java.lang.OutOfMemoryError: PermGen space 错误。为避免永久代占满造成 Full GC 现象,可采用的方法为增大 Perm Gen 空间或转为使用 CMS GC。

(4)Minor GC 的平均晋升空间大小大于老年代可用空间

如果发现统计数据说之前 Minor GC 的平均晋升大小比目前老年代剩余的空间大,则不会触发 Minor GC 而是转为触发 Full GC。

(5)对象大小大于 To 区和老年代的可用内存

Eden 区、From 区向 To 区复制时,对象大小大于 To 区可用内存,则把该对象转存到老年代,且老年代的可用内存小于该对象大小。

参考资料

深入理解 Java 反射和动态代理

反射简介

img

什么是反射

反射(Reflection)是 Java 程序开发语言的特征之一,它允许运行中的 Java 程序获取自身的信息,并且可以操作类或对象的内部属性。

通过反射机制,可以在运行时访问 Java 对象的属性,方法,构造方法等。

反射的应用场景

反射的主要应用场景有:

  • 开发通用框架 - 反射最重要的用途就是开发各种通用框架。很多框架(比如 Spring)都是配置化的(比如通过 XML 文件配置 JavaBean、Filter 等),为了保证框架的通用性,它们可能需要根据配置文件加载不同的对象或类,调用不同的方法,这个时候就必须用到反射——运行时动态加载需要加载的对象。
  • 动态代理 - 在切面编程(AOP)中,需要拦截特定的方法,通常,会选择动态代理方式。这时,就需要反射技术来实现了。
  • 注解 - 注解本身仅仅是起到标记作用,它需要利用反射机制,根据注解标记去调用注解解释器,执行行为。如果没有反射机制,注解并不比注释更有用。
  • 可扩展性功能 - 应用程序可以通过使用完全限定名称创建可扩展性对象实例来使用外部的用户定义类。

反射的缺点

  • 性能开销 - 由于反射涉及动态解析的类型,因此无法执行某些 Java 虚拟机优化。因此,反射操作的性能要比非反射操作的性能要差,应该在性能敏感的应用程序中频繁调用的代码段中避免。
  • 破坏封装性 - 反射调用方法时可以忽略权限检查,因此可能会破坏封装性而导致安全问题。
  • 内部曝光 - 由于反射允许代码执行在非反射代码中非法的操作,例如访问私有字段和方法,所以反射的使用可能会导致意想不到的副作用,这可能会导致代码功能失常并可能破坏可移植性。反射代码打破了抽象,因此可能会随着平台的升级而改变行为。

反射机制

类加载过程

img

类加载的完整过程如下:

  1. 在编译时,Java 编译器编译好 .java 文件之后,在磁盘中产生 .class 文件。.class 文件是二进制文件,内容是只有 JVM 能够识别的机器码。
  2. JVM 中的类加载器读取字节码文件,取出二进制数据,加载到内存中,解析.class 文件内的信息。类加载器会根据类的全限定名来获取此类的二进制字节流;然后,将字节流所代表的静态存储结构转化为方法区的运行时数据结构;接着,在内存中生成代表这个类的 java.lang.Class 对象。
  3. 加载结束后,JVM 开始进行连接阶段(包含验证、准备、初始化)。经过这一系列操作,类的变量会被初始化。

Class 对象

要想使用反射,首先需要获得待操作的类所对应的 Class 对象。Java 中,无论生成某个类的多少个对象,这些对象都会对应于同一个 Class 对象。这个 Class 对象是由 JVM 生成的,通过它能够获悉整个类的结构。所以,java.lang.Class 可以视为所有反射 API 的入口点。

反射的本质就是:在运行时,把 Java 类中的各种成分映射成一个个的 Java 对象。

举例来说,假如定义了以下代码:

1
User user = new User();

步骤说明:

  1. JVM 加载方法的时候,遇到 new User(),JVM 会根据 User 的全限定名去加载 User.class
  2. JVM 会去本地磁盘查找 User.class 文件并加载 JVM 内存中。
  3. JVM 通过调用类加载器自动创建这个类对应的 Class 对象,并且存储在 JVM 的方法区。注意:一个类有且只有一个 Class 对象

方法的反射调用

方法的反射调用,也就是 Method.invoke 方法。

Method.invoke 方法源码:

1
2
3
4
5
6
7
8
9
10
11
public final class Method extends Executable {
...
public Object invoke(Object obj, Object... args) throws ... {
... // 权限检查
MethodAccessor ma = methodAccessor;
if (ma == null) {
ma = acquireMethodAccessor();
}
return ma.invoke(obj, args);
}
}

Method.invoke 方法实际上委派给 MethodAccessor 接口来处理。它有两个已有的具体实现:

  • NativeMethodAccessorImpl:本地方法来实现反射调用
  • DelegatingMethodAccessorImpl:委派模式来实现反射调用

每个 Method 实例的第一次反射调用都会生成一个委派实现(DelegatingMethodAccessorImpl),它所委派的具体实现便是一个本地实现(NativeMethodAccessorImpl)。本地实现非常容易理解。当进入了 Java 虚拟机内部之后,我们便拥有了 Method 实例所指向方法的具体地址。这时候,反射调用无非就是将传入的参数准备好,然后调用进入目标方法。

【示例】通过抛出异常方式 打印 Method.invoke 调用轨迹

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class MethodDemo01 {

public static void target(int i) {
new Exception("#" + i).printStackTrace();
}

public static void main(String[] args) throws Exception {
Class<?> clazz = Class.forName("io.github.dunwu.javacore.reflect.MethodDemo01");
Method method = clazz.getMethod("target", int.class);
method.invoke(null, 0);
}

}
// Output:
// java.lang.Exception: #0
// at io.github.dunwu.javacore.reflect.MethodDemo01.target(MethodDemo01.java:12)
// at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
// at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

先调用 DelegatingMethodAccessorImpl;然后调用 NativeMethodAccessorImpl,最后调用实际方法。

为什么反射调用DelegatingMethodAccessorImpl 作为中间层,而不是直接交给本地实现?

其实,Java 的反射调用机制还设立了另一种动态生成字节码的实现(下称动态实现),直接使用 invoke 指令来调用目标方法。之所以采用委派实现,便是为了能够在本地实现以及动态实现中切换。动态实现和本地实现相比,其运行效率要快上 20 倍。这是因为动态实现无需经过 Java 到 C++ 再到 Java 的切换,但由于生成字节码十分耗时,仅调用一次的话,反而是本地实现要快上 3 到 4 倍。

考虑到许多反射调用仅会执行一次,Java 虚拟机设置了一个阈值 15(可以通过 -Dsun.reflect.inflationThreshold 来调整),当某个反射调用的调用次数在 15 之下时,采用本地实现;当达到 15 时,便开始动态生成字节码,并将委派实现的委派对象切换至动态实现,这个过程我们称之为 Inflation。

【示例】执行 java -verbose:class MethodDemo02 启动

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class MethodDemo02 {

public static void target(int i) {
new Exception("#" + i).printStackTrace();
}

public static void main(String[] args) throws Exception {
Class<?> klass = Class.forName("io.github.dunwu.javacore.reflect.MethodDemo02");
Method method = klass.getMethod("target", int.class);
for (int i = 0; i < 20; i++) {
method.invoke(null, i);
}
}

}

输出内容:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// ...省略
java.lang.Exception: #14
at io.github.dunwu.javacore.reflect.MethodDemo02.target(MethodDemo02.java:13)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at io.github.dunwu.javacore.reflect.MethodDemo02.main(MethodDemo02.java:20)
[Loaded sun.reflect.ClassFileConstants from D:\Tools\Java\jdk1.8.0_192\jre\lib\rt.jar]
[Loaded sun.reflect.AccessorGenerator from D:\Tools\Java\jdk1.8.0_192\jre\lib\rt.jar]
[Loaded sun.reflect.MethodAccessorGenerator from D:\Tools\Java\jdk1.8.0_192\jre\lib\rt.jar]
[Loaded sun.reflect.ByteVectorFactory from D:\Tools\Java\jdk1.8.0_192\jre\lib\rt.jar]
[Loaded sun.reflect.ByteVector from D:\Tools\Java\jdk1.8.0_192\jre\lib\rt.jar]
[Loaded sun.reflect.ByteVectorImpl from D:\Tools\Java\jdk1.8.0_192\jre\lib\rt.jar]
[Loaded sun.reflect.ClassFileAssembler from D:\Tools\Java\jdk1.8.0_192\jre\lib\rt.jar]
[Loaded sun.reflect.UTF8 from D:\Tools\Java\jdk1.8.0_192\jre\lib\rt.jar]
[Loaded sun.reflect.Label from D:\Tools\Java\jdk1.8.0_192\jre\lib\rt.jar]
[Loaded sun.reflect.Label$PatchInfo from D:\Tools\Java\jdk1.8.0_192\jre\lib\rt.jar]
[Loaded java.util.ArrayList$Itr from D:\Tools\Java\jdk1.8.0_192\jre\lib\rt.jar]
[Loaded sun.reflect.MethodAccessorGenerator$1 from D:\Tools\Java\jdk1.8.0_192\jre\lib\rt.jar]
[Loaded sun.reflect.ClassDefiner from D:\Tools\Java\jdk1.8.0_192\jre\lib\rt.jar]
[Loaded sun.reflect.ClassDefiner$1 from D:\Tools\Java\jdk1.8.0_192\jre\lib\rt.jar]
[Loaded sun.reflect.GeneratedMethodAccessor1 from __JVM_DefineClass__]
java.lang.Exception: #15
at io.github.dunwu.javacore.reflect.MethodDemo02.target(MethodDemo02.java:13)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at io.github.dunwu.javacore.reflect.MethodDemo02.main(MethodDemo02.java:20)
[Loaded java.util.concurrent.ConcurrentHashMap$ForwardingNode from D:\Tools\Java\jdk1.8.0_192\jre\lib\rt.jar]
java.lang.Exception: #16
at io.github.dunwu.javacore.reflect.MethodDemo02.target(MethodDemo02.java:13)
at sun.reflect.GeneratedMethodAccessor1.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at io.github.dunwu.javacore.reflect.MethodDemo02.main(MethodDemo02.java:20)
// ...省略

可以看到,从第 16 次开始后,都是使用 DelegatingMethodAccessorImpl ,不再使用本地实现 NativeMethodAccessorImpl

反射调用的开销

方法的反射调用会带来不少性能开销,原因主要有三个:

  • 变长参数方法导致的 Object 数组
  • 基本类型的自动装箱、拆箱
  • 还有最重要的方法内联

Class.forName 会调用本地方法,Class.getMethod 则会遍历该类的公有方法。如果没有匹配到,它还将遍历父类的公有方法。可想而知,这两个操作都非常费时。

注意,以 getMethod 为代表的查找方法操作,会返回查找得到结果的一份拷贝。因此,我们应当避免在热点代码中使用返回 Method 数组的 getMethods 或者 getDeclaredMethods 方法,以减少不必要的堆空间消耗。在实践中,我们往往会在应用程序中缓存 Class.forNameClass.getMethod 的结果。

下面只关注反射调用本身的性能开销。

第一,由于 Method.invoke 是一个变长参数方法,在字节码层面它的最后一个参数会是 Object 数组(感兴趣的同学私下可以用 javap 查看)。Java 编译器会在方法调用处生成一个长度为传入参数数量的 Object 数组,并将传入参数一一存储进该数组中。

第二,由于 Object 数组不能存储基本类型,Java 编译器会对传入的基本类型参数进行自动装箱。

这两个操作除了带来性能开销外,还可能占用堆内存,使得 GC 更加频繁。(如果你感兴趣的话,可以用虚拟机参数 -XX:+PrintGC 试试。)那么,如何消除这部分开销呢?

使用反射

java.lang.reflect 包

Java 中的 java.lang.reflect 包提供了反射功能。java.lang.reflect 包中的类都没有 public 构造方法。

java.lang.reflect 包的核心接口和类如下:

  • Member 接口:反映关于单个成员(字段或方法)或构造函数的标识信息。
  • Field 类:提供一个类的域的信息以及访问类的域的接口。
  • Method 类:提供一个类的方法的信息以及访问类的方法的接口。
  • Constructor 类:提供一个类的构造函数的信息以及访问类的构造函数的接口。
  • Array 类:该类提供动态地生成和访问 JAVA 数组的方法。
  • Modifier 类:提供了 static 方法和常量,对类和成员访问修饰符进行解码。
  • Proxy 类:提供动态地生成代理类和类实例的静态方法。

获取 Class 对象

获取 Class 对象的三种方法:

(1)**Class.forName 静态方法**

【示例】使用 Class.forName 静态方法获取 Class 对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package io.github.dunwu.javacore.reflect;

public class ReflectClassDemo01 {
public static void main(String[] args) throws ClassNotFoundException {
Class c1 = Class.forName("io.github.dunwu.javacore.reflect.ReflectClassDemo01");
System.out.println(c1.getCanonicalName());

Class c2 = Class.forName("[D");
System.out.println(c2.getCanonicalName());

Class c3 = Class.forName("[[Ljava.lang.String;");
System.out.println(c3.getCanonicalName());
}
}
//Output:
//io.github.dunwu.javacore.reflect.ReflectClassDemo01
//double[]
//java.lang.String[][]

使用类的完全限定名来反射对象的类。常见的应用场景为:在 JDBC 开发中常用此方法加载数据库驱动。

(2)类名 + .class

【示例】直接用类名 + .class 获取 Class 对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class ReflectClassDemo02 {
public static void main(String[] args) {
boolean b;
// Class c = b.getClass(); // 编译错误
Class c1 = boolean.class;
System.out.println(c1.getCanonicalName());

Class c2 = java.io.PrintStream.class;
System.out.println(c2.getCanonicalName());

Class c3 = int[][][].class;
System.out.println(c3.getCanonicalName());
}
}
//Output:
//boolean
//java.io.PrintStream
//int[][][]

(3)**ObjectgetClass 方法**

Object 类中有 getClass 方法,因为所有类都继承 Object 类。从而调用 Object 类来获取 Class 对象。

【示例】ObjectgetClass 方法获取 Class 对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package io.github.dunwu.javacore.reflect;

import java.util.HashSet;
import java.util.Set;

public class ReflectClassDemo03 {
enum E {A, B}

public static void main(String[] args) {
Class c = "foo".getClass();
System.out.println(c.getCanonicalName());

Class c2 = ReflectClassDemo03.E.A.getClass();
System.out.println(c2.getCanonicalName());

byte[] bytes = new byte[1024];
Class c3 = bytes.getClass();
System.out.println(c3.getCanonicalName());

Set<String> set = new HashSet<>();
Class c4 = set.getClass();
System.out.println(c4.getCanonicalName());
}
}
//Output:
//java.lang.String
//io.github.dunwu.javacore.reflect.ReflectClassDemo.E
//byte[]
//java.util.HashSet

判断是否为某个类的实例

判断是否为某个类的实例有两种方式:

  1. instanceof 关键字
  2. Class 对象的 isInstance 方法(它是一个 Native 方法)

【示例】

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class InstanceofDemo {
public static void main(String[] args) {
ArrayList arrayList = new ArrayList();
if (arrayList instanceof List) {
System.out.println("ArrayList is List");
}
if (List.class.isInstance(arrayList)) {
System.out.println("ArrayList is List");
}
}
}
//Output:
//ArrayList is List
//ArrayList is List

创建实例

通过反射来创建实例对象主要有两种方式:

  • Class 对象的 newInstance 方法。
  • Constructor 对象的 newInstance 方法。

【示例】

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class NewInstanceDemo {
public static void main(String[] args)
throws IllegalAccessException, InstantiationException, NoSuchMethodException, InvocationTargetException {
Class<?> c1 = StringBuilder.class;
StringBuilder sb = (StringBuilder) c1.newInstance();
sb.append("aaa");
System.out.println(sb.toString());

//获取String所对应的Class对象
Class<?> c2 = String.class;
//获取String类带一个String参数的构造器
Constructor constructor = c2.getConstructor(String.class);
//根据构造器创建实例
String str2 = (String) constructor.newInstance("bbb");
System.out.println(str2);
}
}
//Output:
//aaa
//bbb

创建数组实例

数组在 Java 里是比较特殊的一种类型,它可以赋值给一个对象引用。Java 中,通过 Array.newInstance 创建数组的实例

【示例】利用反射创建数组

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class ReflectArrayDemo {
public static void main(String[] args) throws ClassNotFoundException {
Class<?> cls = Class.forName("java.lang.String");
Object array = Array.newInstance(cls, 25);
//往数组里添加内容
Array.set(array, 0, "Scala");
Array.set(array, 1, "Java");
Array.set(array, 2, "Groovy");
Array.set(array, 3, "Scala");
Array.set(array, 4, "Clojure");
//获取某一项的内容
System.out.println(Array.get(array, 3));
}
}
//Output:
//Scala

其中的 Array 类为 java.lang.reflect.Array 类。我们Array.newInstance 的原型是:

1
2
3
4
public static Object newInstance(Class<?> componentType, int length)
throws NegativeArraySizeException {
return newArray(componentType, length);
}

Field

Class 对象提供以下方法获取对象的成员(Field):

  • getFiled - 根据名称获取公有的(public)类成员。
  • getDeclaredField - 根据名称获取已声明的类成员。但不能得到其父类的类成员。
  • getFields - 获取所有公有的(public)类成员。
  • getDeclaredFields - 获取所有已声明的类成员。

示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class ReflectFieldDemo {
class FieldSpy<T> {
public boolean[][] b = { {false, false}, {true, true} };
public String name = "Alice";
public List<Integer> list;
public T val;
}

public static void main(String[] args) throws NoSuchFieldException {
Field f1 = FieldSpy.class.getField("b");
System.out.format("Type: %s%n", f1.getType());

Field f2 = FieldSpy.class.getField("name");
System.out.format("Type: %s%n", f2.getType());

Field f3 = FieldSpy.class.getField("list");
System.out.format("Type: %s%n", f3.getType());

Field f4 = FieldSpy.class.getField("val");
System.out.format("Type: %s%n", f4.getType());
}
}
//Output:
//Type: class [[Z
//Type: class java.lang.String
//Type: interface java.util.List
//Type: class java.lang.Object

Method

Class 对象提供以下方法获取对象的方法(Method):

  • getMethod - 返回类或接口的特定方法。其中第一个参数为方法名称,后面的参数为方法参数对应 Class 的对象。
  • getDeclaredMethod - 返回类或接口的特定声明方法。其中第一个参数为方法名称,后面的参数为方法参数对应 Class 的对象。
  • getMethods - 返回类或接口的所有 public 方法,包括其父类的 public 方法。
  • getDeclaredMethods - 返回类或接口声明的所有方法,包括 public、protected、默认(包)访问和 private 方法,但不包括继承的方法。

获取一个 Method 对象后,可以用 invoke 方法来调用这个方法。

invoke 方法的原型为:

1
2
3
public Object invoke(Object obj, Object... args)
throws IllegalAccessException, IllegalArgumentException,
InvocationTargetException

【示例】

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class ReflectMethodDemo {
public static void main(String[] args)
throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {

// 返回所有方法
Method[] methods1 = System.class.getDeclaredMethods();
System.out.println("System getDeclaredMethods 清单(数量 = " + methods1.length + "):");
for (Method m : methods1) {
System.out.println(m);
}

// 返回所有 public 方法
Method[] methods2 = System.class.getMethods();
System.out.println("System getMethods 清单(数量 = " + methods2.length + "):");
for (Method m : methods2) {
System.out.println(m);
}

// 利用 Method 的 invoke 方法调用 System.currentTimeMillis()
Method method = System.class.getMethod("currentTimeMillis");
System.out.println(method);
System.out.println(method.invoke(null));
}
}

Constructor

Class 对象提供以下方法获取对象的构造方法(Constructor):

  • getConstructor - 返回类的特定 public 构造方法。参数为方法参数对应 Class 的对象。
  • getDeclaredConstructor - 返回类的特定构造方法。参数为方法参数对应 Class 的对象。
  • getConstructors - 返回类的所有 public 构造方法。
  • getDeclaredConstructors - 返回类的所有构造方法。

获取一个 Constructor 对象后,可以用 newInstance 方法来创建类实例。

【示例】

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class ReflectMethodConstructorDemo {
public static void main(String[] args)
throws NoSuchMethodException, IllegalAccessException, InvocationTargetException, InstantiationException {
Constructor<?>[] constructors1 = String.class.getDeclaredConstructors();
System.out.println("String getDeclaredConstructors 清单(数量 = " + constructors1.length + "):");
for (Constructor c : constructors1) {
System.out.println(c);
}

Constructor<?>[] constructors2 = String.class.getConstructors();
System.out.println("String getConstructors 清单(数量 = " + constructors2.length + "):");
for (Constructor c : constructors2) {
System.out.println(c);
}

System.out.println("====================");
Constructor constructor = String.class.getConstructor(String.class);
System.out.println(constructor);
String str = (String) constructor.newInstance("bbb");
System.out.println(str);
}
}

绕开访问限制

有时候,我们需要通过反射访问私有成员、方法。可以使用 Constructor/Field/Method.setAccessible(true) 来绕开 Java 语言的访问限制。

动态代理

动态代理是一种方便运行时动态构建代理、动态处理代理方法调用的机制,很多场景都是利用类似机制做到的,比如用来包装 RPC 调用、面向切面的编程(AOP)。

实现动态代理的方式很多,比如 JDK 自身提供的动态代理,就是主要利用了上面提到的反射机制。还有其他的实现方式,比如利用传说中更高性能的字节码操作机制,类似 ASM、cglib(基于 ASM)、Javassist 等。

img

静态代理

静态代理其实就是指设计模式中的代理模式。

代理模式为其他对象提供一种代理以控制对这个对象的访问。

img

Subject 定义了 RealSubject 和 Proxy 的公共接口,这样就在任何使用 RealSubject 的地方都可以使用 Proxy 。

1
2
3
abstract class Subject {
public abstract void Request();
}

RealSubject 定义 Proxy 所代表的真实实体。

1
2
3
4
5
6
class RealSubject extends Subject {
@Override
public void Request() {
System.out.println("真实的请求");
}
}

Proxy 保存一个引用使得代理可以访问实体,并提供一个与 Subject 的接口相同的接口,这样代理就可以用来替代实体。

1
2
3
4
5
6
7
8
9
10
11
class Proxy extends Subject {
private RealSubject real;

@Override
public void Request() {
if (null == real) {
real = new RealSubject();
}
real.Request();
}
}

说明:

静态代理模式固然在访问无法访问的资源,增强现有的接口业务功能方面有很大的优点,但是大量使用这种静态代理,会使我们系统内的类的规模增大,并且不易维护;并且由于 Proxy 和 RealSubject 的功能本质上是相同的,Proxy 只是起到了中介的作用,这种代理在系统中的存在,导致系统结构比较臃肿和松散。

JDK 动态代理

为了解决静态代理的问题,就有了创建动态代理的想法:

在运行状态中,需要代理的地方,根据 Subject 和 RealSubject,动态地创建一个 Proxy,用完之后,就会销毁,这样就可以避免了 Proxy 角色的 class 在系统中冗杂的问题了。

img

Java 动态代理基于经典代理模式,引入了一个 InvocationHandlerInvocationHandler 负责统一管理所有的方法调用。

动态代理步骤:

  1. 获取 RealSubject 上的所有接口列表;
  2. 确定要生成的代理类的类名,默认为:com.sun.proxy.$ProxyXXXX
  3. 根据需要实现的接口信息,在代码中动态创建 该 Proxy 类的字节码;
  4. 将对应的字节码转换为对应的 class 对象;
  5. 创建 InvocationHandler 实例 handler,用来处理 Proxy 所有方法调用;
  6. Proxy 的 class 对象 以创建的 handler 对象为参数,实例化一个 proxy 对象。

从上面可以看出,JDK 动态代理的实现是基于实现接口的方式,使得 Proxy 和 RealSubject 具有相同的功能。

但其实还有一种思路:通过继承。即:让 Proxy 继承 RealSubject,这样二者同样具有相同的功能,Proxy 还可以通过重写 RealSubject 中的方法,来实现多态。CGLIB 就是基于这种思路设计的。

在 Java 的动态代理机制中,有两个重要的类(接口),一个是 InvocationHandler 接口、另一个则是 Proxy 类,这一个类和一个接口是实现我们动态代理所必须用到的。

InvocationHandler 接口

InvocationHandler 接口定义:

1
2
3
4
public interface InvocationHandler {
public Object invoke(Object proxy, Method method, Object[] args)
throws Throwable;
}

每一个动态代理类都必须要实现 InvocationHandler 这个接口,并且每个代理类的实例都关联到了一个 Handler,当我们通过代理对象调用一个方法的时候,这个方法的调用就会被转发为由 InvocationHandler 这个接口的 invoke 方法来进行调用。

我们来看看 InvocationHandler 这个接口的唯一一个方法 invoke 方法:

1
Object invoke(Object proxy, Method method, Object[] args) throws Throwable

参数说明:

  • proxy - 代理的真实对象。
  • method - 所要调用真实对象的某个方法的 Method 对象
  • args - 所要调用真实对象某个方法时接受的参数

如果不是很明白,等下通过一个实例会对这几个参数进行更深的讲解。

Proxy 类

Proxy 这个类的作用就是用来动态创建一个代理对象的类,它提供了许多的方法,但是我们用的最多的就是 newProxyInstance 这个方法:

1
public static Object newProxyInstance(ClassLoader loader, Class<?>[] interfaces,  InvocationHandler h)  throws IllegalArgumentException

这个方法的作用就是得到一个动态的代理对象。

参数说明:

  • loader - 一个 ClassLoader 对象,定义了由哪个 ClassLoader 对象来对生成的代理对象进行加载。
  • interfaces - 一个 Class<?> 对象的数组,表示的是我将要给我需要代理的对象提供一组什么接口,如果我提供了一组接口给它,那么这个代理对象就宣称实现了该接口(多态),这样我就能调用这组接口中的方法了
  • h - 一个 InvocationHandler 对象,表示的是当我这个动态代理对象在调用方法的时候,会关联到哪一个 InvocationHandler 对象上

JDK 动态代理实例

上面的内容介绍完这两个接口(类)以后,我们来通过一个实例来看看我们的动态代理模式是什么样的:

首先我们定义了一个 Subject 类型的接口,为其声明了两个方法:

1
2
3
4
5
6
public interface Subject {

void hello(String str);

String bye();
}

接着,定义了一个类来实现这个接口,这个类就是我们的真实对象,RealSubject 类:

1
2
3
4
5
6
7
8
9
10
11
12
13
public class RealSubject implements Subject {

@Override
public void hello(String str) {
System.out.println("Hello " + str);
}

@Override
public String bye() {
System.out.println("Goodbye");
return "Over";
}
}

下一步,我们就要定义一个动态代理类了,前面说个,每一个动态代理类都必须要实现 InvocationHandler 这个接口,因此我们这个动态代理类也不例外:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class InvocationHandlerDemo implements InvocationHandler {
// 这个就是我们要代理的真实对象
private Object subject;

// 构造方法,给我们要代理的真实对象赋初值
public InvocationHandlerDemo(Object subject) {
this.subject = subject;
}

@Override
public Object invoke(Object object, Method method, Object[] args)
throws Throwable {
// 在代理真实对象前我们可以添加一些自己的操作
System.out.println("Before method");

System.out.println("Call Method: " + method);

// 当代理对象调用真实对象的方法时,其会自动的跳转到代理对象关联的handler对象的invoke方法来进行调用
Object obj = method.invoke(subject, args);

// 在代理真实对象后我们也可以添加一些自己的操作
System.out.println("After method");
System.out.println();

return obj;
}
}

最后,来看看我们的 Client 类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class Client {
public static void main(String[] args) {
// 我们要代理的真实对象
Subject realSubject = new RealSubject();

// 我们要代理哪个真实对象,就将该对象传进去,最后是通过该真实对象来调用其方法的
InvocationHandler handler = new InvocationHandlerDemo(realSubject);

/*
* 通过Proxy的newProxyInstance方法来创建我们的代理对象,我们来看看其三个参数
* 第一个参数 handler.getClass().getClassLoader() ,我们这里使用handler这个类的ClassLoader对象来加载我们的代理对象
* 第二个参数realSubject.getClass().getInterfaces(),我们这里为代理对象提供的接口是真实对象所实行的接口,表示我要代理的是该真实对象,这样我就能调用这组接口中的方法了
* 第三个参数handler, 我们这里将这个代理对象关联到了上方的 InvocationHandler 这个对象上
*/
Subject subject = (Subject)Proxy.newProxyInstance(handler.getClass().getClassLoader(), realSubject
.getClass().getInterfaces(), handler);

System.out.println(subject.getClass().getName());
subject.hello("World");
String result = subject.bye();
System.out.println("Result is: " + result);
}
}

我们先来看看控制台的输出:

1
2
3
4
5
6
7
8
9
10
11
12
com.sun.proxy.$Proxy0
Before method
Call Method: public abstract void io.github.dunwu.javacore.reflect.InvocationHandlerDemo$Subject.hello(java.lang.String)
Hello World
After method

Before method
Call Method: public abstract java.lang.String io.github.dunwu.javacore.reflect.InvocationHandlerDemo$Subject.bye()
Goodbye
After method

Result is: Over

我们首先来看看 com.sun.proxy.$Proxy0 这东西,我们看到,这个东西是由 System.out.println(subject.getClass().getName()); 这条语句打印出来的,那么为什么我们返回的这个代理对象的类名是这样的呢?

1
2
Subject subject = (Subject)Proxy.newProxyInstance(handler.getClass().getClassLoader(), realSubject
.getClass().getInterfaces(), handler);

可能我以为返回的这个代理对象会是 Subject 类型的对象,或者是 InvocationHandler 的对象,结果却不是,首先我们解释一下为什么我们这里可以将其转化为 Subject 类型的对象?

原因就是:在 newProxyInstance 这个方法的第二个参数上,我们给这个代理对象提供了一组什么接口,那么我这个代理对象就会实现了这组接口,这个时候我们当然可以将这个代理对象强制类型转化为这组接口中的任意一个,因为这里的接口是 Subject 类型,所以就可以将其转化为 Subject 类型了。

同时我们一定要记住,通过 Proxy.newProxyInstance 创建的代理对象是在 jvm 运行时动态生成的一个对象,它并不是我们的 InvocationHandler 类型,也不是我们定义的那组接口的类型,而是在运行是动态生成的一个对象,并且命名方式都是这样的形式,以$开头,proxy 为中,最后一个数字表示对象的标号

接着我们来看看这两句

1
2
subject.hello("World");
String result = subject.bye();

这里是通过代理对象来调用实现的那种接口中的方法,这个时候程序就会跳转到由这个代理对象关联到的 handler 中的 invoke 方法去执行,而我们的这个 handler 对象又接受了一个 RealSubject 类型的参数,表示我要代理的就是这个真实对象,所以此时就会调用 handler 中的 invoke 方法去执行。

我们看到,在真正通过代理对象来调用真实对象的方法的时候,我们可以在该方法前后添加自己的一些操作,同时我们看到我们的这个 method 对象是这样的:

1
2
public abstract void io.github.dunwu.javacore.reflect.InvocationHandlerDemo$Subject.hello(java.lang.String)
public abstract java.lang.String io.github.dunwu.javacore.reflect.InvocationHandlerDemo$Subject.bye()

正好就是我们的 Subject 接口中的两个方法,这也就证明了当我通过代理对象来调用方法的时候,起实际就是委托由其关联到的 handler 对象的 invoke 方法中来调用,并不是自己来真实调用,而是通过代理的方式来调用的。

JDK 动态代理小结

代理类与委托类实现同一接口,主要是通过代理类实现 InvocationHandler 并重写 invoke 方法来进行动态代理的,在 invoke 方法中将对方法进行处理。

JDK 动态代理特点:

  • 优点:相对于静态代理模式,不需要硬编码接口,代码复用率高。

  • 缺点:强制要求代理类实现 InvocationHandler 接口。

CGLIB 动态代理

CGLIB 提供了与 JDK 动态代理不同的方案。很多框架,例如 Spring AOP 中,就使用了 CGLIB 动态代理。

CGLIB 底层,其实是借助了 ASM 这个强大的 Java 字节码框架去进行字节码增强操作。

CGLIB 动态代理的工作步骤:

  • 生成代理类的二进制字节码文件;
  • 加载二进制字节码,生成 Class 对象( 例如使用 Class.forName() 方法 );
  • 通过反射机制获得实例构造,并创建代理类对象。

CGLIB 动态代理特点:

优点:使用字节码增强,比 JDK 动态代理方式性能高。可以在运行时对类或者是接口进行增强操作,且委托类无需实现接口。

缺点:不能对 final 类以及 final 方法进行代理。

参考:深入理解 CGLIB 动态代理机制

参考资料

Java 面试总结

基础

工具类

String

String 类能被继承吗?

String,StringBuffer,StringBuilder 的区别。

String 类不能被继承。因为其被 final 修饰,所以无法被继承。

StringBuffer,StringBuilder 拼接字符串,使用 append 比 String 效率高。因为 String 会隐式 new String 对象。

StringBuffer 主要方法都用 synchronized 修饰,是线程安全的;而 StringBuilder 不是。

面向对象

抽象类和接口的区别?

类可以继承多个类么?接口可以继承多个接口么?类可以实现多个接口么?

类只能继承一个类,但是可以实现多个接口。接口可以继承多个接口。

继承和聚合的区别在哪?

一般,能用聚合就别用继承。

反射

⭐ 创建实例

反射创建实例有几种方式?

通过反射来创建实例对象主要有两种方式:

  • Class 对象的 newInstance 方法。
  • Constructor 对象的 newInstance 方法。

⭐ 加载实例

加载实例有几种方式?

Class.forName(“className”) 和 ClassLoader.laodClass(“className”) 有什么区别?

  • Class.forName("className") 加载的是已经初始化到 JVM 中的类。
  • ClassLoader.loadClass("className") 装载的是还没有初始化到 JVM 中的类。

⭐⭐ 动态代理

动态代理有几种实现方式?有什么特点?

JDK 动态代理和 CGLIB 动态代理有什么区别?

(1)JDK 方式

代理类与委托类实现同一接口,主要是通过代理类实现 InvocationHandler 并重写 invoke 方法来进行动态代理的,在 invoke 方法中将对方法进行处理。

JDK 动态代理特点:

  • 优点:相对于静态代理模式,不需要硬编码接口,代码复用率高。
  • 缺点:强制要求代理类实现 InvocationHandler 接口。

(2)CGLIB

CGLIB 底层,其实是借助了 ASM 这个强大的 Java 字节码框架去进行字节码增强操作。

CGLIB 动态代理特点:

优点:使用字节码增强,比 JDK 动态代理方式性能高。可以在运行时对类或者是接口进行增强操作,且委托类无需实现接口。

缺点:不能对 final 类以及 final 方法进行代理。

JDK8

其他

⭐ hashcode

==运算符了,为什么还需要 equals 啊?

说一说你对 java.lang.Object 对象中 hashCode 和 equals 方法的理解。在什么场景下需
要重新实现这两个方法。

有没有可能 2 个不相等的对象有相同的 hashcode

(1)有==运算符了,为什么还需要 equals 啊?

equals 等价于==,而==运算符是判断两个对象是不是同一个对象,即他们的地址是否相等。而覆写 equals 更多的是追求两个对象在逻辑上的相等,你可以说是值相等,也可说是内容相等

(2)说一说你对 java.lang.Object 对象中 hashCode 和 equals 方法的理解。在什么场景下需
要重新实现这两个方法。

在集合查找时,hashcode 能大大降低对象比较次数,提高查找效率!

(3)有没有可能 2 个不相等的对象有相同的 hashcode

有可能。

  • 如果两个对象 equals,Java 运行时环境会认为他们的 hashcode 一定相等。
  • 如果两个对象不 equals,他们的 hashcode 有可能相等。
  • 如果两个对象 hashcode 相等,他们不一定 equals。
  • 如果两个对象 hashcode 不相等,他们一定不 equals。

IO

NIO

什么是 NIO?

NIO 和 BIO、AIO 有何差别?

序列化

⭐ 序列化问题

序列化、反序列化有哪些问题?如何解决?

Java 的序列化能保证对象状态的持久保存,但是遇到一些对象结构复杂的情况还是难以处理,这里归纳一下:

  • 当父类继承 Serializable 接口时,所有子类都可以被序列化。
  • 子类实现了 Serializable 接口,父类没有,则父类的属性不会被序列化(不报错,数据丢失),子类的属性仍可以正确序列化。
  • 如果序列化的属性是对象,则这个对象也必须实现 Serializable 接口,否则会报错。
  • 在反序列化时,如果对象的属性有修改或删减,则修改的部分属性会丢失,但不会报错。
  • 在反序列化时,如果 serialVersionUID 被修改,则反序列化时会失败。

容器

List

ArrayList 和 LinkedList 有什么区别?

ArrayList 是数组链表,访问效率更高。

LinkedList 是双链表,数据有序存储。

Map

请描述 HashMap 的实现原理?

并发

并发简介

什么是进程?什么是线程?进程和线程的区别?

  • 什么是进程?
    • 简言之,进程可视为一个正在运行的程序。
    • 进程是具有一定独立功能的程序关于某个数据集合上的一次运行活动。进程是操作系统进行资源分配的基本单位。
  • 什么是线程?
    • 线程是操作系统进行调度的基本单位。
  • 进程 vs. 线程
    • 一个程序至少有一个进程,一个进程至少有一个线程。
    • 线程比进程划分更细,所以执行开销更小,并发性更高。
    • 进程是一个实体,拥有独立的资源;而同一个进程中的多个线程共享进程的资源。

并发(多线程)编程的好处是什么?

  • 更有效率的利用多处理器核心
  • 更快的响应时间
  • 更好的编程模型

并发一定比串行更快吗?

答:否。

要点:创建线程和线程上下文切换有一定开销

说明:即使是单核处理器也支持多线程。CPU 通过给每个线程分配时间切片的算法来循环执行任务,当前任务执行一个时间片后会切换到下一个任务。但是,在切换前会保持上一个任务的状态,以便下次切换回这个任务时,可以再加载这个任务的状态。所以任务从保存到再加载的过程就是一次上下文切换

引申

  • 如何减少上下文切换?
    • 尽量少用锁
    • CAS 算法
    • 线程数要合理
    • 协程:在单线程中实现多任务调度,并在单线程中维持多个任务的切换

如何让正在运行的线程暂停一段时间?

我们可以使用 Thread 类的 Sleep() 方法让线程暂停一段时间。

需要注意的是,这并不会让线程终止,一旦从休眠中唤醒线程,线程的状态将会被改变为 Runnable,并且根据线程调度,它将得到执行。

什么是线程调度器(Thread Scheduler)和时间分片(Time Slicing)?

线程调度器是一个操作系统服务,它负责为 Runnable 状态的线程分配 CPU 时间。一旦我们创建一个线程并启动它,它的执行便依赖于线程调度器的实现。

时间分片是指将可用的 CPU 时间分配给可用的 Runnable 线程的过程。

分配 CPU 时间可以基于线程优先级或者线程等待的时间。线程调度并不受到 Java 虚拟机控制,所以由应用程序来控制它是更好的选择(也就是说不要让你的程序依赖于线程的优先级)。

在多线程中,什么是上下文切换(context-switching)?

上下文切换是存储和恢复 CPU 状态的过程,它使得线程执行能够从中断点恢复执行。上下文切换是多任务操作系统和多线程环境的基本特征。

如何确保线程安全?

  • 原子类(atomic concurrent classes)
  • volatile 关键字
  • 不变类和线程安全类

什么是死锁(Deadlock)?如何分析和避免死锁?

死锁是指两个以上的线程永远相互阻塞的情况,这种情况产生至少需要两个以上的线程和两个以上的资源。

分析死锁,我们需要查看 Java 应用程序的线程转储。我们需要找出那些状态为 BLOCKED 的线程和他们等待的资源。每个资源都有一个唯一的 id,用这个 id 我们可以找出哪些线程已经拥有了它的对象锁。

避免嵌套锁,只在需要的地方使用锁和避免无限期等待是避免死锁的通常办法。

线程基础

Java 线程生命周期中有哪些状态?各状态之间如何切换?

img

java.lang.Thread.State 中定义了 6 种不同的线程状态,在给定的一个时刻,线程只能处于其中的一个状态。

以下是各状态的说明,以及状态间的联系:

  • 开始(New) - 还没有调用 start() 方法的线程处于此状态。
  • 可运行(Runnable) - 已经调用了 start() 方法的线程状态。此状态意味着,线程已经准备好了,一旦被线程调度器分配了 CPU 时间片,就可以运行线程。
  • 阻塞(Blocked) - 阻塞状态。线程阻塞的线程状态等待监视器锁定。处于阻塞状态的线程正在等待监视器锁定,以便在调用 Object.wait() 之后输入同步块/方法或重新输入同步块/方法。
  • 等待(Waiting) - 等待状态。一个线程处于等待状态,是由于执行了 3 个方法中的任意方法:
    • Object.wait()
    • Thread.join()
    • LockSupport.park()
  • 定时等待(Timed waiting) - 等待指定时间的状态。一个线程处于定时等待状态,是由于执行了以下方法中的任意方法:
    • Thread.sleep(sleeptime)
    • Object.wait(timeout)
    • Thread.join(timeout)
    • LockSupport.parkNanos(timeout)
    • LockSupport.parkUntil(timeout)
  • 终止(Terminated) - 线程 run() 方法执行结束,或者因异常退出了 run() 方法,则该线程结束生命周期。死亡的线程不可再次复生。

👉 参考阅读:JavaThread Methods and Thread States
👉 参考阅读:Java 线程的 5 种状态及切换(透彻讲解)

创建线程有哪些方式?这些方法各自利弊是什么?

创建线程主要有三种方式:

1. 继承 Thread

  • 定义 Thread 类的子类,并重写该类的 run() 方法,该 run() 方法的方法体就代表了线程要完成的任务。因此把 run() 方法称为执行体。
  • 创建 Thread 子类的实例,即创建了线程对象。
  • 调用线程对象的 start() 方法来启动该线程。

2. 实现 Runnable 接口

  • 定义 Runnable 接口的实现类,并重写该接口的 run() 方法,该 run() 方法的方法体同样是该线程的线程执行体。
  • 创建 Runnable 实现类的实例,并以此实例作为 Thread 对象,该 Thread 对象才是真正的线程对象。
  • 调用线程对象的 start() 方法来启动该线程。

3. 通过 Callable 接口和 Future 接口

  • 创建 Callable 接口的实现类,并实现 call() 方法,该 call() 方法将作为线程执行体,并且有返回值。
  • 创建 Callable 实现类的实例,使用 FutureTask 类来包装 Callable 对象,该 FutureTask 对象封装了该 Callable 对象的 call() 方法的返回值。
  • 使用 FutureTask 对象作为 Thread 对象的 target 创建并启动新线程。
  • 调用 FutureTask 对象的 get() 方法来获得子线程执行结束后的返回值

三种创建线程方式对比

  • 实现 Runnable 接口优于继承 Thread 类,因为根据开放封闭原则——实现接口更便于扩展;
  • 实现 Runnable 接口的线程没有返回值;而使用 Callable / Future 方式可以让线程有返回值。

👉 参考阅读:Java 创建线程的三种方式及其对比

什么是 CallableFuture?什么是 FutureTask

什么是 CallableFuture

Java 5 在 concurrency 包中引入了 Callable 接口,它和 Runnable 接口很相似,但它可以返回一个对象或者抛出一个异常。

Callable 接口使用泛型去定义它的返回类型。Executors 类提供了一些有用的方法去在线程池中执行 Callable 内的任务。由于 Callable 任务是并行的,我们必须等待它返回的结果。Future 对象为我们解决了这个问题。在线程池提交 Callable 任务后返回了一个 Future 对象,使用它我们可以知道 Callable 任务的状态和得到 Callable 返回的执行结果。Future 提供了 get() 方法让我们可以等待 Callable 结束并获取它的执行结果。

什么是 FutureTask

FutureTaskFuture 的一个基础实现,我们可以将它同 Executors 使用处理异步任务。通常我们不需要使用 FutureTask 类,单当我们打算重写 Future 接口的一些方法并保持原来基础的实现是,它就变得非常有用。我们可以仅仅继承于它并重写我们需要的方法。阅读 Java FutureTask 例子,学习如何使用它。

👉 参考阅读:Java 并发编程:Callable、Future 和 FutureTask

start()run() 有什么区别?可以直接调用 Thread 类的 run() 方法么?

  • run() 方法是线程的执行体。
  • start() 方法负责启动线程,然后 JVM 会让这个线程去执行 run() 方法。

可以直接调用 Thread 类的 run() 方法么?

  • 可以。但是如果直接调用 Threadrun() 方法,它的行为就会和普通的方法一样。
  • 为了在新的线程中执行我们的代码,必须使用 start() 方法。

sleep()yield()join() 方法有什么区别?为什么 sleep()yield() 方法是静态(static)的?

yield()

  • yield() 方法可以让当前正在执行的线程暂停,但它不会阻塞该线程,它只是将该线程从 Running 状态转入 Runnable 状态。
  • 当某个线程调用了 yield() 方法暂停之后,只有优先级与当前线程相同,或者优先级比当前线程更高的处于就绪状态的线程才会获得执行的机会。

sleep()

  • sleep() 方法需要指定等待的时间,它可以让当前正在执行的线程在指定的时间内暂停执行,进入 Blocked 状态。
  • 该方法既可以让其他同优先级或者高优先级的线程得到执行的机会,也可以让低优先级的线程得到执行机会。
  • 但是,sleep() 方法不会释放“锁标志”,也就是说如果有 synchronized 同步块,其他线程仍然不能访问共享数据。

join()

  • join() 方法会使当前线程转入 Blocked 状态,等待调用 join() 方法的线程结束后才能继续执行。

为什么 sleep()yield() 方法是静态(static)的?

  • Thread 类的 sleep()yield() 方法将处理 Running 状态的线程。所以在其他处于非 Running 状态的线程上执行这两个方法是没有意义的。这就是为什么这些方法是静态的。它们可以在当前正在执行的线程中工作,并避免程序员错误的认为可以在其他非运行线程调用这些方法。

👉 参考阅读:Java 线程中 yield 与 join 方法的区别
👉 参考阅读:sleep(),wait(),yield()和 join()方法的区别

Java 的线程优先级如何控制?高优先级的 Java 线程一定先执行吗?

Java 中的线程优先级如何控制

  • Java 中的线程优先级的范围是 [1,10],一般来说,高优先级的线程在运行时会具有优先权。可以通过 thread.setPriority(Thread.MAX_PRIORITY) 的方式设置,默认优先级为 5

高优先级的 Java 线程一定先执行吗

  • 即使设置了线程的优先级,也无法保证高优先级的线程一定先执行
  • 原因:这是因为 Java 线程优先级依赖于操作系统的支持,然而,不同的操作系统支持的线程优先级并不相同,不能很好的和 Java 中线程优先级一一对应。
  • 结论:Java 线程优先级控制并不可靠。

什么是守护线程?为什么要用守护线程?如何创建守护线程?

什么是守护线程

  • 守护线程(Daemon Thread)是在后台执行并且不会阻止 JVM 终止的线程。
  • 与守护线程(Daemon Thread)相反的,叫用户线程(User Thread),也就是非守护线程。

为什么要用守护线程

  • 守护线程的优先级比较低,用于为系统中的其它对象和线程提供服务。典型的应用就是垃圾回收器。

如何创建守护线程

  • 使用 thread.setDaemon(true) 可以设置 thread 线程为守护线程。
  • 注意点:
    • 正在运行的用户线程无法设置为守护线程,所以 thread.setDaemon(true) 必须在 thread.start() 之前设置,否则会抛出 llegalThreadStateException 异常;
    • 一个守护线程创建的子线程依然是守护线程。
    • 不要认为所有的应用都可以分配给守护线程来进行服务,比如读写操作或者计算逻辑。

👉 参考阅读:Java 中守护线程的总结

线程间是如何通信的?

当线程间是可以共享资源时,线程间通信是协调它们的重要的手段。Object 类中 wait(), notify()notifyAll() 方法可以用于线程间通信关于资源的锁的状态。

👉 参考阅读:Java 并发编程:线程间协作的两种方式:wait、notify、notifyAll 和 Condition

为什么线程通信的方法 wait(), notify()notifyAll() 被定义在 Object 类里?

Java 的每个对象中都有一个锁(monitor,也可以成为监视器) 并且 wait()notify() 等方法用于等待对象的锁或者通知其他线程对象的监视器可用。在 Java 的线程中并没有可供任何对象使用的锁和同步器。这就是为什么这些方法是 Object 类的一部分,这样 Java 的每一个类都有用于线程间通信的基本方法

为什么 wait(), notify()notifyAll() 必须在同步方法或者同步块中被调用?

当一个线程需要调用对象的 wait() 方法的时候,这个线程必须拥有该对象的锁,接着它就会释放这个对象锁并进入等待状态直到其他线程调用这个对象上的 notify() 方法。同样的,当一个线程需要调用对象的 notify() 方法时,它会释放这个对象的锁,以便其他在等待的线程就可以得到这个对象锁。

由于所有的这些方法都需要线程持有对象的锁,这样就只能通过同步来实现,所以他们只能在同步方法或者同步块中被调用。

并发机制的底层实现

👉 参考阅读:Java 并发核心机制

⭐⭐⭐ synchronized

synchronized 有什么作用?

synchronized 的原理是什么?

同步方法和同步块,哪个更好?

JDK1.6 对synchronized 做了哪些优化?

使用 synchronized 修饰静态方法和非静态方法有什么区别?

作用

synchronized 可以保证在同一个时刻,只有一个线程可以执行某个方法或者某个代码块

synchronized 有 3 种应用方式:

  • 同步实例方法 - 对于普通同步方法,锁是当前实例对象
  • 同步静态方法 - 对于静态同步方法,锁是当前类的 Class 对象
  • 同步代码块 - 对于同步方法块,锁是 synchonized 括号里配置的对象

原理

synchronized 经过编译后,会在同步块的前后分别形成 monitorentermonitorexit 这两个字节码指令,这两个字节码指令都需要一个引用类型的参数来指明要锁定和解锁的对象。如果 synchronized 明确制定了对象参数,那就是这个对象的引用;如果没有明确指定,那就根据 synchronized 修饰的是实例方法还是静态方法,去对对应的对象实例或 Class 对象来作为锁对象。

synchronized 同步块对同一线程来说是可重入的,不会出现锁死问题。

synchronized 同步块是互斥的,即已进入的线程执行完成前,会阻塞其他试图进入的线程。

优化

Java 1.6 以后,synchronized 做了大量的优化,其性能已经与 LockReadWriteLock 基本上持平。

synchronized 的优化是将锁粒度分为不同级别,synchronized 会根据运行状态动态的由低到高调整锁级别(偏向锁 -> 轻量级锁 -> 重量级锁),以减少阻塞。

同步方法 or 同步块?

  • 同步块是更好的选择。
  • 因为它不会锁住整个对象(当然你也可以让它锁住整个对象)。同步方法会锁住整个对象,哪怕这个类中有多个不相关联的同步块,这通常会导致他们停止执行并需要等待获得这个对象上的锁。

volatile

volatile 有什么作用?

volatile 的原理是什么?

volatile 能代替锁吗?

volatilesynchronized 的区别?

volatile 无法替代 synchronized ,因为 volatile 无法保证操作的原子性

作用

volatile 关键字修饰的变量有两层含义:

  • 保证了不同线程对这个变量进行操作时的可见性,即一个线程修改了某个变量的值,这新值对其他线程来说是立即可见的。
  • 禁止指令重排序

原理

观察加入 volatile 关键字和没有加入 volatile 关键字时所生成的汇编代码发现,加入 volatile 关键字时,会多出一个 lock 前缀指令

lock 前缀指令实际上相当于一个内存屏障(也成内存栅栏),内存屏障会提供 3 个功能:

  • 它确保指令重排序时不会把其后面的指令排到内存屏障之前的位置,也不会把前面的指令排到内存屏障的后面;即在执行到内存屏障这句指令时,在它前面的操作已经全部完成;
  • 它会强制将对缓存的修改操作立即写入主存;
  • 如果是写操作,它会导致其他 CPU 中对应的缓存行无效。

volatilesynchronized 的区别?

  • volatile 本质是在告诉 jvm 当前变量在寄存器(工作内存)中的值是不确定的,需要从主存中读取; synchronized 则是锁定当前变量,只有当前线程可以访问该变量,其他线程被阻塞住。
  • volatile 仅能使用在变量级别;synchronized 则可以使用在变量、方法、和类级别的。
  • volatile 仅能实现变量的修改可见性,不能保证原子性;而 synchronized 则可以保证变量的修改可见性和原子性
  • volatile 不会造成线程的阻塞;synchronized 可能会造成线程的阻塞。
  • volatile 标记的变量不会被编译器优化;synchronized 标记的变量可以被编译器优化。

⭐⭐ CAS

什么是 CAS?

CAS 有什么作用?

CAS 的原理是什么?

CAS 的三大问题?

作用

CAS(Compare and Swap),字面意思为比较并交换。CAS 有 3 个操作数,分别是:内存值 V,旧的预期值 A,要修改的新值 B。当且仅当预期值 A 和内存值 V 相同时,将内存值 V 修改为 B,否则什么都不做。

原理

Java 主要利用 Unsafe 这个类提供的 CAS 操作。Unsafe 的 CAS 依赖的是 JV M 针对不同的操作系统实现的 Atomic::cmpxchg 指令。

三大问题

  1. ABA 问题:因为 CAS 需要在操作值的时候检查下值有没有发生变化,如果没有发生变化则更新,但是如果一个值原来是 A,变成了 B,又变成了 A,那么使用 CAS 进行检查时会发现它的值没有发生变化,但是实际上却变化了。ABA 问题的解决思路就是使用版本号。在变量前面追加上版本号,每次变量更新的时候把版本号加一,那么 A-B-A 就会变成 1A-2B-3A。
  2. 循环时间长开销大。自旋 CAS 如果长时间不成功,会给 CPU 带来非常大的执行开销。如果 JVM 能支持处理器提供的 pause 指令那么效率会有一定的提升,pause 指令有两个作用,第一它可以延迟流水线执行指令(de-pipeline),使 CPU 不会消耗过多的执行资源,延迟的时间取决于具体实现的版本,在一些处理器上延迟时间是零。第二它可以避免在退出循环的时候因内存顺序冲突(memory order violation)而引起 CPU 流水线被清空(CPU pipeline flush),从而提高 CPU 的执行效率。
  3. 只能保证一个共享变量的原子操作。当对一个共享变量执行操作时,我们可以使用循环 CAS 的方式来保证原子操作,但是对多个共享变量操作时,循环 CAS 就无法保证操作的原子性,这个时候就可以用锁,或者有一个取巧的办法,就是把多个共享变量合并成一个共享变量来操作。比如有两个共享变量 i = 2,j=a,合并一下 ij=2a,然后用 CAS 来操作 ij。从 Java1.5 开始 JDK 提供了 AtomicReference 类来保证引用对象之间的原子性,你可以把多个变量放在一个对象里来进行 CAS 操作。

ThreadLocal

ThreadLocal 有什么作用?

ThreadLocal 的原理是什么?

如何解决 ThreadLocal 内存泄漏问题?

作用

ThreadLocal 是一个存储线程本地副本的工具类

原理

Thread 类中维护着一个 ThreadLocal.ThreadLocalMap 类型的成员 threadLocals。这个成员就是用来存储当前线程独占的变量副本。

ThreadLocalMapThreadLocal 的内部类,它维护着一个 Entry 数组, Entry 用于保存键值对,其 key 是 ThreadLocal 对象,value 是传递进来的对象(变量副本)。 Entry 继承了 WeakReference ,所以是弱引用。

内存泄漏问题

ThreadLocalMap 的 Entry 继承了 WeakReference,所以它的 key (ThreadLocal 对象)是弱引用,而 value (变量副本)是强引用。

  • 如果 ThreadLocal 对象没有外部强引用来引用它,那么 ThreadLocal 对象会在下次 GC 时被回收。
  • 此时,Entry 中的 key 已经被回收,但是 value 由于是强引用不会被垃圾收集器回收。如果创建 ThreadLocal 的线程一直持续运行,那么 value 就会一直得不到回收,产生内存泄露。

那么如何避免内存泄漏呢?方法就是:使用 ThreadLocalset 方法后,显示的调用 remove 方法

内存模型

什么是 Java 内存模型

  • Java 内存模型即 Java Memory Model,简称 JMM。JMM 定义了 JVM 在计算机内存(RAM)中的工作方式。JMM 是隶属于 JVM 的。
  • 并发编程领域两个关键问题:线程间通信和线程间同步
  • 线程间通信机制
    • 共享内存 - 线程间通过写-读内存中的公共状态来隐式进行通信。
    • 消息传递 - java 中典型的消息传递方式就是 wait()和 notify()。
  • 线程间同步机制
    • 在共享内存模型中,必须显示指定某个方法或某段代码在线程间互斥地执行。
    • 在消息传递模型中,由于发送消息必须在接收消息之前,因此同步是隐式进行的。
  • Java 的并发采用的是共享内存模型
  • JMM 决定一个线程对共享变量的写入何时对另一个线程可见。
  • 线程之间的共享变量存储在主内存(main memory)中,每个线程都有一个私有的本地内存(local memory),本地内存中存储了该线程以读/写共享变量的副本。
  • JMM 把内存分成了两部分:线程栈区和堆区
    • 线程栈
      • JVM 中运行的每个线程都拥有自己的线程栈,线程栈包含了当前线程执行的方法调用相关信息,我们也把它称作调用栈。随着代码的不断执行,调用栈会不断变化。
      • 线程栈还包含了当前方法的所有本地变量信息。线程中的本地变量对其它线程是不可见的。
    • 堆区
      • 堆区包含了 Java 应用创建的所有对象信息,不管对象是哪个线程创建的,其中的对象包括原始类型的封装类(如 Byte、Integer、Long 等等)。不管对象是属于一个成员变量还是方法中的本地变量,它都会被存储在堆区。
    • 一个本地变量如果是原始类型,那么它会被完全存储到栈区。
    • 一个本地变量也有可能是一个对象的引用,这种情况下,这个本地引用会被存储到栈中,但是对象本身仍然存储在堆区。
    • 对于一个对象的成员方法,这些方法中包含本地变量,仍需要存储在栈区,即使它们所属的对象在堆区。
    • 对于一个对象的成员变量,不管它是原始类型还是包装类型,都会被存储到堆区。

img

👉 参考阅读:全面理解 Java 内存模型

同步容器和并发容器

👉 参考阅读:Java 并发容器

⭐ 同步容器

什么是同步容器?

有哪些常见同步容器?

它们是如何实现线程安全的?

同步容器真的线程安全吗?

类型

VectorStackHashtable

作用/原理

同步容器的同步原理就是在方法上用 synchronized 修饰。 synchronized 可以保证在同一个时刻,只有一个线程可以执行某个方法或者某个代码块

synchronized 的互斥同步会产生阻塞和唤醒线程的开销。显然,这种方式比没有使用 synchronized 的容器性能要差。

线程安全

同步容器真的绝对安全吗?

其实也未必。在做复合操作(非原子操作)时,仍然需要加锁来保护。常见复合操作如下:

  • 迭代:反复访问元素,直到遍历完全部元素;
  • 跳转:根据指定顺序寻找当前元素的下一个(下 n 个)元素;
  • 条件运算:例如若没有则添加等;

⭐⭐⭐ ConcurrentHashMap

请描述 ConcurrentHashMap 的实现原理?

ConcurrentHashMap 为什么放弃了分段锁?

基础数据结构原理和 HashMap 一样,JDK 1.7 采用 数组+单链表;JDK 1.8 采用数组+单链表+红黑树。

并发安全特性的实现:

JDK 1.7:

  • 使用分段锁,设计思路是缩小锁粒度,提高并发吞吐。
  • 写数据时,会使用可重入锁去锁住分段(segment)。

JDK 1.8:

  • 取消分段锁,直接采用 transient volatile HashEntry<K,V>[] table 保存数据,采用 table 数组元素作为锁,从而实现了对每一行数据进行加锁,进一步减少并发冲突的概率。
  • 写数据时,使用是 CAS + synchronized
    • 根据 key 计算出 hashcode 。
    • 判断是否需要进行初始化。
    • f 即为当前 key 定位出的 Node,如果为空表示当前位置可以写入数据,利用 CAS 尝试写入,失败则自旋保证成功。
    • 如果当前位置的 hashcode == MOVED == -1,则需要进行扩容。
    • 如果都不满足,则利用 synchronized 锁写入数据。
    • 如果数量大于 TREEIFY_THRESHOLD 则要转换为红黑树。

⭐⭐ CopyOnWriteArrayList

CopyOnWriteArrayList 的作用?

CopyOnWriteArrayList 的原理?

作用

CopyOnWrite 字面意思为写入时复制。CopyOnWriteArrayList 是线程安全的 ArrayList。

原理

  • CopyOnWriteAarrayList 中,读操作不同步,因为它们在内部数组的快照上工作,所以多个迭代器可以同时遍历而不会相互阻塞(1,2,4)。
  • 所有的写操作都是同步的。他们在备份数组(3)的副本上工作。写操作完成后,后备阵列将被替换为复制的阵列,并释放锁定。支持数组变得易变,所以替换数组的调用是原子(5)。
  • 写操作后创建的迭代器将能够看到修改的结构(6,7)。
  • 写时复制集合返回的迭代器不会抛出 ConcurrentModificationException,因为它们在数组的快照上工作,并且无论后续的修改(2,4)如何,都会像迭代器创建时那样完全返回元素。

并发锁

👉 参考阅读:Java 并发锁

⭐⭐ 锁类型

Java 中有哪些锁?

这些锁有什么特性?

可重入锁

  • ReentrantLockReentrantReadWriteLock 是可重入锁。这点,从其命名也不难看出。
  • synchronized 也是一个可重入锁

公平锁与非公平锁

  • synchronized 只支持非公平锁
  • ReentrantLockReentrantReadWriteLock,默认是非公平锁,但支持公平锁

独享锁与共享锁

  • synchronizedReentrantLock 只支持独享锁
  • ReentrantReadWriteLock 其写锁是独享锁,其读锁是共享锁。读锁是共享锁使得并发读是非常高效的,读写,写读 ,写写的过程是互斥的。

悲观锁与乐观锁

  • 悲观锁在 Java 中的应用就是通过使用 synchronizedLock 显示加锁来进行互斥同步,这是一种阻塞同步。

  • 乐观锁在 Java 中的应用就是采用 CAS 机制(CAS 操作通过 Unsafe 类提供,但这个类不直接暴露为 API,所以都是间接使用,如各种原子类)。

偏向锁、轻量级锁、重量级锁

Java 1.6 以前,重量级锁一般指的是 synchronized ,而轻量级锁指的是 volatile

Java 1.6 以后,针对 synchronized 做了大量优化,引入 4 种锁状态: 无锁状态、偏向锁、轻量级锁和重量级锁。锁可以单向的从偏向锁升级到轻量级锁,再从轻量级锁升级到重量级锁 。

分段锁

分段锁其实是一种锁的设计,并不是具体的一种锁。典型:JDK1.7 之前的 ConcurrentHashMap

显示锁和内置锁

  • 内置锁:synchronized
  • 显示锁:ReentrantLockReentrantReadWriteLock 等。

⭐⭐ AQS

什么是 AQS?

AQS 的作用是什么?

AQS 的原理?

作用

AbstractQueuedSynchronizer(简称 AQS)是队列同步器,顾名思义,其主要作用是处理同步。它是并发锁和很多同步工具类的实现基石(如 ReentrantLockReentrantReadWriteLockSemaphore 等)。

AQS 提供了对独享锁与共享锁的支持

原理

(1)数据结构

  • state - AQS 使用一个整型的 volatile 变量来 维护同步状态
    • 这个整数状态的意义由子类来赋予,如ReentrantLock 中该状态值表示所有者线程已经重复获取该锁的次数,Semaphore 中该状态值表示剩余的许可数量。
  • headtail - AQS 维护了一个 Node 类型(AQS 的内部类)的双链表来完成同步状态的管理。这个双链表是一个双向的 FIFO 队列,通过 headtail 指针进行访问。当 有线程获取锁失败后,就被添加到队列末尾

(2)获取独占锁

AQS 中使用 acquire(int arg) 方法获取独占锁,其大致流程如下:

  1. 先尝试获取同步状态,如果获取同步状态成功,则结束方法,直接返回。
  2. 如果获取同步状态不成功,AQS 会不断尝试利用 CAS 操作将当前线程插入等待同步队列的队尾,直到成功为止。
  3. 接着,不断尝试为等待队列中的线程节点获取独占锁。

(3)释放独占锁

AQS 中使用 release(int arg) 方法释放独占锁,其大致流程如下:

  1. 先尝试获取解锁线程的同步状态,如果获取同步状态不成功,则结束方法,直接返回。
  2. 如果获取同步状态成功,AQS 会尝试唤醒当前线程节点的后继节点。

(4)获取共享锁

AQS 中使用 acquireShared(int arg) 方法获取共享锁。

acquireShared 方法和 acquire 方法的逻辑很相似,区别仅在于自旋的条件以及节点出队的操作有所不同。

成功获得共享锁的条件如下:

  • tryAcquireShared(arg) 返回值大于等于 0 (这意味着共享锁的 permit 还没有用完)。
  • 当前节点的前驱节点是头结点。

(5)释放共享锁

AQS 中使用 releaseShared(int arg) 方法释放共享锁。

releaseShared 首先会尝试释放同步状态,如果成功,则解锁一个或多个后继线程节点。释放共享锁和释放独享锁流程大体相似,区别在于:

对于独享模式,如果需要 SIGNAL,释放仅相当于调用头节点的 unparkSuccessor

⭐⭐ ReentrantLock

什么是 ReentrantLock?

什么是可重入锁?

ReentrantLock 有什么用?

ReentrantLock 原理?

作用

ReentrantLock 提供了一组无条件的、可轮询的、定时的以及可中断的锁操作

ReentrantLock 的特性如下:

  • ReentrantLock 提供了与 synchronized 相同的互斥性、内存可见性和可重入性
  • ReentrantLock 支持公平锁和非公平锁(默认)两种模式。
  • ReentrantLock 实现了 Lock 接口,支持了 synchronized 所不具备的灵活性
    • synchronized 无法中断一个正在等待获取锁的线程
    • synchronized 无法在请求获取一个锁时无休止地等待

原理

ReentrantLock 基于其内部类 ReentrantLock.Sync 实现,Sync 继承自 AQS。它有两个子类:

  • ReentrantLock.FairSync - 公平锁。
  • ReentrantLock.NonfairSync - 非公平锁。

本质上,就是基于 AQS 实现。

⭐ ReentrantReadWriteLock

ReentrantReadWriteLock 是什么?

ReentrantReadWriteLock 的作用?

ReentrantReadWriteLock 的原理?

作用

ReentrantReadWriteLock 是一个可重入的读写锁。**ReentrantReadWriteLock 维护了一对读写锁,将读写锁分开,有利于提高并发效率**。

原理

ReentrantReadWriteLock 本质上也是基于 AQS 实现。有三个核心字段:

  • sync - 内部类 ReentrantReadWriteLock.Sync 对象。与 ReentrantLock 类似,它有两个子类:ReentrantReadWriteLock.FairSyncReentrantReadWriteLock.NonfairSync ,分别表示公平锁和非公平锁的实现。
  • readerLock - 内部类 ReentrantReadWriteLock.ReadLock 对象,这是一把读锁。
  • writerLock - 内部类 ReentrantReadWriteLock.WriteLock 对象,这是一把写锁。

⭐ Condition

Condition 有什么用?

使用 Lock 的线程,彼此如何通信?

作用

可以理解为,什么样的锁配什么样的钥匙。

内置锁(synchronized)配合内置条件队列(waitnotifynotifyAll ),显式锁(Lock)配合显式条件队列(Condition

⭐⭐ 死锁

如何避免死锁?

  • 避免一个线程同时获取多个锁
  • 避免一个线程在锁内同时占用多个资源,尽量保证每个锁只占用一个资源
  • 尝试使用定时锁 lock.tryLock(timeout),避免锁一直不能释放
  • 对于数据库锁,加锁和解锁必须在一个数据库连接中里,否则会出现解锁失败的情况。

原子变量类

👉 参考阅读:Java 原子类

⭐ 原子类简介

为什么要用原子类?

用过哪些原子类?

作用

常规的锁(Locksychronized)由于是阻塞式的,势必影响并发吞吐量。

volatile 号称轻量级的锁,但不能保证原子性。

为了兼顾原子性和锁的性能问题,所以引入了原子类。

类型

原子变量类可以分为 4 组:

  • 基本类型
    • AtomicBoolean - 布尔类型原子类
    • AtomicInteger - 整型原子类
    • AtomicLong - 长整型原子类
  • 引用类型
    • AtomicReference - 引用类型原子类
    • AtomicMarkableReference - 带有标记位的引用类型原子类
    • AtomicStampedReference - 带有版本号的引用类型原子类
  • 数组类型
    • AtomicIntegerArray - 整形数组原子类
    • AtomicLongArray - 长整型数组原子类
    • AtomicReferenceArray - 引用类型数组原子类
  • 属性更新器类型
    • AtomicIntegerFieldUpdater - 整型字段的原子更新器。
    • AtomicLongFieldUpdater - 长整型字段的原子更新器。
    • AtomicReferenceFieldUpdater - 原子更新引用类型里的字段。

⭐ 原子类的原理

  1. 处理器实现原子操作:使用总线锁保证原子性,使用缓存锁保证原子性(修改内存地址,缓存一致性机制:阻止同时修改由 2 个以上的处理器缓存的内存区域数据)
  2. JAVA 实现原子操作:循环使用 CAS (自旋 CAS)实现原子操作

并发工具类

👉 参考阅读:Java 并发工具类

⭐ CountDownLatch

CountDownLatch 作用?

CountDownLatch 原理?

作用

字面意思为 递减计数锁。用于控制一个或者多个线程等待多个线程。

CountDownLatch 维护一个计数器 count,表示需要等待的事件数量。countDown 方法递减计数器,表示有一个事件已经发生。调用 await 方法的线程会一直阻塞直到计数器为零,或者等待中的线程中断,或者等待超时。

img

原理

CountDownLatch 是基于 AQS(AbstractQueuedSynchronizer) 实现的。

⭐ CyclicBarrier

CyclicBarrier 有什么用?

CyclicBarrier 的原理是什么?

CyclicBarrier 和 CountDownLatch 有什么区别?

作用

字面意思是 循环栅栏。**CyclicBarrier 可以让一组线程等待至某个状态(遵循字面意思,不妨称这个状态为栅栏)之后再全部同时执行**。之所以叫循环栅栏是因为:当所有等待线程都被释放以后,CyclicBarrier 可以被重用。

CyclicBarrier 维护一个计数器 count。每次执行 await 方法之后,count 加 1,直到计数器的值和设置的值相等,等待的所有线程才会继续执行。

img

原理

CyclicBarrier 是基于 ReentrantLockCondition 实现的。

区别

CyclicBarrierCountDownLatch 都可以用来让一组线程等待其它线程。与 CyclicBarrier 不同的是,CountdownLatch 不能重用。

⭐ Semaphore

Semaphore 作用?

作用

字面意思为 信号量Semaphore 用来控制同时访问某个特定资源的操作数量,或者同时执行某个指定操作的数量。

Semaphore 管理着一组虚拟的许可(permit),permit 的初始数量可通过构造方法来指定。每次执行 acquire 方法可以获取一个 permit,如果没有就等待;而 release 方法可以释放一个 permit。

  • Semaphore 可以用于实现资源池,如数据库连接池。
  • Semaphore 可以用于将任何一种容器变成有界阻塞容器。

img

线程池

👉 参考阅读:Java 线程池

⭐⭐ ThreadPoolExecutor

ThreadPoolExecutor 有哪些参数,各自有什么用?

ThreadPoolExecutor 工作原理?

原理

img

参数

java.uitl.concurrent.ThreadPoolExecutor 类是 Executor 框架中最核心的一个类。

ThreadPoolExecutor 有四个构造方法,前三个都是基于第四个实现。第四个构造方法定义如下:

1
2
3
4
5
6
7
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {

参数说明:

  • corePoolSize - 核心线程数量。当有新任务通过 execute 方法提交时 ,线程池会执行以下判断:
    • 如果运行的线程数少于 corePoolSize,则创建新线程来处理任务,即使线程池中的其他线程是空闲的。
    • 如果线程池中的线程数量大于等于 corePoolSize 且小于 maximumPoolSize,则只有当 workQueue 满时才创建新的线程去处理任务;
    • 如果设置的 corePoolSizemaximumPoolSize 相同,则创建的线程池的大小是固定的。这时如果有新任务提交,若 workQueue 未满,则将请求放入 workQueue 中,等待有空闲的线程去从 workQueue 中取任务并处理;
    • 如果运行的线程数量大于等于 maximumPoolSize,这时如果 workQueue 已经满了,则使用 handler 所指定的策略来处理任务;
    • 所以,任务提交时,判断的顺序为 corePoolSize => workQueue => maximumPoolSize
  • maximumPoolSize - 最大线程数量
    • 如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。
    • 值得注意的是:如果使用了无界的任务队列这个参数就没什么效果。
  • keepAliveTime线程保持活动的时间
    • 当线程池中的线程数量大于 corePoolSize 的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待的时间超过了 keepAliveTime
    • 所以,如果任务很多,并且每个任务执行的时间比较短,可以调大这个时间,提高线程的利用率。
  • unit - keepAliveTime 的时间单位。有 7 种取值。可选的单位有天(DAYS),小时(HOURS),分钟(MINUTES),毫秒(MILLISECONDS),微秒(MICROSECONDS, 千分之一毫秒)和毫微秒(NANOSECONDS, 千分之一微秒)。
  • workQueue - 等待执行的任务队列。用于保存等待执行的任务的阻塞队列。 可以选择以下几个阻塞队列。
    • ArrayBlockingQueue - 有界阻塞队列
      • 此队列是基于数组的先进先出队列(FIFO)
      • 此队列创建时必须指定大小。
    • LinkedBlockingQueue - 无界阻塞队列
      • 此队列是基于链表的先进先出队列(FIFO)
      • 如果创建时没有指定此队列大小,则默认为 Integer.MAX_VALUE
      • 吞吐量通常要高于 ArrayBlockingQueue
      • 使用 LinkedBlockingQueue 意味着: maximumPoolSize 将不起作用,线程池能创建的最大线程数为 corePoolSize,因为任务等待队列是无界队列。
      • Executors.newFixedThreadPool 使用了这个队列。
    • SynchronousQueue - 不会保存提交的任务,而是将直接新建一个线程来执行新来的任务
      • 每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态。
      • 吞吐量通常要高于 LinkedBlockingQueue
      • Executors.newCachedThreadPool 使用了这个队列。
    • PriorityBlockingQueue - 具有优先级的无界阻塞队列
  • threadFactory - 线程工厂。可以通过线程工厂给每个创建出来的线程设置更有意义的名字。
  • handler - 饱和策略。它是 RejectedExecutionHandler 类型的变量。当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。线程池支持以下策略:
    • AbortPolicy - 丢弃任务并抛出异常。这也是默认策略。
    • DiscardPolicy - 丢弃任务,但不抛出异常。
    • DiscardOldestPolicy - 丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)。
    • CallerRunsPolicy - 只用调用者所在的线程来运行任务。
    • 如果以上策略都不能满足需要,也可以通过实现 RejectedExecutionHandler 接口来定制处理策略。如记录日志或持久化不能处理的任务。

⭐ Executors

Executors 提供了哪些内置的线程池?

这些线程池各自有什么特性?适合用于什么场景?

Executors 为 Executor,ExecutorService,ScheduledExecutorService,ThreadFactory 和 Callable 类提供了一些工具方法。

(1)newSingleThreadExecutor

创建一个单线程的线程池

只会创建唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。 如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它

单工作线程最大的特点是:可保证顺序地执行各个任务

(2)newFixedThreadPool

创建一个固定大小的线程池

每次提交一个任务就会新创建一个工作线程,如果工作线程数量达到线程池最大线程数,则将提交的任务存入到阻塞队列中

FixedThreadPool 是一个典型且优秀的线程池,它具有线程池提高程序效率和节省创建线程时所耗的开销的优点。但是,在线程池空闲时,即线程池中没有可运行任务时,它不会释放工作线程,还会占用一定的系统资源。

(3)newCachedThreadPool

创建一个可缓存的线程池

  • 如果线程池大小超过处理任务所需要的线程数,就会回收部分空闲的线程;
  • 如果长时间没有往线程池中提交任务,即如果工作线程空闲了指定的时间(默认为 1 分钟),则该工作线程将自动终止。终止后,如果你又提交了新的任务,则线程池重新创建一个工作线程。
  • 此线程池不会对线程池大小做限制,线程池大小完全依赖于操作系统(或者说 JVM)能够创建的最大线程大小。 因此,使用 CachedThreadPool 时,一定要注意控制任务的数量,否则,由于大量线程同时运行,很有会造成系统瘫痪。

(4)newScheduleThreadPool

创建一个大小无限的线程池。此线程池支持定时以及周期性执行任务的需求。

JVM

内存管理

OOM

参考资料

Mysql 优化

慢查询

慢查询日志可以帮我们找到执行慢的 SQL。

可以通过以下命令查看慢查询日志是否开启:

1
2
3
4
5
6
7
mysql> show variables like '%slow_query_log';
+----------------+-------+
| Variable_name | Value |
+----------------+-------+
| slow_query_log | ON |
+----------------+-------+
1 row in set (0.02 sec)

启停慢查询日志开关:

1
2
3
4
5
# 开启慢查询日志
mysql > set global slow_query_log='ON';

# 关闭慢查询日志
mysql > set global slow_query_log='OFF';

查看慢查询的时间阈值:

1
2
3
4
5
6
7
mysql> show variables like '%long_query_time%';
+-----------------+-----------+
| Variable_name | Value |
+-----------------+-----------+
| long_query_time | 10.000000 |
+-----------------+-----------+
1 row in set (0.02 sec)

设置慢查询的时间阈值:

1
mysql > set global long_query_time = 3;

MySQL 自带了一个 mysqldumpslow 工具,用于统计慢查询日志(这个工具是个 Perl 脚本,需要先安装好 Perl)。

mysqldumpslow 命令的具体参数如下:

  • -s - 采用 order 排序的方式,排序方式可以有以下几种。分别是 c(访问次数)、t(查询时间)、l(锁定时间)、r(返回记录)、ac(平均查询次数)、al(平均锁定时间)、ar(平均返回记录数)和 at(平均查询时间)。其中 at 为默认排序方式。
  • -t - 返回前 N 条数据 。
  • -g - 后面可以是正则表达式,对大小写不敏感。

比如想要按照查询时间排序,查看前两条 SQL 语句,可以执行如下命令:

1
perl mysqldumpslow.pl -s t -t 2 "C:\ProgramData\MySQL\MySQL Server 8.0\Data\slow.log"

执行计划(EXPLAIN)

“执行计划”是对 SQL 查询语句在数据库中执行过程的描述。 如果要分析某条 SQL 的性能问题,通常需要先查看 SQL 的执行计划,排查每一步 SQL 执行是否存在问题。

很多数据库都支持执行计划,Mysql 也不例外。在 Mysql 中,用户可以通过 EXPLAIN 命令查看优化器针对指定 SQL 生成的逻辑执行计划。

【示例】Mysql 执行计划示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
mysql> explain select * from user_info where id = 2\G
*************************** 1. row ***************************
id: 1
select_type: SIMPLE
table: user_info
partitions: NULL
type: const
possible_keys: PRIMARY
key: PRIMARY
key_len: 8
ref: const
rows: 1
filtered: 100.00
Extra: NULL
1 row in set, 1 warning (0.00 sec)

执行计划返回结果参数说明:

  • id - SELECT 查询的标识符。每个 SELECT 都会自动分配一个唯一的标识符。
  • select_type - SELECT 查询的类型。
    • SIMPLE - 表示此查询不包含 UNION 查询或子查询。
    • PRIMARY - 表示此查询是最外层的查询。
    • UNION - 表示此查询是 UNION 的第二或随后的查询。
    • DEPENDENT UNION - UNION 中的第二个或后面的查询语句, 取决于外面的查询。
    • UNION RESULT - UNION 的结果。
    • SUBQUERY - 子查询中的第一个 SELECT
    • DEPENDENT SUBQUERY - 子查询中的第一个 SELECT, 取决于外面的查询. 即子查询依赖于外层查询的结果。
  • table - 查询的是哪个表,如果给表起别名了,则显示别名。
  • partitions - 匹配的分区。
  • type - 表示从表中查询到行所执行的方式,查询方式是 SQL 优化中一个很重要的指标,执行效率由高到低依次为:
    • system/const - 表中只有一行数据匹配。此时根据索引查询一次就能找到对应的数据。如果是 B+ 树索引,我们知道此时索引构造成了多个层级的树,当查询的索引在树的底层时,查询效率就越低。const 表示此时索引在第一层,只需访问一层便能得到数据。
    • eq_ref - 使用唯一索引扫描。常见于多表连接中使用主键和唯一索引作为关联条件。
    • ref - 非唯一索引扫描。还可见于唯一索引最左原则匹配扫描。
    • range - 索引范围扫描。比如 <>between 等操作。
    • index - 索引全表扫描。此时遍历整个索引树。
    • ALL - 表示全表扫描。需要遍历全表来找到对应的行。
  • possible_keys - 此次查询中可能选用的索引。
  • key - 此次查询中实际使用的索引。如果这一项为 NULL,说明没有使用索引。
  • ref - 哪个字段或常数与 key 一起被使用。
  • rows - 显示此查询一共扫描了多少行,这个是一个估计值。
  • filtered - 表示此查询条件所过滤的数据的百分比。
  • extra - 额外的信息。
    • Using filesort - 当查询语句中包含 GROUP BY 操作,而且无法利用索引完成排序操作的时候, 这时不得不选择相应的排序算法进行,甚至可能会通过文件排序,效率是很低的,所以要避免这种问题的出现。
    • Using temporary - 使了用临时表保存中间结果,MySQL 在对查询结果排序时使用临时表,常见于排序 ORDER BY 和分组查询 GROUP BY。效率低,要避免这种问题的出现。
    • Using index - 所需数据只需在索引即可全部获得,不须要再到表中取数据,也就是使用了覆盖索引,避免了回表操作,效率不错。

更多内容请参考:MySQL 性能优化神器 Explain 使用分析

optimizer trace

在 MySQL 5.6 及之后的版本中,我们可以使用 optimizer trace 功能查看优化器生成执行计划的整个过程。有了这个功能,我们不仅可以了解优化器的选择过程,更可以了解每一个执行环节的成本,然后依靠这些信息进一步优化查询。

如下代码所示,打开 optimizer_trace 后,再执行 SQL 就可以查询 information_schema.OPTIMIZER_TRACE 表查看执行计划了,最后可以关闭 optimizer_trace 功能:

1
2
3
4
SET optimizer_trace="enabled=on";
SELECT * FROM person WHERE NAME >'name84059' AND create_time>'2020-01-24 05:00
SELECT * FROM information_schema.OPTIMIZER_TRACE;
SET optimizer_trace="enabled=off";

SQL 优化

SQL 优化基本思路

使用 EXPLAIN 命令查看当前 SQL 是否使用了索引,优化后,再通过执行计划(EXPLAIN)来查看优化效果。

SQL 优化的基本思路:

  • 只返回必要的列 - 最好不要使用 SELECT * 语句。

  • 只返回必要的行 - 使用 WHERE 子查询语句进行过滤查询,有时候也需要使用 LIMIT 语句来限制返回的数据。

  • 缓存重复查询的数据 - 应该考虑在客户端使用缓存,尽量不要使用 Mysql 服务器缓存(存在较多问题和限制)。

  • 使用索引覆盖查询

优化分页

当需要分页操作时,通常会使用 LIMIT 加上偏移量的办法实现,同时加上合适的 ORDER BY 字句。如果有对应的索引,通常效率会不错,否则,MySQL 需要做大量的文件排序操作

一个常见的问题是当偏移量非常大的时候,比如:LIMIT 1000000 20 这样的查询,MySQL 需要查询 1000020 条记录然后只返回 20 条记录,前面的 1000000 条都将被抛弃,这样的代价非常高。

针对分页优化,有以下两种方案

(1)方案 - 延迟关联

优化这种查询一个最简单的办法就是尽可能的使用覆盖索引扫描,而不是查询所有的列。然后根据需要做一次关联查询再返回所有的列。对于偏移量很大时,这样做的效率会提升非常大。考虑下面的查询:

1
SELECT film_id,description FROM film ORDER BY title LIMIT 1000000,5;

如果这张表非常大,那么这个查询最好改成下面的样子:

1
2
3
4
SELECT film.film_id,film.description
FROM film INNER JOIN (
SELECT film_id FROM film ORDER BY title LIMIT 50,5
) AS tmp USING(film_id);

这里的延迟关联将大大提升查询效率,让 MySQL 扫描尽可能少的页面,获取需要访问的记录后在根据关联列回原表查询所需要的列。

(2)方案 - 书签方式

有时候如果可以使用书签记录上次取数据的位置,那么下次就可以直接从该书签记录的位置开始扫描,这样就可以避免使用 OFFSET,比如下面的查询:

1
2
3
4
-- 原语句
SELECT id FROM t LIMIT 1000000, 10;
-- 优化语句
SELECT id FROM t WHERE id > 1000000 LIMIT 10;

其他优化的办法还包括使用预先计算的汇总表,或者关联到一个冗余表,冗余表中只包含主键列和需要做排序的列。

优化 JOIN

优化子查询

尽量使用 JOIN 语句来替代子查询。因为子查询是嵌套查询,而嵌套查询会新创建一张临时表,而临时表的创建与销毁会占用一定的系统资源以及花费一定的时间,同时对于返回结果集比较大的子查询,其对查询性能的影响更大。

小表驱动大表

JOIN 查询时,应该用小表驱动大表。因为 JOIN 时,MySQL 内部会先遍历驱动表,再去遍历被驱动表。

比如 left join,左表就是驱动表,A 表小于 B 表,建立连接的次数就少,查询速度就被加快了。

1
select name from A left join B ;

适当冗余字段

增加冗余字段可以减少大量的连表查询,因为多张表的连表查询性能很低,所有可以适当的增加冗余字段,以减少多张表的关联查询,这是以空间换时间的优化策略

避免 JOIN 太多表

《阿里巴巴 Java 开发手册》规定不要 join 超过三张表,第一 join 太多降低查询的速度,第二 join 的 buffer 会占用更多的内存。

如果不可避免要 join 多张表,可以考虑使用数据异构的方式异构到 ES 中查询。

优化 UNION

MySQL 执行 UNION 的策略是:先创建临时表,然后将各个查询结果填充到临时表中,最后再进行查询。很多优化策略在 UNION 查询中都会失效,因为它无法利用索引。

最好将 WHERELIMIT 等子句下推到 UNION 的各个子查询中,以便优化器可以充分利用这些条件进行优化。

此外,尽量使用 UNION ALL,避免使用 UNION

UNIONUNION ALL 都是将两个结果集合并为一个,两个要联合的 SQL 语句字段个数必须一样,而且字段类型要“相容”(一致)

  • UNION 需要进行去重扫描,因此消息较低;而 UNION ALL 不会进行去重。
  • UNION 会按照字段的顺序进行排序;而 UNION ALL 只是简单的将两个结果合并就返回。

优化 COUNT() 查询

COUNT() 有两种作用:

  • 统计某个列值的数量。统计列值时,要求列值是非 NULL 的,它不会统计 NULL
  • 统计行数。

统计列值时,要求列值是非空的,它不会统计 NULL。如果确认括号中的表达式不可能为空时,实际上就是在统计行数。最简单的就是当使用 COUNT(*) 时,并不是我们所想象的那样扩展成所有的列,实际上,它会忽略所有的列而直接统计行数。

我们最常见的误解也就在这儿,在括号内指定了一列却希望统计结果是行数,而且还常常误以为前者的性能会更好。但实际并非这样,如果要统计行数,直接使用 COUNT(*),意义清晰,且性能更好。

(1)简单优化

1
2
3
4
SELECT count(*) FROM world.city WHERE id > 5;

SELECT (SELECT count(*) FROM world.city) - count(*)
FROM world.city WHERE id <= 5;

(2)使用近似值

有时候某些业务场景并不需要完全精确的统计值,可以用近似值来代替,EXPLAIN 出来的行数就是一个不错的近似值,而且执行 EXPLAIN 并不需要真正地去执行查询,所以成本非常低。通常来说,执行 COUNT() 都需要扫描大量的行才能获取到精确的数据,因此很难优化,MySQL 层面还能做得也就只有覆盖索引了。如果不还能解决问题,只有从架构层面解决了,比如添加汇总表,或者使用 Redis 这样的外部缓存系统。

优化查询方式

切分大查询

一个大查询如果一次性执行的话,可能一次锁住很多数据、占满整个事务日志、耗尽系统资源、阻塞很多小的但重要的查询。

1
DELEFT FROM messages WHERE create < DATE_SUB(NOW(), INTERVAL 3 MONTH);
1
2
3
4
5
rows_affected = 0
do {
rows_affected = do_query(
"DELETE FROM messages WHERE create < DATE_SUB(NOW(), INTERVAL 3 MONTH) LIMIT 10000")
} while rows_affected > 0

分解大连接查询

将一个大连接查询(JOIN)分解成对每一个表进行一次单表查询,然后将结果在应用程序中进行关联,这样做的好处有:

  • 让缓存更高效。对于连接查询,如果其中一个表发生变化,那么整个查询缓存就无法使用。而分解后的多个查询,即使其中一个表发生变化,对其它表的查询缓存依然可以使用。
  • 分解成多个单表查询,这些单表查询的缓存结果更可能被其它查询使用到,从而减少冗余记录的查询。
  • 减少锁竞争;
  • 在应用层进行连接,可以更容易对数据库进行拆分,从而更容易做到高性能和可扩展。
  • 查询本身效率也可能会有所提升。例如下面的例子中,使用 IN() 代替连接查询,可以让 MySQL 按照 ID 顺序进行查询,这可能比随机的连接要更高效。
1
2
3
4
SELECT * FROM tag
JOIN tag_post ON tag_post.tag_id=tag.id
JOIN post ON tag_post.post_id=post.id
WHERE tag.tag='mysql';
1
2
3
SELECT * FROM tag WHERE tag='mysql';
SELECT * FROM tag_post WHERE tag_id=1234;
SELECT * FROM post WHERE post.id IN (123,456,567,9098,8904);

索引优化

通过索引覆盖查询,可以优化排序、分组。

详情见 Mysql 索引

数据结构优化

良好的逻辑设计和物理设计是高性能的基石。

数据类型优化

数据类型优化基本原则

  • 更小的通常更好 - 越小的数据类型通常会更快,占用更少的磁盘、内存,处理时需要的 CPU 周期也更少。
    • 例如:整型比字符类型操作代价低,因而会使用整型来存储 IP 地址,使用 DATETIME 来存储时间,而不是使用字符串。
  • 简单就好 - 如整型比字符型操作代价低。
    • 例如:很多软件会用整型来存储 IP 地址。
    • 例如:**UNSIGNED 表示不允许负值,大致可以使正数的上限提高一倍**。
  • 尽量避免 NULL - 可为 NULL 的列会使得索引、索引统计和值比较都更复杂。

类型的选择

  • 整数类型通常是标识列最好的选择,因为它们很快并且可以使用 AUTO_INCREMENT

  • ENUMSET 类型通常是一个糟糕的选择,应尽量避免。

  • 应该尽量避免用字符串类型作为标识列,因为它们很消耗空间,并且通常比数字类型慢。对于 MD5SHAUUID 这类随机字符串,由于比较随机,所以可能分布在很大的空间内,导致 INSERT 以及一些 SELECT 语句变得很慢。

    • 如果存储 UUID ,应该移除 - 符号;更好的做法是,用 UNHEX() 函数转换 UUID 值为 16 字节的数字,并存储在一个 BINARY(16) 的列中,检索时,可以通过 HEX() 函数来格式化为 16 进制格式。

表设计

应该避免的设计问题:

  • 太多的列 - 设计者为了图方便,将大量冗余列加入表中,实际查询中,表中很多列是用不到的。这种宽表模式设计,会造成不小的性能代价,尤其是 ALTER TABLE 非常耗时。
  • 太多的关联 - 所谓的实体 - 属性 - 值(EAV)设计模式是一个常见的糟糕设计模式。Mysql 限制了每个关联操作最多只能有 61 张表,但 EAV 模式需要许多自关联。
  • 枚举 - 尽量不要用枚举,因为添加和删除字符串(枚举选项)必须使用 ALTER TABLE
  • 尽量避免 NULL

范式和反范式

范式化目标是尽量减少冗余,而反范式化则相反

范式化的优点:

  • 比反范式更节省空间
  • 更新操作比反范式快
  • 更少需要 DISTINCTGROUP BY 语句

范式化的缺点:

  • 通常需要关联查询。而关联查询代价较高,如果是分表的关联查询,代价更是高昂。

在真实世界中,很少会极端地使用范式化或反范式化。实际上,应该权衡范式和反范式的利弊,混合使用。

索引优化

索引优化应该是查询性能优化的最有效手段。

如果想详细了解索引特性请参考:Mysql 索引

何时使用索引

  • 对于非常小的表,大部分情况下简单的全表扫描更高效。
  • 对于中、大型表,索引非常有效。
  • 对于特大型表,建立和使用索引的代价将随之增长。可以考虑使用分区技术。
  • 如果表的数量特别多,可以建立一个元数据信息表,用来查询需要用到的某些特性。

索引优化策略

  • 索引基本原则
    • 索引不是越多越好,不要为所有列都创建索引。
    • 要尽量避免冗余和重复索引。
    • 要考虑删除未使用的索引。
    • 尽量的扩展索引,不要新建索引。
    • 频繁作为 WHERE 过滤条件的列应该考虑添加索引。
  • 独立的列 - “独立的列” 是指索引列不能是表达式的一部分,也不能是函数的参数。
  • 前缀索引 - 索引很长的字符列,可以索引开始的部分字符,这样可以大大节约索引空间。
  • 最左匹配原则 - 将选择性高的列或基数大的列优先排在多列索引最前列。
  • 使用索引来排序 - 索引最好既满足排序,又用于查找行。这样,就可以使用索引来对结果排序。
  • =IN 可以乱序 - 不需要考虑 =IN 等的顺序
  • 覆盖索引
  • 自增字段作主键

数据模型和业务

  • 表字段比较复杂、易变动、结构难以统一的情况下,可以考虑使用 Nosql 来代替关系数据库表存储,如 ElasticSearch、MongoDB。
  • 在高并发情况下的查询操作,可以使用缓存(如 Redis)代替数据库操作,提高并发性能。
  • 数据量增长较快的表,需要考虑水平分表或分库,避免单表操作的性能瓶颈。
  • 除此之外,我们应该通过一些优化,尽量避免比较复杂的 JOIN 查询操作,例如冗余一些字段,减少 JOIN 查询;创建一些中间表,减少 JOIN 查询。

参考资料

Mysql 事务

不是所有的 Mysql 存储引擎都实现了事务处理。支持事务的存储引擎有:InnoDBNDB Cluster。不支持事务的存储引擎,代表有:MyISAM

用户可以根据业务是否需要事务处理(事务处理可以保证数据安全,但会增加系统开销),选择合适的存储引擎。

事务简介

事务概念

“事务”指的是满足 ACID 特性的一组操作。事务内的 SQL 语句,要么全执行成功,要么全执行失败。可以通过 Commit 提交一个事务,也可以使用 Rollback 进行回滚。

ACID

ACID 是数据库事务正确执行的四个基本要素。

  • 原子性(Atomicity)
    • 事务被视为不可分割的最小单元,事务中的所有操作要么全部提交成功,要么全部失败回滚。
    • 回滚可以用日志来实现,日志记录着事务所执行的修改操作,在回滚时反向执行这些修改操作即可。
  • 一致性(Consistency)
    • 数据库在事务执行前后都保持一致性状态。
    • 在一致性状态下,所有事务对一个数据的读取结果都是相同的。
  • 隔离性(Isolation)
    • 一个事务所做的修改在最终提交以前,对其它事务是不可见的。
  • 持久性(Durability)
    • 一旦事务提交,则其所做的修改将会永远保存到数据库中。即使系统发生崩溃,事务执行的结果也不能丢失。
    • 可以通过数据库备份和恢复来实现,在系统发生奔溃时,使用备份的数据库进行数据恢复。

一个支持事务(Transaction)中的数据库系统,必需要具有这四种特性,否则在事务过程(Transaction processing)当中无法保证数据的正确性。

  • 只有满足一致性,事务的执行结果才是正确的。
  • 在无并发的情况下,事务串行执行,隔离性一定能够满足。此时只要能满足原子性,就一定能满足一致性。
  • 在并发的情况下,多个事务并行执行,事务不仅要满足原子性,还需要满足隔离性,才能满足一致性。
  • 事务满足持久化是为了能应对系统崩溃的情况。

事务操作

事务相关的语句如下:

  • BEGIN / START TRANSACTION - 用于标记事务的起始点
  • START TRANSACTION WITH CONSISTENT SNAPSHOT - 用于标记事务的起始点
  • SAVEPOINT - 用于创建保存点。方便后续针对保存点进行回滚。一个事务中可以存在多个保存点。
  • RELEASE SAVEPOINT - 删除某个保存点。
  • ROLLBACK TO - 用于回滚到指定的保存点。如果没有设置保存点,则回退到 START TRANSACTION 语句处。
  • COMMIT - 提交事务
  • SET TRANSACTION - 设置事务的隔离级别。

注意:

两种开启事务的命令,启动时机是不同的:

  • 执行了 BEGIN / START TRANSACTION 命令后,并不代表事务立刻启动,而是当执行了增删查操作时,才真正启动事务。
  • 执行了 START TRANSACTION WITH CONSISTENT SNAPSHOT 命令,会立刻启动事务。

事务处理示例:

(1)创建一张示例表

1
2
3
4
5
6
7
8
9
10
-- 撤销表 user
DROP TABLE IF EXISTS `user`;

-- 创建表 user
CREATE TABLE `user` (
`id` INT(10) UNSIGNED NOT NULL COMMENT 'Id',
`username` VARCHAR(64) NOT NULL DEFAULT 'default' COMMENT '用户名',
`password` VARCHAR(64) NOT NULL DEFAULT 'default' COMMENT '密码',
`email` VARCHAR(64) NOT NULL DEFAULT 'default' COMMENT '邮箱'
) COMMENT ='用户表';

(2)执行事务操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
-- 开始事务
START TRANSACTION;

-- 插入操作 A
INSERT INTO `user`
VALUES (1, 'root1', 'root1', 'xxxx@163.com');

-- 创建保留点 updateA
SAVEPOINT `updateA`;

-- 插入操作 B
INSERT INTO `user`
VALUES (2, 'root2', 'root2', 'xxxx@163.com');

-- 回滚到保留点 updateA
ROLLBACK TO `updateA`;

-- 提交事务,只有操作 A 生效
COMMIT;

(3)查询结果

1
SELECT * FROM `user`;

结果:

1
2
3
4
5
6
7
mysql> SELECT * FROM user;
+----+----------+----------+--------------+
| id | username | password | email |
+----+----------+----------+--------------+
| 1 | root1 | root1 | xxxx@163.com |
+----+----------+----------+--------------+
1 row in set (0.02 sec)

AUTOCOMMIT

MySQL 默认采用隐式提交策略(autocommit。每执行一条语句就把这条语句当成一个事务然后进行提交。当出现 START TRANSACTION 语句时,会关闭隐式提交;当 COMMITROLLBACK 语句执行后,事务会自动关闭,重新恢复隐式提交。

通过 set autocommit=0 可以取消自动提交,直到 set autocommit=1 才会提交;autocommit 标记是针对每个连接而不是针对服务器的。

1
2
3
4
5
6
7
8
-- 查看 AUTOCOMMIT
SHOW VARIABLES LIKE 'AUTOCOMMIT';

-- 关闭 AUTOCOMMIT
SET autocommit = 0;

-- 开启 AUTOCOMMIT
SET autocommit = 1;

并发一致性问题

在并发环境下,事务的隔离性很难保证,因此会出现很多并发一致性问题。

丢失修改

“丢失修改”是指一个事务的更新操作被另外一个事务的更新操作替换

如下图所示,T1 和 T2 两个事务对同一个数据进行修改,T1 先修改,T2 随后修改,T2 的修改覆盖了 T1 的修改。

脏读

“脏读(dirty read)”是指当前事务可以读取其他事务未提交的数据

如下图所示,T1 修改一个数据,T2 随后读取这个数据。如果 T1 撤销了这次修改,那么 T2 读取的数据是脏数据。

不可重复读

“不可重复读(non-repeatable read)”是指一个事务内多次读取同一数据,过程中,该数据被其他事务所修改,导致当前事务多次读取的数据可能不一致

如下图所示,T2 读取一个数据,T1 对该数据做了修改。如果 T2 再次读取这个数据,此时读取的结果和第一次读取的结果不同。

幻读

“幻读(phantom read)”是指一个事务内多次读取同一范围的数据,过程中,其他事务在该数据范围新增了数据,导致当前事务未发现新增数据

事务 T1 读取某个范围内的记录时,事务 T2 在该范围内插入了新的记录,T1 再次读取这个范围的数据,此时读取的结果和和第一次读取的结果不同。

事务隔离级别

事务隔离级别简介

为了解决以上提到的“并发一致性问题”,SQL 标准提出了四种“事务隔离级别”来应对这些问题。事务隔离级别等级越高,越能保证数据的一致性和完整性,但是执行效率也越低。因此,设置数据库的事务隔离级别时需要做一下权衡。

事务隔离级别从低到高分别是:

  • “读未提交(read uncommitted)” - 是指,事务中的修改,即使没有提交,对其它事务也是可见的
  • “读已提交(read committed)” ** - 是指,事务提交后,其他事务才能看到它的修改**。换句话说,一个事务所做的修改在提交之前对其它事务是不可见的。
    • 读已提交解决了脏读的问题
    • 读已提交是大多数数据库的默认事务隔离级别,如 Oracle。
  • “可重复读(repeatable read)” - 是指:保证在同一个事务中多次读取同样数据的结果是一样的
    • 可重复读解决了不可重复读问题
    • 可重复读是 InnoDB 存储引擎的默认事务隔离级别
  • 串行化(serializable ) - 是指,强制事务串行执行,对于同一行记录,加读写锁,一旦出现锁冲突,必须等前面的事务释放锁。
    • 串行化解决了幻读问题。由于强制事务串行执行,自然避免了所有的并发问题。
    • 串行化策略会在读取的每一行数据上都加锁,这可能导致大量的超时和锁竞争。这对于高并发应用基本上是不可接受的,所以一般不会采用这个级别。

事务隔离级别对并发一致性问题的解决情况:

隔离级别 丢失修改 脏读 不可重复读 幻读
读未提交 ✔️️️
读已提交 ✔️️️ ✔️️️
可重复读 ✔️️️ ✔️️️ ✔️️️
可串行化 ✔️️️ ✔️️️ ✔️️️ ✔️️️

查看和设置事务隔离级别

可以通过 SHOW VARIABLES LIKE 'transaction_isolation' 语句查看事务隔离级别。

【示例】查看事务隔离示例

1
2
3
4
5
6
7
mysql> SHOW VARIABLES LIKE 'transaction_isolation';
+-----------------------+-----------------+
| Variable_name | Value |
+-----------------------+-----------------+
| transaction_isolation | REPEATABLE-READ |
+-----------------------+-----------------+
1 row in set (0.03 sec)

MySQL 提供了 SET TRANSACTION 语句,该语句可以改变单个会话或全局的事务隔离级别。语法格式如下:

1
SET [SESSION | GLOBAL] TRANSACTION ISOLATION LEVEL {READ UNCOMMITTED | READ COMMITTED | REPEATABLE READ | SERIALIZABLE}

其中,SESSIONGLOBAL 关键字用来指定修改的事务隔离级别的范围:

  • SESSION - 表示修改的事务隔离级别,将应用于当前会话内的所有事务。
  • GLOBAL - 表示修改的事务隔离级别,将应用于所有会话内的所有事务(即全局修改),且当前已经存在的会话不受影响;
  • 如果省略 SESSIONGLOBAL,表示修改的事务隔离级别,将应用于当前会话内的下一个还未开始的事务。

【示例】设置事务隔离示例

1
2
3
4
5
6
7
8
9
10
11
-- 设置事务隔离级别为 READ UNCOMMITTED
SET SESSION TRANSACTION ISOLATION LEVEL READ UNCOMMITTED;

-- 设置事务隔离级别为 READ COMMITTED
SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED;

-- 设置事务隔离级别为 REPEATABLE READ
SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ;

-- 设置事务隔离级别为 SERIALIZABLE
SET SESSION TRANSACTION ISOLATION LEVEL SERIALIZABLE;

事务隔离级别实现方式

Mysql 中的事务功能是在存储引擎层实现的,并非所有存储引擎都支持事务功能。InnoDB 是 Mysql 的首先事务存储引擎。

四种隔离级别具体是如何实现的呢?

以 InnoDB 的事务实现来说明:

  • 对于“读未提交”隔离级别的事务来说,因为可以读到未提交事务修改的数据,所以直接读取最新的数据就好了;
  • 对于“串行化”隔离级别的事务来说,通过加读写锁的方式来避免并行访问;
  • 对于“读提交”和“可重复读”隔离级别的事务来说,它们都是通过 ReadView 来实现的,区别仅在于创建 ReadView 的时机不同。ReadView 可以理解为一个数据快照。
    • “读提交”隔离级别是在“每个语句执行前”都会重新生成一个 ReadView
    • “可重复读”隔离级别是在“启动事务时”生成一个 ReadView,然后整个事务期间都在用这个 ReadView。

关于 ReadView 更多细节,将在 MVCC 章节中阐述。

MVCC

当前读和快照读

在高并发场景下,多事务同时执行,可能会出现种种并发一致性问题。最常见,也是最容易想到的解决问题思路就是:对访问的数据加锁,通过强制互斥来解决问题。但是,加锁就意味着阻塞,势必会增加响应时间,降低系统整体吞吐量。在大多数真实的业务场景中,读请求远大于写请求,由于读请求并不会修改数据,自然也不存在一致性问题,因此为占大多数的读请求加锁是一种不必要的开销。那么,我们很自然的会想到,如果只针对写操作加锁,不就能大大提升吞吐量了吗?没错,有一种名为“写时复制(Copy-On-Write,简称 COW)”的技术,正是基于这个想法而设计,并广泛应用于各种软件领域,例如:Java 中的 CopyOnWriteArrayList 等容器;Redis 中的 RDB 持久化过程。

Copy-On-Write 的核心思想是:假设有多个请求需要访问相同的数据,先为这份数据生成一个副本(也可以称为快照)。然后将读写分离,所有的读请求都直接访问原数据;所有的写请求都访问副本数据,为了实现并发一致性,写数据时需要通过加锁保证每次写操作只能由一个写请求完成。当写操作完成后,用副本数据替换原数据。

在 Mysql 中,也采用了 Copy-On-Write 设计思想,将读写分离。

  • 这里的“写”指的是当前读。“当前读”,顾名思义,指的是读取记录当前的数据。为了保证读取当前数据时,没有其他事务修改,因此需要对读取记录加锁。当前读的场景有下面几种:
    • INSERT - 插入操作
    • UPDATE - 更新操作
    • DELETE - 删除操作
    • SELECT ... LOCK IN SHARE MODE - 加共享锁(读锁)
    • SELECT ... FOR UPDATE - 加独享锁(写锁)
  • 这里的“读”指的是快照读。“快照读”,顾名思义,指的是读取记录的某个历史快照版本。不加锁的普通 SELECT 都属于快照读,例如:SELECT ... FROM。采用快照读的前提是,事务隔离级别不是串行化级别。串行化级别下的快照读会退化成当前读。快照读的实现是基于 MVCC。

什么是 MVCC

前文提到,快照读的实现是基于 MVCC。那么,什么是 MVCC 呢?

MVCC 是 Multi Version Concurrency Control 的缩写,即“多版本并发控制”。MVCC 的设计目标是提高数据库的并发性,采用非阻塞的方式去处理读/写并发冲突,可以将其看成一种乐观锁。

不仅是 Mysql,包括 Oracle、PostgreSQL 等其他关系型数据库都实现了各自的 MVCC,实现机制没有统一标准。MVCC 是 InnoDB 存储引擎实现事务隔离级别的一种具体方式。其主要用于实现读已提交和可重复读这两种隔离级别。而未提交读隔离级别总是读取最新的数据行,要求很低,无需使用 MVCC。可串行化隔离级别需要对所有读取的行都加锁,单纯使用 MVCC 无法实现。

MVCC 实现原理

MVCC 的实现原理,主要基于隐式字段、UndoLog、ReadView 来实现。

隐式字段

InnoDB 存储引擎中,数据表的每行记录,除了用户显示定义的字段以外,还有几个数据库隐式定义的字段:

  • DB_ROW_ID - 隐藏的自增 ID,如果数据表没有指定主键,InnoDB 会自动基于 row_id 产生一个聚簇索引。
  • DB_TRX_ID - 最近修改的事务 ID。事务对某条聚簇索引记录进行改动时,就会把该事务的事务 id 记录在 trx_id 隐藏列里;
  • DB_ROLL_PTR - 回滚指针,指向这条记录的上一个版本。

UndoLog

MVCC 的多版本指的是多个版本的快照,快照存储在 UndoLog 中。该日志通过回滚指针 roll_pointer 把一个数据行的所有快照链接起来,构成一个版本链

ReadView

ReadView 就是事务进行快照读时产生的读视图(快照)

ReadView 有四个重要的字段:

  • m_ids - 指的是在创建 ReadView 时,当前数据库中“活跃事务”的事务 ID 列表。注意:这是一个列表,“活跃事务”指的就是,启动了但还没提交的事务
  • min_trx_id - 指的是在创建 ReadView 时,当前数据库中“活跃事务”中事务 id 最小的事务,也就是 m_ids 的最小值。
  • max_trx_id - 这个并不是 m_ids 的最大值,而是指创建 ReadView 时当前数据库中应该给下一个事务分配的 ID 值,也就是全局事务中最大的事务 ID 值 + 1;
  • creator_trx_id - 指的是创建该 ReadView 的事务的事务 ID。

在创建 ReadView 后,我们可以将记录中的 trx_id 划分为三种情况:

  • 已提交事务
  • 已启动但未提交的事务
  • 未启动的事务

ReadView 如何判断版本链中哪个版本可见?

一个事务去访问记录的时候,除了自己的更新记录总是可见之外,还有这几种情况:

  • trx_id == creator_trx_id - 表示 trx_id 版本记录由 ReadView 所代表的当前事务产生,当然可以访问。
  • trx_id < min_trx_id - 表示 trx_id 版本记录是在创建 ReadView 之前已提交的事务生成的,当前事务可以访问。
  • trx_id >= max_trx_id - 表示 trx_id 版本记录是在创建 ReadView 之后才启动的事务生成的,当前事务不可以访问。
  • min_trx_id <= trx_id < max_trx_id - 需要判断 trx_id 是否在 m_ids 列表中
    • 如果 trx_idm_ids 列表中,表示生成 trx_id 版本记录的事务依然活跃(未提交事务),当前事务不可以访问。
    • 如果 trx_id 不在 m_ids 列表中,表示生成 trx_id 版本记录的事务已提交,当前事务可以访问。

这种通过“版本链”来控制并发事务访问同一个记录时的行为就叫 MVCC(多版本并发控制)。

MVCC 如何实现多种事务隔离级别

对于“读已提交”和“可重复读”隔离级别的事务来说,它们都是通过 MVCC 的 ReadView 机制来实现的,区别仅在于创建 ReadView 的时机不同。ReadView 可以理解为一个数据快照。

  • “读已提交”隔离级别,会在“每个语句执行前”都会重新生成一个 ReadView。
  • “可重复读”隔离级别,会在“启动事务时”生成一个 ReadView,然后整个事务期间都在复用这个 ReadView。

MySQL InnoDB 引擎的默认隔离级别虽然是“可重复读”,但是它很大程度上避免幻读现象(并不是完全解决了),解决的方案有两种:

  • 针对快照读(普通 select 语句),通过 MVCC 方式解决了幻读,因为可重复读隔离级别下,事务执行过程中看到的数据,一直跟这个事务启动时看到的数据是一致的,即使中途有其他事务插入了一条数据,是查询不出来这条数据的,所以就很好了避免幻读问题。
  • 针对当前读(select … for update 等语句),通过 Next-Key Lock(记录锁+间隙锁)方式解决了幻读,因为当执行 select … for update 语句的时候,会加上 Next-Key Lock,如果有其他事务在 Next-Key Lock 锁范围内插入了一条记录,那么这个插入语句就会被阻塞,无法成功插入,所以就很好了避免幻读问题。

MVCC 实现可重复读

可重复读隔离级别只有在启动事务时才会创建 ReadView,然后整个事务期间都使用这个 ReadView。这样就保证了在事务期间读到的数据都是事务启动前的记录。

举个例子,假设有两个事务依次执行以下操作:

  • 初始,表中 id = 1 的 value 列值为 100。
  • 事务 2 读取数据,value 为 100;
  • 事务 1 将 value 设为 200;
  • 事务 2 读取数据,value 为 100;
  • 事务 1 提交事务;
  • 事务 2 读取数据,value 依旧为 100;

以上操作,如下图所示。T2 事务在事务过程中,是否可以看到 T1 事务的修改,可以根据 ReadView 中描述的规则去判断。

从图中不难看出:

  • 对于 trx_id = 100 的版本记录,比对 T2 事务 ReadView ,trx_id < min_trx_id,因此在 T2 事务中的任意时刻都可见;
  • 对于 trx_id = 101 的版本记录,比对 T2 事务 ReadView ,可以看出 min_trx_id <= trx_id < max_trx_id ,且 trx_idm_ids 中,因此 T2 事务中不可见。

综上所述,在 T2 事务中,自始至终只能看到 trx_id = 100 的版本记录。

MVCC 实现读已提交

读已提交隔离级别每次读取数据时都会创建一个 ReadView。这意味着,事务期间的多次读取同一条数据,前后读取的数据可能会出现不一致——因为,这期间可能有另外一个事务修改了该记录,并提交了事务。

举个例子,假设有两个事务依次执行以下操作:

  • 初始,表中 id = 1 的 value 列值为 100。
  • 事务 2 读取数据(创建 ReadView),value 为 0;
  • 事务 1 将 value 设为 100;
  • 事务 2 读取数据(创建 ReadView),value 为 0;
  • 事务 1 提交事务;
  • 事务 2 读取数据(创建 ReadView),value 为 100;

以上操作,如下图所示,T2 事务在事务过程中,是否可以看到其他事务的修改,可以根据 ReadView 中描述的规则去判断。

从图中不难看出:

  • 对于 trx_id = 100 的版本记录,比对 T2 事务 ReadView ,trx_id < min_trx_id,因此在 T2 事务中的任意时刻都可见;
  • 对于 trx_id = 101 的版本记录,比对 T2 事务 ReadView ,可以看出第二次查询时(T1 更新未提交),min_trx_id <= trx_id < max_trx_id ,且 trx_idm_ids 中,因此 T2 事务中不可见;而第三次查询时(T1 更新已提交),trx_id < min_trx_id,因此在 T2 事务中可见;

综上所述,在 T2 事务中,当 T1 事务提交前,可读取到的是 trx_id = 100 的版本记录;当 T1 事务提交后,可读取到的是 trx_id = 101 的版本记录。

MVCC + Next-Key Lock 解决幻读

MySQL InnoDB 引擎的默认隔离级别虽然是“可重复读”,但是它很大程度上避免幻读现象(并不是完全解决了)。针对快照读和当前读,InnoDB 的处理方式各不相同。

快照读是如何避免幻读的?

针对快照读(普通 SELECT 语句),通过 MVCC 方式解决了幻读,因为可重复读隔离级别下,事务执行过程中看到的数据,一直跟这个事务启动时看到的数据是一致的,即使中途有其他事务插入了一条数据,是查询不出来这条数据的,所以就很好了避免幻读问题。

当前读是如何避免幻读的?

针对当前读SELECT ... FOR UPDATE 等语句),通过 Next-Key Lock(记录锁+间隙锁)方式解决了幻读,因为当执行 SELECT ... FOR UPDATE 语句的时候,会加上 Next-Key Lock,如果有其他事务在 Next-Key Lock 锁范围内插入了一条记录,那么这个插入语句就会被阻塞,无法成功插入,所以就很好的避免了幻读问题。

幻读被完全解决了吗?

可重复读隔离级别下虽然很大程度上避免了幻读,但是还是没有能完全解决幻读

【示例】幻读案例一

环境:存储引擎为 InnoDB;事务隔离级别为可重复读

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
-- --------------------------------------------------------------------------------------
-- 实验说明:以下 SQL 脚本必须严格按照顺序执行,并且事务 A 和事务 B 必须在不同的 Client 中执行。
-- ----------------------------------------------------------------------------------------

-- --------------------------------------------------------------------- (1)数据初始化

-- 创建表 test
CREATE TABLE `test` (
`id` INT(10) UNSIGNED PRIMARY KEY AUTO_INCREMENT,
`value` INT(10) NOT NULL
);

-- 数据初始化
INSERT INTO `test` (`id`, `value`) VALUES (1, 1);
INSERT INTO `test` (`id`, `value`) VALUES (2, 2);
INSERT INTO `test` (`id`, `value`) VALUES (3, 3);

-- --------------------------------------------------------------------- (2)事务 A

BEGIN;

-- 查询 id = 4 的记录
SELECT * FROM `test` WHERE `id` = 4;
-- 结果为空

-- --------------------------------------------------------------------- (3)事务 B

BEGIN;

INSERT INTO `test` (`id`, `value`) VALUES (4, 4);

COMMIT;

-- --------------------------------------------------------------------- (4)事务 A

-- 查询 id = 4 的记录
SELECT * FROM `test` WHERE `id` = 4;
-- 结果依然为空

-- 成功更新本应看不到的记录 id = 4
UPDATE `test` SET `value` = 0 WHERE `id` = 4;

-- 再一次查询 id = 4 的记录
SELECT * FROM `test` WHERE `id` = 4;
-- 结果为:
-- +----+-------+
-- | id | value |
-- +----+-------+
-- | 4 | 0 |
-- +----+-------+

COMMIT;

以上示例代码的时序图如下:

【示例】幻读案例二

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
-- --------------------------------------------------------------------- (1)数据初始化

-- 创建表 test
CREATE TABLE `test` (
`id` INT(10) UNSIGNED PRIMARY KEY AUTO_INCREMENT,
`value` INT(10) NOT NULL
);

-- 数据初始化
INSERT INTO `test` (`id`, `value`) VALUES (1, 1);
INSERT INTO `test` (`id`, `value`) VALUES (2, 2);
INSERT INTO `test` (`id`, `value`) VALUES (3, 3);

-- --------------------------------------------------------------------- (2)事务 A

BEGIN;

-- 查询 id > 2 的记录数
SELECT COUNT(*) FROM `test` WHERE `id` > 2;
-- 结果为:
-- +----------+
-- | count(*) |
-- +----------+
-- | 1 |
-- +----------+

-- --------------------------------------------------------------------- (3)事务 B

BEGIN;

INSERT INTO `test` (`id`, `value`) VALUES (4, 4);

COMMIT;

-- --------------------------------------------------------------------- (4)事务 A

-- 查询 id > 2 的记录数
SELECT COUNT(*) FROM `test` WHERE `id` > 2 FOR UPDATE;
-- 结果为:
-- +----------+
-- | count(*) |
-- +----------+
-- | 2 |
-- +----------+

COMMIT;

要避免这类特殊场景下发生幻读的现象的话,就是尽量在开启事务之后,马上执行 select ... for update 这类当前读的语句,因为它会对记录加 Next-Key Lock,从而避免其他事务插入一条新记录。

分布式事务

在单一数据节点中,事务仅限于对单一数据库资源的访问控制,称之为 本地事务。几乎所有的成熟的关系型数据库都提供了对本地事务的原生支持。

分布式事务指的是事务操作跨越多个节点,并且要求满足事务的 ACID 特性。

分布式事务的常见方案如下:

  • 两阶段提交(2PC) - 将事务的提交过程分为两个阶段来进行处理:准备阶段和提交阶段。参与者将操作成败通知协调者,再由协调者根据所有参与者的反馈情报决定各参与者是否要提交操作还是中止操作。
  • 三阶段提交(3PC) - 与二阶段提交不同的是,引入超时机制。同时在协调者和参与者中都引入超时机制。将二阶段的准备阶段拆分为 2 个阶段,插入了一个 preCommit 阶段,使得原先在二阶段提交中,参与者在准备之后,由于协调者发生崩溃或错误,而导致参与者处于无法知晓是否提交或者中止的“不确定状态”所产生的可能相当长的延时的问题得以解决。
  • 补偿事务(TCC)
    • Try - 操作作为一阶段,负责资源的检查和预留。
    • Confirm - 操作作为二阶段提交操作,执行真正的业务。
    • Cancel - 是预留资源的取消。
  • 本地消息表 - 在事务主动发起方额外新建事务消息表,事务发起方处理业务和记录事务消息在本地事务中完成,轮询事务消息表的数据发送事务消息,事务被动方基于消息中间件消费事务消息表中的事务。
  • MQ 事务 - 基于 MQ 的分布式事务方案其实是对本地消息表的封装。
  • SAGA - Saga 事务核心思想是将长事务拆分为多个本地短事务,由 Saga 事务协调器协调,如果正常结束那就正常完成,如果某个步骤失败,则根据相反顺序一次调用补偿操作。

分布式事务方案分析:

  • 2PC/3PC 依赖于数据库,能够很好的提供强一致性和强事务性,但相对来说延迟比较高,比较适合传统的单体应用,在同一个方法中存在跨库操作的情况,不适合高并发和高性能要求的场景。
  • TCC 适用于执行时间确定且较短,实时性要求高,对数据一致性要求高,比如互联网金融企业最核心的三个服务:交易、支付、账务。
  • 本地消息表/MQ 事务 都适用于事务中参与方支持操作幂等,对一致性要求不高,业务上能容忍数据不一致到一个人工检查周期,事务涉及的参与方、参与环节较少,业务上有对账/校验系统兜底。
  • Saga 事务 由于 Saga 事务不能保证隔离性,需要在业务层控制并发,适合于业务场景事务并发操作同一资源较少的情况。 Saga 相比缺少预提交动作,导致补偿动作的实现比较麻烦,例如业务是发送短信,补偿动作则得再发送一次短信说明撤销,用户体验比较差。Saga 事务较适用于补偿动作容易处理的场景。

分布式事务详细说明、分析请参考:分布式事务基本原理

事务最佳实践

高并发场景下的事务到底该如何调优?

尽量使用低级别事务隔离

结合业务场景,尽量使用低级别事务隔离

避免行锁升级表锁

在 InnoDB 中,行锁是通过索引实现的,如果不通过索引条件检索数据,行锁将会升级到表锁。我们知道,表锁是会严重影响到整张表的操作性能的,所以应该尽力避免。

缩小事务范围

有时候,数据库并发访问量太大,会出现以下异常:

1
MySQLQueryInterruptedException: Query execution was interrupted

高并发时对一条记录进行更新的情况下,由于更新记录所在的事务还可能存在其他操作,导致一个事务比较长,当有大量请求进入时,就可能导致一些请求同时进入到事务中。

又因为锁的竞争是不公平的,当多个事务同时对一条记录进行更新时,极端情况下,一个更新操作进去排队系统后,可能会一直拿不到锁,最后因超时被系统打断踢出。

img

如上图中的操作,虽然都是在一个事务中,但锁的申请在不同时间,只有当其他操作都执行完,才会释放所有锁。因为扣除库存是更新操作,属于行锁,这将会影响到其他操作该数据的事务,所以我们应该尽量避免长时间地持有该锁,尽快释放该锁。又因为先新建订单和先扣除库存都不会影响业务,所以我们可以将扣除库存操作放到最后,也就是使用执行顺序 1,以此尽量减小锁的持有时间。

在 InnoDB 事务中,行锁是在需要的时候才加上的,但并不是不需要了就立刻释放,而是要等到事务结束时才释放。这个就是两阶段锁协议。

知道了这个设定,对我们使用事务有什么帮助呢?那就是,如果你的事务中需要锁多个行,要把最可能造成锁冲突、最可能影响并发度的锁尽量往后放。

参考资料

Kafka 快速入门

Apache Kafka 是一款开源的消息引擎系统,也是一个分布式流计算平台,此外,还可以作为数据存储

Kafka 简介

Apache Kafka 是一款开源的消息引擎系统,也是一个分布式流计算平台,此外,还可以作为数据存储

img

Kafka 的功能

Kafka 的核心功能如下:

  • 消息引擎 - Kafka 可以作为一个消息引擎系统。
  • 流处理 - Kafka 可以作为一个分布式流处理平台。
  • 存储 - Kafka 可以作为一个安全的分布式存储。

Kafka 的特性

Kafka 的设计目标:

  • 高性能
    • 分区、分段、索引:基于分区机制提供并发处理能力。分段、索引提升了数据读写的查询效率。
    • 顺序读写:使用顺序读写提升磁盘 IO 性能。
    • 零拷贝:利用零拷贝技术,提升网络 I/O 效率。
    • 页缓存:利用操作系统的 PageCache 来缓存数据(典型的利用空间换时间)
    • 批量读写:批量读写可以有效提升网络 I/O 效率。
    • 数据压缩:Kafka 支持数据压缩,可以有效提升网络 I/O 效率。
    • pull 模式:Kafka 架构基于 pull 模式,可以自主控制消费策略,提升传输效率。
  • 高可用
    • 持久化:Kafka 所有的消息都存储在磁盘,天然支持持久化。
    • 副本机制:Kafka 的 Broker 集群支持副本机制,可以通过冗余,来保证其整体的可用性。
    • 选举 Leader:Kafka 基于 ZooKeeper 支持选举 Leader,实现了故障转移能力。
  • 伸缩性
    • 分区:Kafka 的分区机制使得其具有良好的伸缩性。

Kafka 术语

  • 消息:Kafka 的数据单元被称为消息。消息由字节数组组成。
  • 批次:批次就是一组消息,这些消息属于同一个主题和分区。
  • 主题(Topic):Kafka 消息通过主题进行分类。主题就类似数据库的表。
    • 不同主题的消息是物理隔离的;
    • 同一个主题的消息保存在一个或多个 Broker 上。但用户只需指定消息的 Topic 即可生产或消费数据而不必关心数据存于何处。
    • 主题有一个或多个分区。
  • 分区(Partition):分区是一个有序不变的消息序列,消息以追加的方式写入分区,然后以先入先出的顺序读取。Kafka 通过分区来实现数据冗余和伸缩性。
  • 消息偏移量(Offset):表示分区中每条消息的位置信息,是一个单调递增且不变的值。
  • 生产者(Producer):生产者是向主题发布新消息的 Kafka 客户端。生产者可以将数据发布到所选择的主题中。生产者负责将记录分配到主题中的哪一个分区中。
  • 消费者(Consumer):消费者是从主题订阅新消息的 Kafka 客户端。消费者通过检查消息的偏移量来区分消息是否已读。
  • 消费者群组(Consumer Group):多个消费者共同构成的一个群组,同时消费多个分区以实现高并发。
    • 每个消费者属于一个特定的消费者群组(可以为每个消费者指定消费者群组,若不指定,则属于默认的群组)。
    • 群组中,一个消费者可以消费多个分区
    • 群组中,每个分区只能被指定给一个消费
  • 再均衡(Rebalance):消费者组内某个消费者实例挂掉后,其他消费者实例自动重新分配订阅主题分区的过程。分区再均衡是 Kafka 消费者端实现高可用的重要手段。
  • Broker - 一个独立的 Kafka 服务器被称为 Broker。Broker 接受来自生产者的消息,为消息设置偏移量,并提交消息到磁盘保存;消费者向 Broker 请求消息,Broker 负责返回已提交的消息。
  • 副本(Replica):Kafka 中同一条消息能够被拷贝到多个地方以提供数据冗余,这些地方就是所谓的副本。副本还分为领导者副本和追随者副本,各自有不同的角色划分。副本是在分区层级下的,即每个分区可配置多个副本实现高可用。

Kafka 发行版本

Kafka 主要有以下发行版本:

  • Apache Kafka:也称社区版 Kafka。优势在于迭代速度快,社区响应度高,使用它可以让你有更高的把控度;缺陷在于仅提供基础核心组件,缺失一些高级的特性。
  • Confluent Kafka:Confluent 公司提供的 Kafka。优势在于集成了很多高级特性且由 Kafka 原班人马打造,质量上有保证;缺陷在于相关文档资料不全,普及率较低,没有太多可供参考的范例。
  • CDH/HDP Kafka:大数据云公司提供的 Kafka,内嵌 Apache Kafka。优势在于操作简单,节省运维成本;缺陷在于把控度低,演进速度较慢。

Kafka 重大版本

Kafka 有以下重大版本:

  • 0.8
    • 正式引入了副本机制
    • 至少升级到 0.8.2.2
  • 0.9
    • 增加了基础的安全认证 / 权限功能
    • 新版本 Producer API 在这个版本中算比较稳定
  • 0.10
    • 引入了 Kafka Streams
    • 至少升级到 0.10.2.2
    • 修复了一个可能导致 Producer 性能降低的 Bug
    • 使用新版本 Consumer API
  • 0.11
    • 提供幂等性 Producer API 以及事务
    • 对 Kafka 消息格式做了重构
    • 至少升级到 0.11.0.3
  • 1.0 和 2.0
    • Kafka Streams 的改进

Kafka 服务端使用入门

步骤一、获取 Kafka

下载最新的 Kafka 版本并解压到本地。

1
2
$ tar -xzf kafka_2.13-2.7.0.tgz
$ cd kafka_2.13-2.7.0

步骤二、启动 Kafka 环境

注意:本地必须已安装 Java8

执行以下指令,保证所有服务按照正确的顺序启动:

1
2
3
# Start the ZooKeeper service
# Note: Soon, ZooKeeper will no longer be required by Apache Kafka.
$ bin/zookeeper-server-start.sh config/zookeeper.properties

打开另一个终端会话,并执行:

1
2
# Start the Kafka broker service
$ bin/kafka-server-start.sh config/server.properties

一旦所有服务成功启动,您就已经成功运行了一个基本的 kafka 环境。

步骤三、创建一个 TOPIC 并存储您的事件

Kafka 是一个分布式事件流处理平台,它可以让您通过各种机制读、写、存储并处理事件(events,也被称为记录或消息)

示例事件包括付款交易,手机的地理位置更新,运输订单,物联网设备或医疗设备的传感器测量等等。 这些事件被组织并存储在主题中(topics)。 简单来说,主题类似于文件系统中的文件夹,而事件是该文件夹中的文件。

因此,在您写入第一个事件之前,您必须先创建一个 Topic。执行以下指令:

1
$ bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092

所有的 Kafka 命令行工具都有附加可选项:不加任何参数,运行 kafka-topics.sh 命令会显示使用信息。例如,会显示新 Topic 的分区数等细节。

1
2
3
$ bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092
Topic:quickstart-events PartitionCount:1 ReplicationFactor:1 Configs:
Topic: quickstart-events Partition: 0 Leader: 0 Replicas: 0 Isr: 0

步骤四、向 Topic 写入 Event

Kafka 客户端和 Kafka Broker 的通信是通过网络读写 Event。一旦收到信息,Broker 会将其以您需要的时间(甚至永久化)、容错化的方式存储。

执行 kafka-console-producer.sh 命令将 Event 写入 Topic。默认,您输入的任意行会作为独立 Event 写入 Topic:

1
2
3
$ bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
This is my first event
This is my second event

您可以通过 Ctrl-C 在任何时候中断 kafka-console-producer.sh

步骤五、读 Event

执行 kafka-console-consumer.sh 以读取写入 Topic 中的 Event

1
2
3
$ bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
This is my first event
This is my second event

您可以通过 Ctrl-C 在任何时候中断 kafka-console-consumer.sh

由于 Event 被持久化存储在 Kafka 中,因此您可以根据需要任意多次地读取它们。 您可以通过打开另一个终端会话并再次重新运行上一个命令来轻松地验证这一点。

步骤六、通过 KAFKA CONNECT 将数据作为事件流导入/导出

您可能有大量数据,存储在传统的关系数据库或消息队列系统中,并且有许多使用这些系统的应用程序。 通过 Kafka Connect,您可以将来自外部系统的数据持续地导入到 Kafka 中,反之亦然。 因此,将已有系统与 Kafka 集成非常容易。为了使此过程更加容易,有数百种此类连接器可供使用。

需要了解有关如何将数据导入和导出 Kafka 的更多信息,可以参考:Kafka Connect section 章节。

步骤七、使用 Kafka Streams 处理事件

一旦将数据作为 Event 存储在 Kafka 中,就可以使用 Kafka Streams 的 Java / Scala 客户端。它允许您实现关键任务的实时应用程序和微服务,其中输入(和/或)输出数据存储在 Kafka Topic 中。

Kafka Streams 结合了 Kafka 客户端编写和部署标准 Java 和 Scala 应用程序的简便性,以及 Kafka 服务器集群技术的优势,使这些应用程序具有高度的可伸缩性、弹性、容错性和分布式。该库支持一次性处理,有状态的操作,以及聚合、窗口化化操作、join、基于事件时间的处理等等。

1
2
3
4
5
6
7
8
KStream<String, String> textLines = builder.stream("quickstart-events");

KTable<String, Long> wordCounts = textLines
.flatMapValues(line -> Arrays.asList(line.toLowerCase().split(" ")))
.groupBy((keyIgnored, word) -> word)
.count();

wordCounts.toStream().to("output-topic"), Produced.with(Serdes.String(), Serdes.Long()));

Kafka Streams demoapp development tutorial 展示了如何从头到尾的编码并运行一个流式应用。

步骤八、终止 Kafka 环境

  1. 如果尚未停止,请使用 Ctrl-C 停止生产者和消费者客户端。
  2. 使用 Ctrl-C 停止 Kafka 代理。
  3. 最后,使用 Ctrl-C 停止 ZooKeeper 服务器。

如果您还想删除本地 Kafka 环境的所有数据,包括您在此过程中创建的所有事件,请执行以下命令:

1
$ rm -rf /tmp/kafka-logs /tmp/zookeeper

Kafka Java 客户端使用入门

引入 maven 依赖

Stream API 的 maven 依赖:

1
2
3
4
5
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>1.1.0</version>
</dependency>

其他 API 的 maven 依赖:

1
2
3
4
5
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.1.0</version>
</dependency>

Kafka 核心 API

Kafka 有 5 个核心 API

  • Producer API - 允许一个应用程序发布一串流式数据到一个或者多个 Kafka Topic。
  • Consumer API - 允许一个应用程序订阅一个或多个 Kafka Topic,并且对发布给他们的流式数据进行处理。
  • Streams API - 允许一个应用程序作为一个流处理器,消费一个或者多个 Kafka Topic 产生的输入流,然后生产一个输出流到一个或多个 Kafka Topic 中去,在输入输出流中进行有效的转换。
  • Connector API - 允许构建并运行可重用的生产者或者消费者,将 Kafka Topic 连接到已存在的应用程序或数据库。例如,连接到一个关系型数据库,捕捉表的所有变更内容。
  • Admin API - 支持管理和检查 Topic,Broker,ACL 和其他 Kafka 对象。

发送消息

发送并忽略返回

代码如下,直接通过 send 方法来发送

1
2
3
4
5
6
7
ProducerRecord<String, String> record =
new ProducerRecord<>("CustomerCountry", "Precision Products", "France");
try {
producer.send(record);
} catch (Exception e) {
e.printStackTrace();
}

同步发送

代码如下,与“发送并忘记”的方式区别在于多了一个 get 方法,会一直阻塞等待 Broker 返回结果:

1
2
3
4
5
6
7
ProducerRecord<String, String> record =
new ProducerRecord<>("CustomerCountry", "Precision Products", "France");
try {
producer.send(record).get();
} catch (Exception e) {
e.printStackTrace();
}

异步发送

代码如下,异步方式相对于“发送并忽略返回”的方式的不同在于:在异步返回时可以执行一些操作,如记录错误或者成功日志。

首先,定义一个 callback

1
2
3
4
5
6
7
8
private class DemoProducerCallback implements Callback {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null) {
e.printStackTrace();
}
}
}

然后,使用这个 callback

1
2
3
ProducerRecord<String, String> record =
new ProducerRecord<>("CustomerCountry", "Biomedical Materials", "USA");
producer.send(record, new DemoProducerCallback());

发送消息示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

/**
* Kafka 生产者生产消息示例 生产者配置参考:https://kafka.apache.org/documentation/#producerconfigs
*/
public class ProducerDemo {
private static final String HOST = "localhost:9092";

public static void main(String[] args) {
// 1. 指定生产者的配置
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, HOST);
properties.put(ProducerConfig.ACKS_CONFIG, "all");
properties.put(ProducerConfig.RETRIES_CONFIG, 0);
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");

// 2. 使用配置初始化 Kafka 生产者
Producer<String, String> producer = new KafkaProducer<>(properties);

try {
// 3. 使用 send 方法发送异步消息
for (int i = 0; i < 100; i++) {
String msg = "Message " + i;
producer.send(new ProducerRecord<>("HelloWorld", msg));
System.out.println("Sent:" + msg);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
// 4. 关闭生产者
producer.close();
}
}
}

消费消息流程

消费流程

具体步骤如下:

  1. 创建消费者。
  2. 订阅主题。除了订阅主题方式外还有使用指定分组的模式,但是常用方式都是订阅主题方式
  3. 轮询消息。通过 poll 方法轮询。
  4. 关闭消费者。在不用消费者之后,会执行 close 操作。close 操作会关闭 socket,并触发当前消费者群组的再均衡。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// 1.构建KafkaCustomer
Consumer consumer = buildCustomer();

// 2.设置主题
consumer.subscribe(Arrays.asList(topic));

// 3.接受消息
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(500);
System.out.println("customer Message---");
for (ConsumerRecord<String, String> record : records)

// print the offset,key and value for the consumer records.
System.out.printf("offset = %d, key = %s, value = %s\n",
record.offset(), record.key(), record.value());
}
} finally {
// 4.关闭消息
consumer.close();
}

创建消费者的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public Consumer buildCustomer() {
Properties props = new Properties();
// bootstrap.servers是Kafka集群的IP地址。多个时,使用逗号隔开
props.put("bootstrap.servers", "localhost:9092");
// 消费者群组
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer
<String, String>(props);

return consumer;
}

消费消息方式

分为订阅主题和指定分组两种方式:

  • 消费者分组模式。通过订阅主题方式时,消费者必须加入到消费者群组中,即消费者必须有一个自己的分组;
  • 独立消费者模式。这种模式就是消费者是独立的不属于任何消费者分组,自己指定消费那些 Partition

(1)订阅主题方式

1
consumer.subscribe(Arrays.asList(topic));

(2)独立消费者模式

通过 consumer 的 assign(Collection<TopicPartition> partitions) 方法来为消费者指定分区。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public void consumeMessageForIndependentConsumer(String topic){
// 1.构建KafkaCustomer
Consumer consumer = buildCustomer();

// 2.指定分区
// 2.1获取可用分区
List<PartitionInfo> partitionInfoList = buildCustomer().partitionsFor(topic);
// 2.2指定分区,这里是指定了所有分区,也可以指定个别的分区
if(null != partitionInfoList){
List<TopicPartition> partitions = Lists.newArrayList();
for(PartitionInfo partitionInfo : partitionInfoList){
partitions.add(new TopicPartition(partitionInfo.topic(),partitionInfo.partition()));
}
consumer.assign(partitions);
}

// 3.接受消息
while (true) {
ConsumerRecords<String, String> records = consumer.poll(500);
System.out.println("consume Message---");
for (ConsumerRecord<String, String> record : records) {

// print the offset,key and value for the consumer records.
System.out.printf("offset = %d, key = %s, value = %s\n",
record.offset(), record.key(), record.value());

// 异步提交
consumer.commitAsync();


}
}
}

参考资料