Dunwu Blog

大道至简,知易行难

Git 帮助手册

国外网友制作了一张 Git Cheat Sheet,总结很精炼,各位不妨收藏一下。

本节选择性介绍 git 中比较常用的命令行场景。

img

安装

(1)Debian/Ubuntu 环境安装

如果你使用的系统是 Debian/Ubuntu , 安装命令为:

1
2
3
4
5
$ apt-get install libcurl4-gnutls-dev libexpat1-dev gettext \
> libz-dev libssl-dev
$ apt-get install git-core
$ git --version
git version 1.8.1.2

(2)Centos/RedHat 环境安装

如果你使用的系统是 Centos/RedHat ,安装命令为:

1
2
3
4
5
$ yum install curl-devel expat-devel gettext-devel \
> openssl-devel zlib-devel
$ yum -y install git-core
$ git --version
git version 1.7.1

(3)Windows 环境安装

Git 官方下载地址下载 exe 安装包。按照安装向导安装即可。

建议安装 Git Bash 这个 git 的命令行工具。

(4)Mac 环境安装

Git 官方下载地址下载 mac 安装包。按照安装向导安装即可。

配置

Git 自带一个 git config 的工具来帮助设置控制 Git 外观和行为的配置变量。 这些变量存储在三个不同的位置:

  • /etc/gitconfig 文件: 包含系统上每一个用户及他们仓库的通用配置。 如果使用带有 --system 选项的 git config 时,它会从此文件读写配置变量。
  • ~/.gitconfig~/.config/git/config 文件:只针对当前用户。 可以传递 --global 选项让 Git 读写此文件。
  • 当前使用仓库的 Git 目录中的 config 文件(就是 .git/config):针对该仓库。

每一个级别覆盖上一级别的配置,所以 .git/config 的配置变量会覆盖 /etc/gitconfig 中的配置变量。

在 Windows 系统中,Git 会查找 $HOME 目录下(一般情况下是 C:\Users\$USER)的 .gitconfig 文件。 Git 同样也会寻找 /etc/gitconfig 文件,但只限于 MSys 的根目录下,即安装 Git 时所选的目标位置。

配置用户信息

当安装完 Git 应该做的第一件事就是设置你的用户名称与邮件地址。 这样做很重要,因为每一个 Git 的提交都会使用这些信息,并且它会写入到你的每一次提交中,不可更改:

1
2
git config --global user.name "John Doe"
git config --global user.email johndoe@example.com

再次强调,如果使用了 --global 选项,那么该命令只需要运行一次,因为之后无论你在该系统上做任何事情, Git 都会使用那些信息。 当你想针对特定项目使用不同的用户名称与邮件地址时,可以在那个项目目录下运行没有 --global 选项的命令来配置。

很多 GUI 工具都会在第一次运行时帮助你配置这些信息。

给 Git 命令添加别名

在 OS X 和 Linux 下, 你的 Git 的配置文件储存在 ~/.gitconfig。我在[alias] 部分添加了一些快捷别名(和一些我容易拼写错误的),如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
[alias]
a = add
amend = commit --amend
c = commit
ca = commit --amend
ci = commit -a
co = checkout
d = diff
dc = diff --changed
ds = diff --staged
f = fetch
loll = log --graph --decorate --pretty=oneline --abbrev-commit
m = merge
one = log --pretty=oneline
outstanding = rebase -i @{u}
s = status
unpushed = log @{u}
wc = whatchanged
wip = rebase -i @{u}
zap = fetch -p

缓存一个仓库的用户名和密码

你可能有一个仓库需要授权,这时你可以缓存用户名和密码,而不用每次推/拉(push/pull)的时候都输入,Credential helper 能帮你。

1
2
git config --global credential.helper cache
## Set git to use the credential memory cache
1
2
git config --global credential.helper 'cache --timeout=3600'
## Set the cache to timeout after 1 hour (setting is in seconds)

仓库

初始化仓库

1
$ git init

克隆仓库

1
2
3
4
# 通过 SSH
$ git clone ssh://user@domain.com/repo.git
# 通过 HTTP
$ git clone http://domain.com/user/repo.git

储藏

有时,我们需要在同一个项目的不同分支上工作。当需要切换分支时,偏偏本地的工作还没有完成,此时,提交修改显得不严谨,但是不提交代码又无法切换分支。这时,你可以使用 git stash 将本地的修改内容作为草稿储藏起来。

官方称之为储藏,但我个人更喜欢称之为存草稿。

1
2
3
4
5
6
7
8
9
10
11
12
# 1. 将修改作为当前分支的草稿保存
$ git stash

# 2. 查看草稿列表
$ git stash list
stash@{0}: WIP on master: 6fae349 :memo: Writing docs.

# 3.1 删除草稿
$ git stash drop stash@{0}

# 3.2 读取草稿
$ git stash apply stash@{0}

暂存

git add 命令用于将修改添加到暂存区。

暂存指定文件

1
git add xxx

暂存当前目录下所有修改

1
git add .

暂存所有修改

1
git add -A

暂存文件部分内容

暂存文件部分内容

1
git add --patch filename.x

-p 简写。这会打开交互模式, 你将能够用 s 选项来分隔提交(commit);

然而, 如果这个文件是新的, 会没有这个选择, 添加一个新文件时,这样做:

1
git add -N filename.x

然后, 你需要用 e 选项来手动选择需要添加的行,执行 git diff --cached 将会显示哪些行暂存了哪些行只是保存在本地了。

把暂存的内容变成未暂存,把未暂存的内容暂存起来

这个有点困难, 我能想到的最好的方法是先 stash 未暂存的内容, 然后重置(reset),再 pop 第一步 stashed 的内容, 最后再 add 它们。

1
2
3
4
git stash -k
git reset --hard
git stash pop
git add -A

提交

git commit 命令用于将修改保存到到本地仓库。

查看最近一次提交

1
git show

或者

1
git log -n1 -p

提交本地的所有修改

1
git commit -a

提交暂存的修改

1
git commit

把暂存的内容添加到上一次的提交

1
git commit --amend

附加消息提交

1
git commit -m 'commit message'

修改提交信息

如果你的提交信息写错了且这次提交(commit)还没有推送(push),可以使用以下命令修改:

1
git commit --amend

或者

1
git commit --amend -m 'xxxxxxx'

修改提交信息中的用户名和邮箱

1
git commit --amend --author "New Authorname <authoremail@mydomain.com>"

从提交中移除一个文件

1
2
3
git checkout HEAD^ myfile
git add -A
git commit --amend

删除最后一次提交

如果你需要删除推了的提交(pushed commits),你可以使用下面的方法。可是,这会不可逆的改变你的历史,也会搞乱那些已经从该仓库拉取(pulled)了的人的历史。简而言之,如果你不是很确定,千万不要这么做。

1
2
git reset HEAD^ --hard
git push -f [remote] [branch]

如果你还没有推到远程, 把 Git 重置(reset)到你最后一次提交前的状态就可以了(同时保存暂存的变化):

1
2
(my-branch*)$ git reset --soft HEAD@{1}

这只能在没有推送之前有用. 如果你已经推了, 唯一安全能做的是 git revert SHAofBadCommit, 那会创建一个新的提交(commit)用于撤消前一个提交的所有变化(changes); 或者, 如果你推的这个分支是 rebase-safe 的 (例如: 其它开发者不会从这个分支拉), 只需要使用 git push -f; 更多, 请参考 the above section

删除任意提交

同样的警告:不到万不得已的时候不要这么做.

1
2
git rebase --onto SHA1_OF_BAD_COMMIT^ SHA1_OF_BAD_COMMIT
git push -f [remote] [branch]

或者做一个 交互式 rebase 删除那些你想要删除的提交(commit)里所对应的行。

我尝试推一个修正后的提交(amended commit)到远程,但是报错

1
2
3
4
5
6
7
To https://github.com/yourusername/repo.git
! [rejected] mybranch -> mybranch (non-fast-forward)
error: failed to push some refs to 'https://github.com/tanay1337/webmaker.org.git'
hint: Updates were rejected because the tip of your current branch is behind
hint: its remote counterpart. Integrate the remote changes (e.g.
hint: 'git pull ...') before pushing again.
hint: See the 'Note about fast-forwards' in 'git push --help' for details.

注意, rebasing(见下面)和修正(amending)会用一个新的提交(commit)代替旧的, 所以如果之前你已经往远程仓库上推过一次修正前的提交(commit),那你现在就必须强推(force push) (-f)。 注意 – 总是 确保你指明一个分支!

1
(my-branch)$ git push origin mybranch -f

一般来说, 要避免强推. 最好是创建和推(push)一个新的提交(commit),而不是强推一个修正后的提交。后者会使那些与该分支或该分支的子分支工作的开发者,在源历史中产生冲突。

不小心强制重置,想找回内容

如果你意外的做了 git reset --hard, 你通常能找回你的提交(commit), 因为 Git 对每件事都会有日志,且都会保存几天。

1
(master)$ git reflog

你将会看到一个你过去提交(commit)的列表, 和一个重置的提交。 选择你想要回到的提交(commit)的 SHA,再重置一次:

1
(master)$ git reset --hard SHA1234

这样就完成了。

重置

撤销本地修改:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# 移除缓存区的所有文件(i.e. 撤销上次git add)
$ git reset HEAD

# 将HEAD重置到上一次提交的版本,并将之后的修改标记为未添加到缓存区的修改
$ git reset <commit>

# 将HEAD重置到上一次提交的版本,并保留未提交的本地修改
$ git reset --keep <commit>

# 放弃工作目录下的所有修改
$ git reset --hard HEAD

# 将HEAD重置到指定的版本,并抛弃该版本之后的所有修改
$ git reset --hard <commit-hash>

# 用远端分支强制覆盖本地分支
$ git reset --hard <remote/branch> e.g., upstream/master, origin/my-feature

# 放弃某个文件的所有本地修改
$ git checkout HEAD <file>

删除添加.gitignore文件前错误提交的文件:

1
2
3
4
# 提交一条 git 记录,提交信息为 remove xyz file
$ git rm -r --cached .
$ git add .
$ git commit -m "remove xyz file"

撤销远程修改(创建一个新的提交,并回滚到指定版本):

1
2
# revert 哈希号为 commit-hash 的记录
$ git revert <commit-hash>

彻底删除指定版本:

1
2
3
# 执行下面命令后,commit-hash 提交后的记录都会被彻底删除,使用需谨慎
$ git reset --hard <commit-hash>
$ git push -f

更新

1
2
3
4
5
6
7
8
# 下载远程端版本,但不合并到HEAD中
$ git fetch <remote>

# 将远程端版本合并到本地版本中
$ git pull origin master

# 以rebase方式将远端分支与本地合并
$ git pull --rebase <remote> <branch>

推送

推送提交到远程仓库

1
git push remote <remote> <branch>

发布标签

1
git push --tags

未暂存

未暂存(Unstaged)的内容

把未暂存的内容移动到一个新分支

  • git checkout -b my-branch

我想把未暂存的内容移动到另一个已存在的分支

1
2
3
git stash
git checkout my-branch
git stash pop

丢弃本地未提交的变化

如果你只是想重置源(origin)和你本地(local)之间的一些提交(commit),你可以:

1
2
3
4
5
6
7
8
## one commit
$ git reset --hard HEAD^
## two commits
$ git reset --hard HEAD^^
## four commits
$ git reset --hard HEAD~4
## or
$ git checkout -f

重置某个特殊的文件, 你可以用文件名做为参数:

1
git reset filename

我想丢弃某些未暂存的内容

如果你想丢弃工作拷贝中的一部分内容,而不是全部。

签出(checkout)不需要的内容,保留需要的。

1
2
$ git checkout -p
## Answer y to all of the snippets you want to drop

另外一个方法是使用 stash, Stash 所有要保留下的内容, 重置工作拷贝, 重新应用保留的部分。

1
2
3
4
$ git stash -p
## Select all of the snippets you want to save
$ git reset --hard
$ git stash pop

或者, stash 你不需要的部分, 然后 stash drop。

1
2
3
$ git stash -p
## Select all of the snippets you don't want to save
$ git stash drop

分支

分支(Branches)

列出所有的分支

1
git branch

列出所有的远端分支

1
git branch -r

基于当前分支创建新分支

1
git branch <new-branch>

基于远程分支创建新分支

1
git branch --track <new-branch> <remote-branch>

删除本地分支

1
git branch -d <branch>

强制删除本地分支

注意:强制删除本地分支,将会丢失未合并的修改

1
git branch -D <branch>

删除远程分支

1
2
git push <remote> :<branch> (since Git v1.5.0)
git push <remote> --delete <branch> (since Git v1.7.0)

切换分支

1
git checkout <branch>

创建并切换到新分支

1
git checkout -b <branch>

我从错误的分支拉取了内容,或把内容拉取到了错误的分支

这是另外一种使用 git reflog 情况,找到在这次错误拉(pull) 之前 HEAD 的指向。

1
2
3
(master)$ git reflog
ab7555f HEAD@{0}: pull origin wrong-branch: Fast-forward
c5bc55a HEAD@{1}: checkout: checkout message goes here

重置分支到你所需的提交(desired commit):

1
git reset --hard c5bc55a

完成。

我想扔掉本地的提交(commit),以便我的分支与远程的保持一致

先确认你没有推(push)你的内容到远程。

git status 会显示你领先(ahead)源(origin)多少个提交:

1
2
3
4
5
(my-branch)$ git status
## On branch my-branch
## Your branch is ahead of 'origin/my-branch' by 2 commits.
## (use "git push" to publish your local commits)
#

一种方法是:

1
(master)$ git reset --hard origin/my-branch

我需要提交到一个新分支,但错误的提交到了 master

在 master 下创建一个新分支,不切换到新分支,仍在 master 下:

1
(master)$ git branch my-branch

把 master 分支重置到前一个提交:

1
(master)$ git reset --hard HEAD^

HEAD^HEAD^1 的简写,你可以通过指定要设置的HEAD来进一步重置。

或者, 如果你不想使用 HEAD^, 找到你想重置到的提交(commit)的 hash(git log 能够完成), 然后重置到这个 hash。 使用git push 同步内容到远程。

例如, master 分支想重置到的提交的 hash 为a13b85e:

1
2
(master)$ git reset --hard a13b85e
HEAD is now at a13b85e

签出(checkout)刚才新建的分支继续工作:

1
(master)$ git checkout my-branch

我想保留来自另外一个 ref-ish 的整个文件

假设你正在做一个原型方案(原文为 working spike (see note)), 有成百的内容,每个都工作得很好。现在, 你提交到了一个分支,保存工作内容:

1
(solution)$ git add -A && git commit -m "Adding all changes from this spike into one big commit."

当你想要把它放到一个分支里 (可能是feature, 或者 develop), 你关心是保持整个文件的完整,你想要一个大的提交分隔成比较小。

假设你有:

  • 分支 solution, 拥有原型方案, 领先 develop 分支。
  • 分支 develop, 在这里你应用原型方案的一些内容。

我去可以通过把内容拿到你的分支里,来解决这个问题:

1
(develop)$ git checkout solution -- file1.txt

这会把这个文件内容从分支 solution 拿到分支 develop 里来:

1
2
3
4
5
6
## On branch develop
## Your branch is up-to-date with 'origin/develop'.
## Changes to be committed:
## (use "git reset HEAD <file>..." to unstage)
#
## modified: file1.txt

然后, 正常提交。

Note: Spike solutions are made to analyze or solve the problem. These solutions are used for estimation and discarded once everyone gets clear visualization of the problem. ~ Wikipedia.

我把几个提交(commit)提交到了同一个分支,而这些提交应该分布在不同的分支里

假设你有一个master分支, 执行git log, 你看到你做过两次提交:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
(master)$ git log

commit e3851e817c451cc36f2e6f3049db528415e3c114
Author: Alex Lee <alexlee@example.com>
Date: Tue Jul 22 15:39:27 2014 -0400

Bug #21 - Added CSRF protection

commit 5ea51731d150f7ddc4a365437931cd8be3bf3131
Author: Alex Lee <alexlee@example.com>
Date: Tue Jul 22 15:39:12 2014 -0400

Bug #14 - Fixed spacing on title

commit a13b85e984171c6e2a1729bb061994525f626d14
Author: Aki Rose <akirose@example.com>
Date: Tue Jul 21 01:12:48 2014 -0400

First commit

让我们用提交 hash(commit hash)标记 bug (e3851e8 for #21, 5ea5173 for #14).

首先, 我们把master分支重置到正确的提交(a13b85e):

1
2
(master)$ git reset --hard a13b85e
HEAD is now at a13b85e

现在, 我们对 bug #21 创建一个新的分支:

1
2
(master)$ git checkout -b 21
(21)$

接着, 我们用 cherry-pick 把对 bug #21 的提交放入当前分支。 这意味着我们将应用(apply)这个提交(commit),仅仅这一个提交(commit),直接在 HEAD 上面。

1
(21)$ git cherry-pick e3851e8

这时候, 这里可能会产生冲突, 参见交互式 rebasing 章 冲突节 解决冲突.

再者, 我们为 bug #14 创建一个新的分支, 也基于master分支

1
2
3
(21)$ git checkout master
(master)$ git checkout -b 14
(14)$

最后, 为 bug #14 执行 cherry-pick:

1
(14)$ git cherry-pick 5ea5173

我想删除上游(upstream)分支被删除了的本地分支

一旦你在 github 上面合并(merge)了一个 pull request, 你就可以删除你 fork 里被合并的分支。 如果你不准备继续在这个分支里工作, 删除这个分支的本地拷贝会更干净,使你不会陷入工作分支和一堆陈旧分支的混乱之中。

1
git fetch -p

我不小心删除了我的分支

如果你定期推送到远程, 多数情况下应该是安全的,但有些时候还是可能删除了还没有推到远程的分支。 让我们先创建一个分支和一个新的文件:

1
2
3
4
5
(master)$ git checkout -b my-branch
(my-branch)$ git branch
(my-branch)$ touch foo.txt
(my-branch)$ ls
README.md foo.txt

添加文件并做一次提交

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
(my-branch)$ git add .
(my-branch)$ git commit -m 'foo.txt added'
(my-branch)$ foo.txt added
1 files changed, 1 insertions(+)
create mode 100644 foo.txt
(my-branch)$ git log

commit 4e3cd85a670ced7cc17a2b5d8d3d809ac88d5012
Author: siemiatj <siemiatj@example.com>
Date: Wed Jul 30 00:34:10 2014 +0200

foo.txt added

commit 69204cdf0acbab201619d95ad8295928e7f411d5
Author: Kate Hudson <katehudson@example.com>
Date: Tue Jul 29 13:14:46 2014 -0400

Fixes #6: Force pushing after amending commits

现在我们切回到主(master)分支,‘不小心的’删除my-branch分支

1
2
3
4
5
6
7
(my-branch)$ git checkout master
Switched to branch 'master'
Your branch is up-to-date with 'origin/master'.
(master)$ git branch -D my-branch
Deleted branch my-branch (was 4e3cd85).
(master)$ echo oh noes, deleted my branch!
oh noes, deleted my branch!

在这时候你应该想起了reflog, 一个升级版的日志,它存储了仓库(repo)里面所有动作的历史。

1
2
3
4
(master)$ git reflog
69204cd HEAD@{0}: checkout: moving from my-branch to master
4e3cd85 HEAD@{1}: commit: foo.txt added
69204cd HEAD@{2}: checkout: moving from master to my-branch

正如你所见,我们有一个来自删除分支的提交 hash(commit hash),接下来看看是否能恢复删除了的分支。

1
2
3
4
5
6
(master)$ git checkout -b my-branch-help
Switched to a new branch 'my-branch-help'
(my-branch-help)$ git reset --hard 4e3cd85
HEAD is now at 4e3cd85 foo.txt added
(my-branch-help)$ ls
README.md foo.txt

看! 我们把删除的文件找回来了。 Git 的 reflog 在 rebasing 出错的时候也是同样有用的。

我想删除一个分支

删除一个远程分支:

1
(master)$ git push origin --delete my-branch

你也可以:

1
(master)$ git push origin :my-branch

删除一个本地分支:

1
(master)$ git branch -D my-branch

我想从别人正在工作的远程分支签出(checkout)一个分支

首先, 从远程拉取(fetch) 所有分支:

1
(master)$ git fetch --all

假设你想要从远程的daves分支签出到本地的daves

1
2
3
(master)$ git checkout --track origin/daves
Branch daves set up to track remote branch daves from origin.
Switched to a new branch 'daves'

(--trackgit checkout -b [branch] [remotename]/[branch] 的简写)

这样就得到了一个daves分支的本地拷贝, 任何推过(pushed)的更新,远程都能看到.

标签

添加标签

1
$ git tag <tag-name>

添加标签并附加消息

1
$ git tag -a <tag-name>

删除标签

1
2
git tag -d <tag_name>
git push <remote> :refs/tags/<tag_name>

恢复已删除标签

如果你想恢复一个已删除标签(tag), 可以按照下面的步骤: 首先, 需要找到无法访问的标签(unreachable tag):

1
git fsck --unreachable | grep tag

记下这个标签(tag)的 hash,然后用 Git 的 update-ref:

1
git update-ref refs/tags/<tag_name> <hash>

这时你的标签(tag)应该已经恢复了。

Rebase 和 Merge

merge 与 rebase 虽然是 git 常用功能,但是强烈建议不要使用 git 命令来完成这项工作。

因为如果出现代码冲突,在没有代码比对工具的情况下,实在太艰难了。

你可以考虑使用各种 Git GUI 工具。

将分支合并到当前 HEAD 中

1
git merge <branch>

将当前 HEAD 版本重置到分支中

1
git rebase <branch>

撤销 rebase/merge

你可以合并(merge)或 rebase 了一个错误的分支, 或者完成不了一个进行中的 rebase/merge。 Git 在进行危险操作的时候会把原始的 HEAD 保存在一个叫 ORIG_HEAD 的变量里, 所以要把分支恢复到 rebase/merge 前的状态是很容易的。

1
(my-branch)$ git reset --hard ORIG_HEAD

我已经 rebase 过, 但是我不想强推(force push)

不幸的是,如果你想把这些变化(changes)反应到远程分支上,你就必须得强推(force push)。 是因你快进(Fast forward)了提交,改变了 Git 历史, 远程分支不会接受变化(changes),除非强推(force push)。这就是许多人使用 merge 工作流, 而不是 rebasing 工作流的主要原因之一, 开发者的强推(force push)会使大的团队陷入麻烦。使用时需要注意,一种安全使用 rebase 的方法是,不要把你的变化(changes)反映到远程分支上, 而是按下面的做:

1
2
3
4
(master)$ git checkout my-branch
(my-branch)$ git rebase -i master
(my-branch)$ git checkout master
(master)$ git merge --ff-only my-branch

更多, 参见 this SO thread.

我需要组合(combine)几个提交(commit)

假设你的工作分支将会做对于 master 的 pull-request。 一般情况下你不关心提交(commit)的时间戳,只想组合 所有 提交(commit) 到一个单独的里面, 然后重置(reset)重提交(recommit)。 确保主(master)分支是最新的和你的变化都已经提交了, 然后:

1
2
(my-branch)$ git reset --soft master
(my-branch)$ git commit -am "New awesome feature"

如果你想要更多的控制, 想要保留时间戳, 你需要做交互式 rebase (interactive rebase):

1
(my-branch)$ git rebase -i master

如果没有相对的其它分支, 你将不得不相对自己的HEAD 进行 rebase。 例如:你想组合最近的两次提交(commit), 你将相对于HEAD\~2 进行 rebase, 组合最近 3 次提交(commit), 相对于HEAD\~3, 等等。

1
(master)$ git rebase -i HEAD~2

在你执行了交互式 rebase 的命令(interactive rebase command)后, 你将在你的编辑器里看到类似下面的内容:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
pick a9c8a1d Some refactoring
pick 01b2fd8 New awesome feature
pick b729ad5 fixup
pick e3851e8 another fix

## Rebase 8074d12..b729ad5 onto 8074d12
#
## Commands:
## p, pick = use commit
## r, reword = use commit, but edit the commit message
## e, edit = use commit, but stop for amending
## s, squash = use commit, but meld into previous commit
## f, fixup = like "squash", but discard this commit's log message
## x, exec = run command (the rest of the line) using shell
#
## These lines can be re-ordered; they are executed from top to bottom.
#
## If you remove a line here THAT COMMIT WILL BE LOST.
#
## However, if you remove everything, the rebase will be aborted.
#
## Note that empty commits are commented out

所有以 # 开头的行都是注释, 不会影响 rebase.

然后,你可以用任何上面命令列表的命令替换 pick, 你也可以通过删除对应的行来删除一个提交(commit)。

例如, 如果你想 单独保留最旧(first)的提交(commit),组合所有剩下的到第二个里面, 你就应该编辑第二个提交(commit)后面的每个提交(commit) 前的单词为 f:

1
2
3
4
pick a9c8a1d Some refactoring
pick 01b2fd8 New awesome feature
f b729ad5 fixup
f e3851e8 another fix

如果你想组合这些提交(commit) 并重命名这个提交(commit), 你应该在第二个提交(commit)旁边添加一个r,或者更简单的用s 替代 f:

1
2
3
4
pick a9c8a1d Some refactoring
pick 01b2fd8 New awesome feature
s b729ad5 fixup
s e3851e8 another fix

你可以在接下来弹出的文本提示框里重命名提交(commit)。

1
2
3
4
5
6
7
8
9
10
11
Newer, awesomer features

## Please enter the commit message for your changes. Lines starting
## with '#' will be ignored, and an empty message aborts the commit.
## rebase in progress; onto 8074d12
## You are currently editing a commit while rebasing branch 'master' on '8074d12'.
#
## Changes to be committed:
# modified: README.md
#

如果成功了, 你应该看到类似下面的内容:

1
(master)$ Successfully rebased and updated refs/heads/master.

安全合并(merging)策略

--no-commit 执行合并(merge)但不自动提交, 给用户在做提交前检查和修改的机会。 no-ff 会为特性分支(feature branch)的存在过留下证据, 保持项目历史一致。

1
(master)$ git merge --no-ff --no-commit my-branch

我需要将一个分支合并成一个提交(commit)

1
(master)$ git merge --squash my-branch

我只想组合(combine)未推的提交(unpushed commit)

有时候,在将数据推向上游之前,你有几个正在进行的工作提交(commit)。这时候不希望把已经推(push)过的组合进来,因为其他人可能已经有提交(commit)引用它们了。

1
(master)$ git rebase -i @{u}

这会产生一次交互式的 rebase(interactive rebase), 只会列出没有推(push)的提交(commit), 在这个列表时进行 reorder/fix/squash 都是安全的。

检查是否分支上的所有提交(commit)都合并(merge)过了

检查一个分支上的所有提交(commit)是否都已经合并(merge)到了其它分支, 你应该在这些分支的 head(或任何 commits)之间做一次 diff:

1
(master)$ git log --graph --left-right --cherry-pick --oneline HEAD...feature/120-on-scroll

这会告诉你在一个分支里有而另一个分支没有的所有提交(commit), 和分支之间不共享的提交(commit)的列表。 另一个做法可以是:

1
(master)$ git log master ^feature/120-on-scroll --no-merges

交互式 rebase(interactive rebase)可能出现的问题

这个 rebase 编辑屏幕出现’noop’

如果你看到的是这样:

1
noop

这意味着你 rebase 的分支和当前分支在同一个提交(commit)上, 或者 领先(ahead) 当前分支。 你可以尝试:

  • 检查确保主(master)分支没有问题
  • rebase HEAD\~2 或者更早

有冲突的情况

如果你不能成功的完成 rebase, 你可能必须要解决冲突。

首先执行 git status 找出哪些文件有冲突:

1
2
3
4
5
6
7
(my-branch)$ git status
On branch my-branch
Changes not staged for commit:
(use "git add <file>..." to update what will be committed)
(use "git checkout -- <file>..." to discard changes in working directory)

modified: README.md

在这个例子里面, README.md 有冲突。 打开这个文件找到类似下面的内容:

1
2
3
4
5
<<<<<<< HEAD
some code
=========
some code
>>>>>>> new-commit

你需要解决新提交的代码(示例里, 从中间==线到new-commit的地方)与HEAD 之间不一样的地方.

有时候这些合并非常复杂,你应该使用可视化的差异编辑器(visual diff editor):

1
(master*)$ git mergetool -t opendiff

在你解决完所有冲突和测试过后, git add 变化了的(changed)文件, 然后用git rebase --continue 继续 rebase。

1
2
(my-branch)$ git add README.md
(my-branch)$ git rebase --continue

如果在解决完所有的冲突过后,得到了与提交前一样的结果, 可以执行git rebase --skip

任何时候你想结束整个 rebase 过程,回来 rebase 前的分支状态, 你可以做:

1
(my-branch)$ git rebase --abort

查看信息

显示工作路径下已修改的文件:git status

显示与上次提交版本文件的不同:git diff

显示提交历史:

1
2
3
4
5
6
7
8
# 从最新提交开始,显示所有的提交记录(显示hash, 作者信息,提交的标题和时间)
$ git log

# 显示某个用户的所有提交
$ git log --author="username"

# 显示某个文件的所有修改
$ git log -p <file>

显示搜索内容:

1
2
3
4
5
# 从当前目录的所有文件中查找文本内容
$ git grep "Hello"

# 在某一版本中搜索文本
$ git grep "Hello" v2.5

其他

克隆所有子模块

1
git clone --recursive git://github.com/foo/bar.git

如果已经克隆了:

1
git submodule update --init --recursive

已删除补丁(patch)

如果某人在 GitHub 上给你发了一个 pull request, 但是然后他删除了他自己的原始 fork, 你将没法克隆他们的提交(commit)或使用 git am。在这种情况下, 最好手动的查看他们的提交(commit),并把它们拷贝到一个本地新分支,然后做提交。

做完提交后, 再修改作者,参见变更作者。 然后, 应用变化, 再发起一个新的 pull request。

跟踪文件(Tracking Files)

我只想改变一个文件名字的大小写,而不修改内容

1
(master)$ git mv --force myfile MyFile

我想从 Git 删除一个文件,但保留该文件

1
(master)$ git rm --cached log.txt

Fork 项目

GitHub 中 Fork 是 服务端的代码仓库克隆(即 新克隆出来的代码仓库在远程服务端),包含了原来的仓库(即 upstream repository,上游仓库)所有内容,如分支、Tag、提交。代码托管服务(如 Github、BitBucket)提供了方便的完成 Fork 操作的功能(在仓库页面点一下 Fork 按钮)。这样有了一个你自己的可以自由提交的远程仓库,然后可以通过的 Pull Request 把你的提交贡献回 原仓库。而对于原仓库 Owner 来说,鼓励别人 Fork 自己的仓库,通过 Pull Request 给自己的仓库做贡献,也能提高了自己仓库的知名度。

参考:Fork a repo

(1)执行 git remote -v,您将看到当前为 fork 配置的远程存储库。

(2)添加上游项目的仓库地址

1
git remote add upstream <github仓库地址>

(3)确认是否添加成功,再次键入 git remote -v

(4)获取上游项目更新,可以执行 git fetch upstream

(5)同步上游项目的代码到新仓库

1
2
3
4
# merge
git merge upstream/master
# rebase
git rebase upstream/master origin/master

我不知道我做错了些什么

你把事情搞砸了:你 重置(reset) 了一些东西, 或者你合并了错误的分支, 亦或你强推了后找不到你自己的提交(commit)了。有些时候, 你一直都做得很好, 但你想回到以前的某个状态。

这就是 git reflog 的目的, reflog 记录对分支顶端(the tip of a branch)的任何改变, 即使那个顶端没有被任何分支或标签引用。基本上, 每次 HEAD 的改变, 一条新的记录就会增加到reflog。遗憾的是,这只对本地分支起作用,且它只跟踪动作 (例如,不会跟踪一个没有被记录的文件的任何改变)。

1
2
3
4
(master)$ git reflog
0a2e358 HEAD@{0}: reset: moving to HEAD\~2
0254ea7 HEAD@{1}: checkout: moving from 2.2 to master
c10f740 HEAD@{2}: checkout: moving from master to 2.2

上面的 reflog 展示了从 master 分支签出(checkout)到 2.2 分支,然后再签回。 那里,还有一个硬重置(hard reset)到一个较旧的提交。最新的动作出现在最上面以 HEAD@{0}标识.

如果事实证明你不小心回移(move back)了提交(commit), reflog 会包含你不小心回移前 master 上指向的提交(0254ea7)。

1
git reset --hard 0254ea7

然后使用 git reset 就可以把 master 改回到之前的 commit,这提供了一个在历史被意外更改情况下的安全网。

📚 资料

一篇文章让你彻底掌握 Scala

Scala 是大数据领域的热门语言,如:Akka、Kafka,所以,想要学习大数据顶级开源项目的源码,有必要具备一定的 Scala 基础。

基本语法

Scala 基本语法需要注意以下几点:

  • 区分大小写 - Scala 是大小写敏感的。
  • 类名 - 对于所有的类名的第一个字母要大写。示例:class MyFirstScalaClass
  • 方法名称 - 所有的方法名称的第一个字母用小写。示例:def myMethodName()
  • 程序文件名 - 程序文件的名称应该与对象名称完全匹配(新版本不需要了,但建议保留这种习惯)。示例: 假设”HelloWorld”是对象的名称。那么该文件应保存为’HelloWorld.scala”
  • def main(args: Array[String]) - Scala 程序从 main() 方法开始处理,这是每一个 Scala 程序的强制程序入口部分。
  • 一行中只有空格或者带有注释,Scala 会认为其是空行,会忽略它。标记可以被空格或者注释来分割。
  • Scala 是面向行的语言,语句可以用分号(;)结束或换行符。

注释

Scala 类似 Java 支持单行和多行注释。

【示例】单行和多行注释

1
2
3
4
5
6
7
8
9
object HelloWorld {
/*
* 这是我的第一个 Scala 程序
* 以下程序将输出'Hello World!'
*/
def main(args: Array[String]) {
println("Hello, world!") // 输出 Hello World
}
}

变量

在 Scala 中,使用关键词 var 声明变量,使用关键词 val 声明常量。

【示例】声明变量

1
2
var myVar : String = "Foo"
var myVar : String = "Too"

【示例】声明常量

1
val myVal : String = "Foo"

变量类型声明

变量的类型在变量名之后等号之前声明。定义变量的类型的语法格式如下:

1
2
3
4
// 声明变量类型
var VariableName : DataType [= Initial Value]
// 声明常量类型
val VariableName : DataType [= Initial Value]

在 Scala 中声明变量和常量不一定要指明数据类型,在没有指明数据类型的情况下,其数据类型是通过变量或常量的初始值推断出来的。所以,如果在没有指明数据类型的情况下声明变量或常量必须要给出其初始值,否则将会报错。

1
2
3
var myVar = 10;
val myVal = "Hello, Scala!";
val xmax, ymax = 100

数据类型

Scala 与 Java 有着相同的数据类型:

数据类型 描述
Byte 8 位有符号补码整数。数值区间为 -128 到 127
Short 16 位有符号补码整数。数值区间为 -32768 到 32767
Int 32 位有符号补码整数。数值区间为 -2147483648 到 2147483647
Long 64 位有符号补码整数。数值区间为 -9223372036854775808 到 9223372036854775807
Float 32 位, IEEE 754 标准的单精度浮点数
Double 64 位 IEEE 754 标准的双精度浮点数
Char 16 位无符号 Unicode 字符, 区间值为 U+0000 到 U+FFFF
String 字符序列
Boolean true 或 false
Unit 表示无值,和其他语言中 void 等同。用作不返回任何结果的方法的结果类型。Unit 只有一个实例值,写成()。
Null null 或空引用
Nothing Nothing 类型在 Scala 的类层级的最底端;它是任何其他类型的子类型。
Any Any 是所有其他类的超类
AnyRef AnyRef 类是 Scala 里所有引用类(reference class)的基类

上表中列出的数据类型都是对象,也就是说 scala 没有 java 中的原生类型。在 scala 是可以对数字等基础类型调用方法的。

数组

Scala 数组声明的语法格式:

1
2
3
4
// 方式一
var z:Array[String] = new Array[String](3)
// 方式二
var z = new Array[String](3)

逻辑控制语句

条件语句

【示例】条件语句

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
object IfDemo {
def main(args: Array[String]) {
var x = 30;

if (x == 10) {
println("X 的值为 10");
} else if (x == 20) {
println("X 的值为 20");
} else if (x == 30) {
println("X 的值为 30");
} else {
println("无法判断 X 的值");
}
}
}

循环语句

和 Java 一样,Scala 支持 whiledo ... whilefor 三种循环语句。

1
2
3
4
5
6
7
8
9
10
11
12
object WhileDemo {
def main(args: Array[String]) {
// 局部变量
var a = 10;

// while 循环执行
while (a < 20) {
println("Value of a: " + a);
a = a + 1;
}
}
}

**scala 不支持 breakcontinue**。但是,可以通过 Breaks 对象来进行循环控制。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import scala.util.control._

object BreakDemo {
def main(args: Array[String]) {
var a = 0;
var b = 0;
val numList1 = List(1, 2, 3, 4, 5);
val numList2 = List(11, 12, 13);

val outer = new Breaks;
val inner = new Breaks;

outer.breakable {
for (a <- numList1) {
println("Value of a: " + a);
inner.breakable {
for (b <- numList2) {
println("Value of b: " + b);
if (b == 12) {
inner.break;
}
}
} // 内嵌循环中断
}
} // 外部循环中断
}
}

模式匹配

scala 的 match 对应 Java 里的 switch,但是写在选择器表达式之后。即: 选择器 match {备选项}。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* @author peng.zhang
*/
object MatchDemo {
def main(args: Array[String]) {
println(matchTest("two"))
println(matchTest("test"))
println(matchTest(1))
println(matchTest(6))

}

def matchTest(x: Any): Any = x match {
case 1 => "one"
case "two" => 2
case y: Int => "scala.Int"
case _ => "many"
}
}

运算符

Scala 含有丰富的内置运算符,包括以下几种类型:

  • 算术运算符:+-*/%
  • 关系运算符:==!=><>=<=
  • 逻辑运算符:&&||!
  • 位运算符:~&|^<<>>>>> (无符号右移)
  • 赋值运算符:=

方法与函数

Scala 有方法与函数,二者在语义上的区别很小。

Scala 中的方法跟 Java 的类似,方法是组成类的一部分。

Scala 中的函数则是一个完整的对象,Scala 中的函数其实就是继承了 Trait 的类的对象。

Scala 中使用 val 语句可以定义函数,def 语句定义方法。

【示例】

1
2
3
4
class Test {
def m(x: Int) = x + 3
val f = (x: Int) => x + 3
}

闭包

闭包是一个函数,返回值依赖于声明在函数外部的一个或多个变量。

闭包通常来讲可以简单的认为是可以访问一个函数里面局部变量的另外一个函数。

如下面这段匿名的函数:

1
val multiplier = (i:Int) => i * 10

函数体内有一个变量 i,它作为函数的一个参数。如下面的另一段代码:

1
val multiplier = (i:Int) => i * factor

在 multiplier 中有两个变量:i 和 factor。其中的一个 i 是函数的形式参数,在 multiplier 函数被调用时,i 被赋予一个新的值。然而,factor 不是形式参数,而是自由变量,考虑下面代码:

1
2
var factor = 3
val multiplier = (i:Int) => i * factor

这里我们引入一个自由变量 factor,这个变量定义在函数外面。

这样定义的函数变量 multiplier 成为一个”闭包”,因为它引用到函数外面定义的变量,定义这个函数的过程是将这个自由变量捕获而构成一个封闭的函数。

1
2
3
4
5
6
7
8
9
object ClosureDemo {
def main(args: Array[String]) {
println("muliplier(1) value = " + multiplier(1))
println("muliplier(2) value = " + multiplier(2))
}

var factor = 3
val multiplier = (i: Int) => i * factor
}

集合

Scala 集合支持 List、Set、Map、元祖、Option。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 定义整型 List
val x = List(1,2,3,4)

// 定义 Set
val x = Set(1,3,5,7)

// 定义 Map
val x = Map("one" -> 1, "two" -> 2, "three" -> 3)

// 创建两个不同类型元素的元组
val x = (10, "Runoob")

// 定义 Option
val x:Option[Int] = Some(5)

迭代器

迭代器 it 的两个基本操作是 nexthasNext

调用 it.next() 会返回迭代器的下一个元素,并且更新迭代器的状态。

调用 it.hasNext() 用于检测集合中是否还有元素。

1
2
3
4
5
6
7
8
9
object Test {
def main(args: Array[String]) {
val it = Iterator("Baidu", "Google", "Runoob", "Taobao")

while (it.hasNext){
println(it.next())
}
}
}

类和对象

类是对象的抽象,而对象是类的具体实例。类是抽象的,不占用内存,而对象是具体的,占用存储空间。类是用于创建对象的蓝图,它是一个定义包括在特定类型的对象中的方法和变量的软件模板。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
class Point(val xc: Int, val yc: Int) {
var x: Int = xc
var y: Int = yc

def move(dx: Int, dy: Int) {
x = x + dx
y = y + dy
println("x 的坐标点 : " + x);
println("y 的坐标点 : " + y);
}
}

class Location(override val xc: Int, override val yc: Int, val zc: Int)
extends Point(xc, yc) {
var z: Int = zc

def move(dx: Int, dy: Int, dz: Int) {
x = x + dx
y = y + dy
z = z + dz
println("x 的坐标点 : " + x);
println("y 的坐标点 : " + y);
println("z 的坐标点 : " + z);
}
}

object Test {
def main(args: Array[String]) {
val loc = new Location(10, 20, 15);

// 移到一个新的位置
loc.move(10, 10, 5);
}
}

Trait

Scala Trait(特征) 相当于 Java 的接口,实际上它比接口还功能强大。

与接口不同的是,它还可以定义属性和方法的实现。

一般情况下 Scala 的类只能够继承单一父类,但是如果是 Trait(特征) 的话就可以继承多个,从结果来看就是实现了多重继承。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
trait Equal {
def isEqual(x: Any): Boolean

def isNotEqual(x: Any): Boolean = !isEqual(x)
}

class Point(xc: Int, yc: Int) extends Equal {
var x: Int = xc
var y: Int = yc

def isEqual(obj: Any) =
obj.isInstanceOf[Point] &&
obj.asInstanceOf[Point].x == x
}

object Test {
def main(args: Array[String]) {
val p1 = new Point(2, 3)
val p2 = new Point(2, 4)
val p3 = new Point(3, 3)

println(p1.isNotEqual(p2))
println(p1.isNotEqual(p3))
println(p1.isNotEqual(2))
}
}

异常

Scala 抛出异常的方法和 Java 一样,使用 throw 关键词。

【示例】抛出异常

1
throw new IllegalArgumentException

【示例】捕获异常

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import java.io.{FileNotFoundException, FileReader, IOException}

object Test {
def main(args: Array[String]) {
try {
val f = new FileReader("input.txt")
} catch {
case ex: FileNotFoundException => {
println("Missing file exception")
}
case ex: IOException => {
println("IO Exception")
}
} finally {
println("Exiting finally...")
}
}
}

输入输出

读取用户输入

使用 scala.io.StdIn.readLine() 方法读取用户输入

1
2
3
4
5
6
7
8
object StdInDemo {
def main(args: Array[String]) {
print("请输入内容: ")
val line = StdIn.readLine()

println("你输入的是: " + line)
}
}

读取文件内容

1
2
3
4
5
6
7
8
9
object SourceDemo {
def main(args: Array[String]) {
println("文件内容为:")

Source.fromFile("test.txt").foreach {
print
}
}
}

定义包

Scala 使用 package 关键字定义包,在 Scala 将代码定义到某个包中有两种方式:

第一种方法和 Java 一样,在文件的头定义包名,这种方法就后续所有代码都放在该包中。 比如:

1
2
package com.runoob
class HelloWorld

第二种方法有些类似 C#,如:

1
2
3
package com.runoob {
class HelloWorld
}

引用

Scala 使用 import 关键字引用包。

1
2
3
4
5
6
7
import java.awt.Color  // 引入Color

import java.awt._ // 引入包内所有成员

def handler(evt: event.ActionEvent) { // java.awt.event.ActionEvent
... // 因为引入了java.awt,所以可以省去前面的部分
}

import 语句可以出现在任何地方,而不是只能在文件顶部。import 的效果从开始延伸到语句块的结束。这可以大幅减少名称冲突的可能性。

如果想要引入包中的几个成员,可以使用 selector(选取器):

1
2
3
4
5
6
7
import java.awt.{Color, Font}

// 重命名成员
import java.util.{HashMap => JavaHashMap}

// 隐藏成员
import java.util.{HashMap => _, _} // 引入了util包的所有成员,但是HashMap被隐藏了

注意:默认情况下,Scala 总会引入 java.lang._ 、 scala._ 和 Predef._,这里也能解释,为什么以 scala 开头的包,在使用时都是省去 scala.的。

访问修饰符

Scala 访问修饰符基本和 Java 的一样,分别有:private,protected,public。

如果没有指定访问修饰符,默认情况下,Scala 对象的访问级别都是 public。

Scala 中的 private 限定符,比 Java 更严格,在嵌套类情况下,外层类甚至不能访问被嵌套类的私有成员。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class Outer {

class Inner {
private def f() {
println("f")
}

class InnerMost {
f() // 正确
}

}

(new Inner).f() //错误
}

参考资料

Kafka 可靠传输

消息不丢失

如何保证消息的可靠性传输,或者说,如何保证消息不丢失?这对于任何 MQ 都是核心问题。

一条消息从生产到消费,可以划分三个阶段:

  • 生产阶段:Producer 创建消息,并通过网络发送给 Broker。
  • 存储阶段:Broker 收到消息并存储,如果是集群,还要同步副本给其他 Broker。
  • 消费阶段:Consumer 向 Broker 请求消息,Broker 通过网络传输给 Consumer。

这三个阶段都可能丢失数据,所以要保证消息丢失,就需要任意一环都保证可靠。

存储阶段

存储阶段指的是 Kafka Server,也就是 Broker 如何保证消息不丢失。

一句话概括,Kafka 只对“已提交”的消息(committed message)做有限度的持久化保证

上面的话可以解读为:

  • 已提交只有当消息被写入分区的若干同步副本时,才被认为是已提交的。为什么是若干个 Broker 呢?这取决于你对“已提交”的定义。你可以选择只要 Leader 成功保存该消息就算是已提交,也可以是令所有 Broker 都成功保存该消息才算是已提交。
  • 持久化:Kafka 的数据存储在磁盘上,所以只要写入成功,天然就是持久化的。
  • 只要还有一个副本是存活的,那么已提交的消息就不会丢失
  • 消费者只能读取已提交的消息

副本机制

Kafka 的副本机制是 kafka 可靠性保证的核心

Kafka 的主题被分为多个分区,分区是基本的数据块。每个分区可以有多个副本,有一个是 Leader(主副本),其他是 Follower(从副本)。所有数据都直接发送给 Leader,或者直接从 Leader 读取事件。Follower 只需要与 Leader 保持同步,并及时复制最新的数据。当 Leader 宕机时,从 Follower 中选举一个成为新的 Leader。

Broker 有 3 个配置参数会影响 Kafka 消息存储的可靠性。

副本数

replication.factor 的作用是设置每个分区的副本数replication.factor 是主题级别配置; default.replication.factor 是 broker 级别配置。

副本数越多,数据可靠性越高;但由于副本数增多,也会增加同步副本的开销,可能会降低集群的可用性。一般,建议设为 3,这也是 Kafka 的默认值。

不完全的选主

unclean.leader.election.enable 用于控制是否支持不同步的副本参与选举 Leader。unclean.leader.election.enable 是 broker 级别(实际上是集群范围内)配置,默认值为 true。

  • 如果设为 true,代表着允许不同步的副本成为主副本(即不完全的选举),那么将面临丢失消息的风险
  • 如果设为 false,就要等待原先的主副本重新上线,从而降低了可用性。

最少同步副本

min.insync.replicas 控制的是消息至少要被写入到多少个副本才算是“已提交”min.insync.replicas 是主题级别和 broker 级别配置。

尽管可以为一个主题配置 3 个副本,但还是可能会出现只有一个同步副本的情况。如果这个同步副本变为不可用,则必须在可用性和数据一致性之间做出选择。Kafka 中,消息只有被写入到所有的同步副本之后才被认为是已提交的。但如果只有一个同步副本,那么在这个副本不可用时,则数据就会丢失。

如果要确保已经提交的数据被已写入不止一个副本,就需要把最小同步副本的设置为大一点的值。

注意:要确保 replication.factor > min.insync.replicas。如果两者相等,那么只要有一个副本挂机,整个分区就无法正常工作了。我们不仅要改善消息的持久性,防止数据丢失,还要在不降低可用性的基础上完成。推荐设置成 replication.factor = min.insync.replicas + 1

生产阶段

在生产消息阶段,消息队列一般通过请求确认机制,来保证消息的可靠传递,Kafka 也不例外。

Kafka 生产 中提到了,Kafka 有三种发送方式:同步、异步、异步回调。

同步方式能保证消息不丢失,但性能太差;异步方式发送消息,通常会立即返回,但消息可能丢失。

解决生产者丢失消息的方案:

生产者使用异步回调方式 producer.send(msg, callback) 发送消息。callback(回调)能准确地告诉你消息是否真的提交成功了。一旦出现消息提交失败的情况,你就可以有针对性地进行处理。

  • 如果是因为那些瞬时错误,那么仅仅让 Producer 重试就可以了;
  • 如果是消息不合格造成的,那么可以调整消息格式后再次发送。

然后,需要基于以下几点来保证 Kafka 生产者的可靠性:

ACK

生产者可选的确认模式有三种:acks=0acks=1acks=all

  • acks=0acks=1 都有丢失数据的风险。

  • acks=all 意味着会等待所有同步副本都收到消息。再结合 min.insync.replicas ,就可以决定在得到确认响应前,至少有多少副本能够收到消息。

这是最保险的做法,但也会降低吞吐量。

重试

如果 broker 返回的错误可以通过重试来解决,生产者会自动处理这些错误。

  • 可重试错误,如:LEADER_NOT_AVAILABLE,主副本不可用,可能过一段时间,集群就会选举出新的主副本,重试可以解决问题。
  • 不可重试错误,如:INVALID_CONFIG,即使重试,也无法改变配置选项,重试没有意义。

需要注意的是:有时可能因为网络问题导致没有收到确认,但实际上消息已经写入成功。生产者会认为出现临时故障,重试发送消息,这样就会出现重复记录。所以,尽可能在业务上保证幂等性。

设置 retries 为一个较大的值。这里的 retries 同样是 Producer 的参数,对应前面提到的 Producer 自动重试。当出现网络的瞬时抖动时,消息发送可能会失败,此时配置了 retries > 0 的 Producer 能够自动重试消息发送,避免消息丢失。

错误处理

开发者需要自行处理的错误:

  • 不可重试的 broker 错误,如消息大小错误、认证错误等;
  • 消息发送前发生的错误,如序列化错误;
  • 生产者达到重试次数上限或消息占用的内存达到上限时发生的错误。

消费阶段

前文已经提到,消费者只能读取已提交的消息。这就保证了消费者接收到消息时已经具备了数据一致性。

消费者唯一要做的是确保哪些消息是已经读取过的,哪些是没有读取过的(通过提交偏移量给 Broker 来确认)。如果消费者提交了偏移量却未能处理完消息,那么就有可能造成消息丢失,这也是消费者丢失消息的主要原因。

img

消费者的可靠性配置

  • group.id - 如果希望消费者可以看到主题的所有消息,那么需要为它们设置唯一的 group.id
  • auto.offset.reset - 有两个选项:
    • earliest - 消费者会从分区的开始位置读取数据
    • latest - 消费者会从分区末尾位置读取数据
  • enable.auto.commit - 消费者自动提交偏移量。如果设为 true,处理流程更简单,但无法保证重复处理消息。
  • auto.commit.interval.ms - 自动提交的频率,默认为每 5 秒提交一次。

显示提交偏移量

如果 enable.auto.commit 设为 true,即自动提交,就无需考虑提交偏移量的问题。

如果选择显示提交偏移量,需要考虑以下问题:

  • 必须在处理完消息后再发送确认(提交偏移量),不要收到消息立即确认。
  • 提交频率是性能和重复消息数之间的权衡
  • 分区再均衡
  • 消费可能需要重试机制
  • 超时处理
  • 消费者可能需要维护消费状态,如:处理完消息后,记录在数据库中。
  • 幂等性设计
    • 写数据库:根据主键判断记录是否存在
    • 写 Redis:set 操作天然具有幂等性
    • 复杂的逻辑处理,则可以在消息中加入全局 ID

重复消息

在 MQTT 协议中,给出了三种传递消息时能够提供的服务质量标准,这三种服务质量从低到高依次是:

  • At most once:至多一次。消息在传递时,最多会被送达一次。换一个说法就是,没什么消息可靠性保证,允许丢消息。一般都是一些对消息可靠性要求不太高的监控场景使用,比如每分钟上报一次机房温度数据,可以接受数据少量丢失。
  • At least once: 至少一次。消息在传递时,至少会被送达一次。也就是说,不允许丢消息,但是允许有少量重复消息出现。
  • Exactly once:恰好一次。消息在传递时,只会被送达一次,不允许丢失也不允许重复,这个是最高的等级。

绝大部分消息队列提供的服务质量都是 At least once,包括 RocketMQ、RabbitMQ 和 Kafka 都是这样。也就是说,消息队列很难保证消息不重复。

一般解决重复消息的办法是,在消费端,保证消费消息的操作具备幂等性

常用的实现幂等操作的方法:

利用数据库的唯一约束实现幂等

关系型数据库可以使用 INSERT IF NOT EXIST 语句防止重复;Redis 可以使用 SETNX 命令来防止重复;其他数据库只要支持类似语义,也是一个道理。

为更新的数据设置前置条件

如果满足条件就更新数据,否则拒绝更新数据,在更新数据的时候,同时变更前置条件中需要判断的数据。这样,重复执行这个操作时,由于第一次更新数据的时候已经变更了前置条件中需要判断的数据,不满足前置条件,则不会重复执行更新数据操作。

但是,如果我们要更新的数据不是数值,或者我们要做一个比较复杂的更新操作怎么办?用什么作为前置判断条件呢?更加通用的方法是,给数据增加一个版本号属性,每次更数据前,比较当前数据的版本号是否和消息中的版本号一致,如果不一致就拒绝更新数据,更新数据的同时将版本号 +1,一样可以实现幂等更新。

记录并检查操作

还有一种通用性最强,适用范围最广的实现幂等性方法:记录并检查操作,也称为“Token 机制或者 GUID(全局唯一 ID)机制”,实现的思路特别简单:在执行数据更新操作之前,先检查一下是否执行过这个更新操作。

具体的实现方法是,在发送消息时,给每条消息指定一个全局唯一的 ID,消费时,先根据这个 ID 检查这条消息是否有被消费过,如果没有消费过,才更新数据,然后将消费状态置为已消费。

需要注意的是,“检查消费状态,然后更新数据并且设置消费状态”中,三个操作必须作为一组操作保证原子性,才能真正实现幂等,否则就会出现 Bug。这一组操作可以通过分布式事务或分布式锁来保证其原子性。

消息的有序性

某些场景下,可能会要求按序发送消息。

方案一、单 Partition

Kafka 每一个 Partition 只能隶属于消费者群组中的一个 Consumer,换句话说,每个 Partition 只能被一个 Consumer 消费。所以,如果 Topic 是单 Partition,自然是有序的。

方案分析

优点:简单粗暴。开发者什么也不用做。

缺点:Kafka 基于 Partition 实现其高并发能力,如果使用单 Partition,会严重限制 Kafka 的吞吐量。

结论:作为分布式消息引擎,限制并发能力,显然等同于自废武功,所以,这个方案几乎是不可接受的。

方案二、同一个 key 的消息发送给指定 Partition

(1)生产者端显示指定 key 发往一个指定的 Partition,就可以保证同一个 key 在这个 Partition 中是有序的。

(2)接下来,消费者端为每个 key 设定一个缓存队列,然后让一个独立线程负责消费指定 key 的队列,这就保证了消费消息也是有序的。

消息积压

先修复消费者,然后停掉当前所有消费者。

新建 Topic,扩大分区,以提高并发处理能力。

创建临时消费者程序,并部署在多节点上,扩大消费处理能力。

最后处理完积压消息后,恢复原先部署架构。

验证系统可靠性

建议从 3 个层面验证系统的可靠性:

  • 配置验证
  • 应用验证
    • 客户端和服务器断开连接
    • 选举
    • 依次重启 broker
    • 依次重启生产者
    • 依次重启消费者
  • 监控可靠性
    • 对于生产者来说,最重要的两个指标是消息的 error-rateretry-rate。如果这两个指标上升,说明系统出了问题。
    • 对于消费者来说,最重要的指标是 consumer-lag,该指标表明了消费者的处理速度与最近提交到分区里的偏移量之间还有多少差距。

最佳实践

生产者

  1. 不要使用 producer.send(msg),而要使用 producer.send(msg, callback)。记住,一定要使用带有回调通知的 send 方法。
  2. 设置 acks = allacks 是 Producer 的一个参数,代表了你对“已提交”消息的定义。如果设置成 all,则表明所有副本 Broker 都要接收到消息,该消息才算是“已提交”。这是最高等级的“已提交”定义。
  3. 设置 retries 为一个较大的值。这里的 retries 同样是 Producer 的参数,对应前面提到的 Producer 自动重试。当出现网络的瞬时抖动时,消息发送可能会失败,此时配置了 retries > 0 的 Producer 能够自动重试消息发送,避免消息丢失。

服务器(Kafka Broker)

  1. 设置 unclean.leader.election.enable = false。这是 Broker 端的参数,它控制的是哪些 Broker 有资格竞选分区的 Leader。如果一个 Broker 落后原先的 Leader 太多,那么它一旦成为新的 Leader,必然会造成消息的丢失。故一般都要将该参数设置成 false,即不允许这种情况的发生。
  2. 设置 replication.factor >= 3。这也是 Broker 端的参数。其实这里想表述的是,最好将消息多保存几份,毕竟目前防止消息丢失的主要机制就是冗余。
  3. 设置 min.insync.replicas > 1。这依然是 Broker 端参数,控制的是消息至少要被写入到多少个副本才算是“已提交”。设置成大于 1 可以提升消息持久性。在实际环境中千万不要使用默认值 1。
  4. 确保 replication.factor > min.insync.replicas。如果两者相等,那么只要有一个副本挂机,整个分区就无法正常工作了。我们不仅要改善消息的持久性,防止数据丢失,还要在不降低可用性的基础上完成。推荐设置成 replication.factor = min.insync.replicas + 1

消费者

  1. 确保消息消费完成再提交。Consumer 端有个参数 enable.auto.commit,最好把它设置成 false,并采用手动提交位移的方式。就像前面说的,这对于单 Consumer 多线程处理的场景而言是至关重要的。

参考资料

Kafka 消费

消费者简介

获取消息模式

消息引擎获取消息有两种模式:

  • push 模式 - MQ 推送数据给消费者
  • pull 模式 - 消费者主动向 MQ 请求数据

Kafka 消费者(Consumer)以 pull 方式从 Broker 拉取消息。相比于 push 方式,pull 方式灵活度和扩展性更好,因为消费的主动性由消费者自身控制。

push 模式的优缺点:

  • 缺点:由 broker 决定消息推送的速率,对于不同消费速率的 consumer 就不太好处理了。push 模式下,当 broker 推送的速率远大于 consumer 消费的速率时,consumer 恐怕就要崩溃了。

push 模式的优缺点:

  • 优点:consumer 可以根据自己的消费能力自主的决定消费策略
  • 缺点:如果 broker 没有可供消费的消息,将导致 consumer 不断在循环中轮询,直到新消息到达。为了避免这点,Kafka 有个参数可以让 consumer 阻塞直到新消息到达

消费者

每个 Consumer 的唯一元数据是该 Consumer 在日志中消费的位置。这个偏移量是由 Consumer 控制的:Consumer 通常会在读取记录时线性的增加其偏移量。但实际上,由于位置由 Consumer 控制,所以 Consumer 可以采用任何顺序来消费记录。

一条消息只有被提交,才会被消费者获取到。如下图,只能消费 Message0、Message1、Message2:

img

消费者群组

Consumer Group 是 Kafka 提供的可扩展且具有容错性的消费者机制

Kafka 的写入数据量很庞大,如果只有一个消费者,消费消息速度很慢,时间长了,就会造成数据积压。为了减少数据积压,Kafka 支持消费者群组,可以让多个消费者并发消费消息,对数据进行分流。

Kafka 消费者从属于消费者群组,一个群组里的 Consumer 订阅同一个 Topic,一个主题有多个 Partition,每一个 Partition 只能隶属于消费者群组中的一个 Consumer

如果超过主题的分区数量,那么有一部分消费者就会被闲置,不会接收到任何消息。

同一时刻,一条消息只能被同一消费者组中的一个消费者实例消费

不同消费者群组之间互不影响

消费流程

Kafka 消费者通过 poll 模式来获取消息,但是获取消息时并不是立刻返回结果,需要考虑两个因素:

  • 消费者通过 customer.poll(time) 中设置等待时间
  • Broker 会等待累计一定量数据,然后发送给消费者。这样可以减少网络开销。

poll 除了获取消息外,还有其他作用:

  • 发送心跳信息。消费者通过向被指派为群组协调器的 Broker 发送心跳来维护他和群组的从属关系,当机器宕掉后,群组协调器触发再均衡。

消费者 API

创建消费者

1
2
3
4
5
6
7
8
9
10
11
12
Properties props = new Properties();
// 服务器地址
props.put("bootstrap.servers", "localhost:9092");
// 消费者群组
props.put("group.id", "test");
// 关闭自动提交偏移量
props.put("enable.auto.commit", "false");
// 设置 key 反序列化器
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// 设置 value 反序列化器
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

订阅主题

1
2
3
4
// 订阅主题列表
consumer.subscribe(Arrays.asList("t1", "t2"));
// 订阅所有与 test 相关的主题
consumer.subscribe("test.*");

subscribe 方法允许传入一个正则表达式,这样就可以匹配多个主题。如果有人创建了新的主题,并且主题名恰好匹配正则表达式,那么会立即触发一次分区再均衡,消费者就可以读取新添加的主题。

轮询获取消息

消息轮询是消费者 API 的核心。一旦消费者订阅了主题,轮询就会处理所有细节,包括:群组协调、分区再均衡、发送心跳和获取数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
try {
// 3. 轮询
while (true) {
// 4. 消费消息
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
log.debug("topic = {}, partition = {}, offset = {}, key = {}, value = {}",
record.topic(), record.partition(),
record.offset(), record.key(), record.value());
}
}
} finally {
// 5. 退出程序前,关闭消费者
consumer.close();
}

手动提交偏移量

(1)同步提交

使用 commitSync() 提交偏移量最简单也最可靠。这个 API 会提交由 poll() 方法返回的最新偏移量,提交成功后马上返回,如果提交失败就抛出异常。

1
2
3
4
5
6
7
8
9
10
11
12
13
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n",
record.topic(), record.partition(),
record.offset(), record.key(), record.value());
}
try {
consumer.commitSync();
} catch (CommitFailedException e) {
log.error("commit failed", e)
}
}

同步提交的缺点:同步提交方式会一直阻塞,直到接收到 Broker 的响应请求,这会大大限制吞吐量

(2)异步提交

在成功提交或碰到无法恢复的错误之前,commitSync() 会一直重试,但是 commitAsync() 不会,这也是 commitAsync() 不好的一个地方。它之所以不进行重试,是因为在它收到服务器响应的时候,可能有一个更大的偏移量已经提交成功。假设我们发出一个请求用于提交偏移量 2000,这个时候发生了短暂的通信问题,服务器收不到请求,自然也不会作出任何响应。与此同时,我们处理了另外一批消息,并成功提交了偏移量 3000。如果 commitAsync() 重新尝试提交偏移量 2000,它有可能在偏移量 3000 之后提交成功。这个时候如果发生再均衡,就会出现重复消息

1
2
3
4
5
6
7
8
9
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("topic = %s, partition = %s, offset = % d, customer = %s, country = %s\n ",
record.topic(), record.partition(), record.offset(),
record.key(), record.value());
}
consumer.commitAsync();
}

commitAsync() 也支持回调,在 Broker 作出响应时会执行回调。回调经常被用于记录提交错误或生成度量指标,不过如果要用它来进行重试,则一定要注意提交的顺序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("topic = %s, partition = %s, offset = % d, customer = %s, country = %s\n ",
record.topic(), record.partition(), record.offset(),
record.key(), record.value());
}
consumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception e) {
if (e != null) { log.error("Commit failed for offsets {}", offsets, e); }
}
});
}

重试异步提交

可以使用一个单调递增的序列号来维护异步提交的顺序。在每次提交偏移量之后或在回调里提交偏移量时递增序列号。在进行重试前,先检查回调的序列号和即将提交的偏移量是否相等,如果相等,说明没有新的提交,那么可以安全地进行重试;如果序列号比较大,说明有一个新的提交已经发送出去了,应该停止重试。

(3)同步和异步组合提交

一般情况下,针对偶尔出现的提交失败,不进行重试不会有太大问题,因为如果提交失败是因为临时问题导致的,那么后续的提交总会有成功的。但如果这是发生在关闭消费者或再均衡前的最后一次提交,就要确保能够提交成功

因此,在消费者关闭前一般会组合使用 commitSync()commitAsync()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("topic = %s, partition = %s, offset = % d, customer = %s, country = %s\n ",
record.topic(), record.partition(), record.offset(), record.key(), record.value());
}
consumer.commitAsync();
}
} catch (Exception e) {
log.error("Unexpected error", e);
} finally {
try {
consumer.commitSync();
} finally {
consumer.close();
}
}

(4)提交特定的偏移量

提交偏移量的频率和处理消息批次的频率是一样的。如果想要更频繁地提交该怎么办?如果 poll() 方法返回一大批数据,为了避免因再均衡引起的重复处理整批消息,想要在批次中间提交偏移量该怎么办?这种情况无法通过调用 commitSync()commitAsync() 来实现,因为它们只会提交最后一个偏移量,而此时该批次里的消息还没有处理完。

解决办法是:消费者 API 允许在调用 commitSync()commitAsync() 方法时传进去希望提交的分区和偏移量的 map

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private int count = 0;
private final Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();


while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("topic = %s, partition = %s, offset = % d, customer = %s, country = %s\n ",
record.topic(), record.partition(), record.offset(), record.key(), record.value());

currentOffsets.put(new TopicPartition(record.topic(),
record.partition()), new
OffsetAndMetadata(record.offset() + 1, "no metadata"));
if (count % 1000 == 0) { consumer.commitAsync(currentOffsets, null); }
count++;
}
}

(5)从特定偏移量处开始处理

使用 poll() 方法可以从各个分区的最新偏移量处开始处理消息。

不过,有时候,我们可能需要从特定偏移量处开始处理消息。

  • 从分区的起始位置开始读消息:seekToBeginning(Collection<TopicPartition> partitions) 方法
  • 从分区的末尾位置开始读消息:seekToEnd(Collection<TopicPartition> partitions) 方法
  • 查找偏移量:seek(TopicPartition partition, long offset) 方法

通过 seek(TopicPartition partition, long offset) 可以实现处理消息和提交偏移量在一个事务中完成。思路就是需要在客户端建立一张数据表,保证处理消息和和消息偏移量位置写入到这张数据表。在一个事务中,此时就可以保证处理消息和记录偏移量要么同时成功,要么同时失败。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
consumer.subscribe(topic);
// 1.第一次调用pool,加入消费者群组
consumer.poll(0);
// 2.获取负责的分区,并从本地数据库读取改分区最新偏移量,并通过seek方法修改poll获取消息的位置
for (TopicPartition partition: consumer.assignment())
consumer.seek(partition, getOffsetFromDB(partition));

while (true) {
ConsumerRecords<String, String> records =
consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
{
processRecord(record);
storeRecordInDB(record);
storeOffsetInDB(record.topic(), record.partition(),
record.offset());
}
commitDBTransaction();
}

关闭连接

如果想让消费者从轮询消费消息的无限循环中退出,可以通过另一个线程调用 consumer.wakeup() 方法consumer.wakeup() 是消费者唯一一个可以从其他线程里安全调用的方法。调用 consumer.wakeup() 可以退出 poll() ,并抛出 WakeupException 异常,或者如果调用 consumer.wakeup() 时线程没有等待轮询,那么异常将在下一轮调用 poll() 时抛出。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
System.out.println("Starting exit...");
consumer.wakeup();
try {
mainThread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});

...

try {
// looping until ctrl-c, the shutdown hook will cleanup on exit
while (true) {
ConsumerRecords<String, String> records =
movingAvg.consumer.poll(1000);
System.out.println(System.currentTimeMillis() +
"-- waiting for data...");
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s\n",
record.offset(), record.key(), record.value());
}
for (TopicPartition tp: consumer.assignment())
System.out.println("Committing offset at position:" +
consumer.position(tp));
movingAvg.consumer.commitSync();
}
} catch (WakeupException e) {
// ignore for shutdown
} finally {
consumer.close();
System.out.println("Closed consumer and we are done");
}

分区再均衡

什么是分区再均衡

分区的所有权从一个消费者转移到另一个消费者,这样的行为被称为分区再均衡(Rebalance)Rebalance 实现了消费者群组的高可用性和伸缩性

Rebalance 本质上是一种协议,规定了一个 Consumer Group 下的所有 Consumer 如何达成一致,来分配订阅 Topic 的每个分区。比如某个 Group 下有 20 个 Consumer 实例,它订阅了一个具有 100 个分区的 Topic。正常情况下,Kafka 平均会为每个 Consumer 分配 5 个分区。这个分配的过程就叫 Rebalance。

当在群组里面新增/移除消费者或者新增/移除 kafka 集群 broker 节点时,群组协调器 Broker 会触发再均衡,重新为每一个 Partition 分配消费者。Rebalance 期间,消费者无法读取消息,造成整个消费者群组一小段时间的不可用。

何时生分区再均衡

分区再均衡的触发时机有三种:

  • 消费者群组成员数发生变更。比如有新的 Consumer 加入群组或者离开群组,或者是有 Consumer 实例崩溃被“踢出”群组。
    • 新增消费者。consumer 订阅主题之后,第一次执行 poll 方法
    • 移除消费者。执行 consumer.close() 操作或者消费客户端宕机,就不再通过 poll 向群组协调器发送心跳了,当群组协调器检测次消费者没有心跳,就会触发再均衡。
  • 订阅主题数发生变更。Consumer Group 可以使用正则表达式的方式订阅主题,比如 consumer.subscribe(Pattern.compile(“t.*c”)) 就表明该 Group 订阅所有以字母 t 开头、字母 c 结尾的主题。在 Consumer Group 的运行过程中,你新创建了一个满足这样条件的主题,那么该 Group 就会发生 Rebalance。
  • 订阅主题的分区数发生变更。Kafka 当前只能允许增加一个主题的分区数。当分区数增加时,就会触发订阅该主题的所有 Group 开启 Rebalance。
    • 新增 broker。如重启 broker 节点
    • 移除 broker。如 kill 掉 broker 节点。

分区再均衡的过程

Rebalance 是通过消费者群组中的称为“群主”消费者客户端进行的

(1)选择群主

当消费者要加入群组时,会向群组协调器发送一个 JoinGroup 请求。第一个加入群组的消费者将成为“群主”。群主从协调器那里获取群组的活跃成员列表,并负责给每一个消费者分配分区

所谓协调者,在 Kafka 中对应的术语是 Coordinator,它专门为 Consumer Group 服务,负责为 Group 执行 Rebalance 以及提供位移管理和组成员管理等。具体来讲,Consumer 端应用程序在提交位移时,其实是向 Coordinator 所在的 Broker 提交位移。同样地,当 Consumer 应用启动时,也是向 Coordinator 所在的 Broker 发送各种请求,然后由 Coordinator 负责执行消费者组的注册、成员管理记录等元数据管理操作。

(2)消费者通过向被指派为群组协调器(Coordinator)的 Broker 定期发送心跳来维持它们和群组的从属关系以及它们对分区的所有权。

(3)群主从群组协调器获取群组成员列表,然后给每一个消费者进行分配分区 Partition。有两种分配策略:Range 和 RoundRobin。

  • Range 策略,就是把若干个连续的分区分配给消费者,如存在分区 1-5,假设有 3 个消费者,则消费者 1 负责分区 1-2,消费者 2 负责分区 3-4,消费者 3 负责分区 5。
  • RoundRoin 策略,就是把所有分区逐个分给消费者,如存在分区 1-5,假设有 3 个消费者,则分区 1->消费 1,分区 2->消费者 2,分区 3>消费者 3,分区 4>消费者 1,分区 5->消费者 2。

(4)群主分配完成之后,把分配情况发送给群组协调器。

(5)群组协调器再把这些信息发送给消费者。每个消费者只能看到自己的分配信息,只有群主知道所有消费者的分配信息

如何判定消费者已经死亡

消费者通过向被指定为群组协调器的 Broker 发送心跳来维持它们和群组的从属关系以及它们对分区的所有权关系。只要消费者以正常的时间间隔发送心跳,就被认为是活跃的。消费者会在轮询消息或提交偏移量时发送心跳。如果消费者超时未发送心跳,会话就会过期,群组协调器认定它已经死亡,就会触发一次再均衡。

当一个消费者要离开群组时,会通知协调器,协调器会立即触发一次再均衡,尽量降低处理停顿。

查找协调者

所有 Broker 在启动时,都会创建和开启相应的 Coordinator 组件。也就是说,所有 Broker 都有各自的 Coordinator 组件。那么,Consumer Group 如何确定为它服务的 Coordinator 在哪台 Broker 上呢?答案就在我们之前说过的 Kafka 内部位移主题 __consumer_offsets 身上。

目前,Kafka 为某个 Consumer Group 确定 Coordinator 所在的 Broker 的算法有 2 个步骤。

  1. 第 1 步:确定由位移主题的哪个分区来保存该 Group 数据:partitionId=Math.abs(groupId.hashCode() % offsetsTopicPartitionCount)

  2. 第 2 步:找出该分区 Leader 副本所在的 Broker,该 Broker 即为对应的 Coordinator。

分区再均衡的问题

  • 首先,Rebalance 过程对 Consumer Group 消费过程有极大的影响。在 Rebalance 过程中,所有 Consumer 实例都会停止消费,等待 Rebalance 完成
  • 其次,目前 Rebalance 的设计是所有 Consumer 实例共同参与,全部重新分配所有分区。其实更高效的做法是尽量减少分配方案的变动。
  • 最后,Rebalance 实在是太慢了。

避免分区再均衡

通过前文,我们已经知道了:分区再均衡的代价很高,应该尽量避免不必要的分区再均衡,以整体提高 Consumer 的吞吐量。

分区再均衡发生的时机有三个:

  • 组成员数量发生变化
  • 订阅主题数量发生变化
  • 订阅主题的分区数发生变化

后面两个通常都是运维的主动操作,所以它们引发的 Rebalance 大都是不可避免的。实际上,大部分情况下,导致分区再均衡的原因是:组成员数量发生变化。

有两种情况,消费者并没有宕机,但也被视为消亡:

  • 未及时发送心跳
  • Consumer 消费时间过长

未及时发送心跳

第一类非必要 Rebalance 是因为未能及时发送心跳,导致 Consumer 被“踢出”Group 而引发的。因此,需要合理设置会话超时时间。这里给出一些推荐数值,你可以“无脑”地应用在你的生产环境中。

  • 设置 session.timeout.ms = 6s。
  • 设置 heartbeat.interval.ms = 2s。
  • 要保证 Consumer 实例在被判定为“dead”之前,能够发送至少 3 轮的心跳请求,即 session.timeout.ms >= 3 * heartbeat.interval.ms

session.timeout.ms 设置成 6s 主要是为了让 Coordinator 能够更快地定位已经挂掉的 Consumer。毕竟,我们还是希望能尽快揪出那些“尸位素餐”的 Consumer,早日把它们踢出 Group。希望这份配置能够较好地帮助你规避第一类“不必要”的 Rebalance。

Consumer 消费时间过长

第二类非必要 Rebalance 是 Consumer 消费时间过长导致的。此时,**max.poll.interval.ms** 参数值的设置显得尤为关键。如果要避免非预期的 Rebalance,你最好将该参数值设置得大一点,比你的下游最大处理时间稍长一点。

GC 参数

如果你按照上面的推荐数值恰当地设置了这几个参数,却发现还是出现了 Rebalance,那么我建议你去排查一下Consumer 端的 GC 表现,比如是否出现了频繁的 Full GC 导致的长时间停顿,从而引发了 Rebalance。为什么特意说 GC?那是因为在实际场景中,我见过太多因为 GC 设置不合理导致程序频发 Full GC 而引发的非预期 Rebalance 了。

提交偏移量

每次调用 poll() 方法,它总是会返回由生产者写入 Kafka 但还没有被消费者读取过的记录,Kafka 因此可以追踪哪些记录是被哪个群组的哪个消费者读取的。

更新分区当前位置的操作叫作提交

偏移量的用处

如果消费者一直处于运行状态,那么偏移量就没有什么用处。不过,如果消费者发生崩溃或有新的消费者加入群组,就会触发再均衡,完成再均衡后,每个消费者可能分配到新的分区,而不是之前处理的那个。为了能够继续之前的工作,消费者需要读取每个分区最后一次提交的偏移量,然后从偏移量指定的地方继续处理。

(1)如果提交的偏移量小于客户端处理的最后一个消息的偏移量,那么处于两个偏移量之间的消息就会被重复处理

img

(2)如果提交的偏移量大于客户端处理的最后一个消息的偏移量,那么处于两个偏移量之间的消息将会丢失

img

由此可知,处理偏移量,会对客户端处理数据产生影响。

提交偏移量的旧方案

老版本的 Consumer Group 把偏移量保存在 ZooKeeper 中。ZooKeeper 是一个分布式的协调服务框架,Kafka 重度依赖它实现各种各样的协调管理。将偏移量保存在 ZooKeeper 外部系统的做法,最显而易见的好处就是减少了 Kafka Broker 端的状态保存开销,有利于实现伸缩性。

这种方案的问题在于:ZooKeeper 其实并不适合进行高频的写操作,而 Consumer Group 的偏移量更新却是一个非常频繁的操作。这种大吞吐量的写操作会极大地拖慢 ZooKeeper 集群的性能,因此 Kafka 社区渐渐有了这样的共识:将 Consumer 偏移量保存在 ZooKeeper 中是不合适的做法。

提交偏移量的新方案

新版本 Consumer 的偏移量管理机制其实也很简单。

消费者向一个叫做 _consumer_offsets 的特殊主题发送消息,消息里包含每个分区的偏移量。如果消费者一直处于运行状态,那么偏移量就没有什么用处。不过,如果消费者发生崩溃或有新的消费者加入群组,就会触发再均衡,完成再均衡后,每个消费者可能分配到新的分区,而不是之前处理的那个。为了能够继续之前的工作,消费者需要读取每个分区最后一次提交的偏移量,然后从偏移量指定的地方继续处理。

**_consumer_offsets 主题的 Key 中应该保存 3 部分内容:<Group ID,主题名,分区号 >**。

通常来说,当 Kafka 集群中的第一个 Consumer 程序启动时,Kafka 会自动创建偏移量主题。偏移量主题就是普通的 Kafka 主题,那么它自然也有对应的分区数。如果偏移量主题是 Kafka 自动创建的,那么该主题的分区数是 50,副本数是 3。分区数可以通过 offsets.topic.num.partitions 设置;副本数可以通过 offsets.topic.replication.factor 设置。

自动提交

自动提交是 Kafka 处理偏移量最简单的方式。

enable.auto.commit 属性被设为 true,那么每过 5s,消费者会自动把从 poll() 方法接收到的最大偏移量提交上去。提交时间间隔由 auto.commit.interval.ms 控制,默认值是 5s

与消费者里的其他东西一样,自动提交也是在轮询里进行的。消费者每次在进行轮询时会检查是否该提交偏移量了,如果是,那么就会提交从上一次轮询返回的偏移量。

假设我们仍然使用默认的 5s 提交时间间隔,在最近一次提交之后的 3s 发生了再均衡,再均衡之后,消费者从最后一次提交的偏移量位置开始读取消息。这个时候偏移量已经落后了 3s(因为没有达到 5s 的时限,并没有提交偏移量),所以在这 3s 的数据将会被重复处理。虽然可以通过修改提交时间间隔来更频繁地提交偏移量,减小可能出现重复消息的时间窗的时间跨度,不过这种情况是无法完全避免的。

在使用自动提交时,每次调用轮询方法都会把上一次调用返回的偏移量提交上去,它并不知道具体哪些消息已经被处理了,所以在再次调用之前最好确保所有当前调用返回的消息都已经处理完毕(在调用 close() 方法之前也会进行自动提交)。一般情况下不会有什么问题,不过在处理异常或提前退出轮询时要格外小心。

自动提交虽然方便,不过无法避免丢失消息和分区再均衡时重复消息的问题

手动提交

自动提交虽然方便,不过无法避免丢失消息和分区再均衡时重复消息的问题。因此,可以通过手动提交偏移量,由开发者自行控制。

首先,enable.auto.commit 设为 false,关闭自动提交

如果 Kafka 触发了再均衡,我们需要在消费者失去对一个分区的所有权之前提交最后一个已处理记录的偏移量。如果消费者准备了一个缓冲区用于处理偶发的事件,那么在失去分区所有权之前,需要处理在缓冲区累积下来的记录。可能还需要关闭文件句柄、数据库连接等。

在为消费者分配新分区或移除旧分区时,可以通过消费者 API 执行一些应用程序代码,在调用 subscribe() 方法时传进去一个 ConsumerRebalanceListener 实例就可以了。 ConsumerRebalanceListener 有两个需要实现的方法。

  • public void onPartitionsRevoked(Collection partitions) 方法会在再均衡开始之前和消费者停止读取消息之后被调用。如果在这里提交偏移量,下一个接管分区的消费者就知道该从哪里开始读取了。
  • public void onPartitionsAssigned(Collection partitions) 方法会在重新分配分区之后和消费者开始读取消息之前被调用。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
private Map<TopicPartition, OffsetAndMetadata> currentOffsets=
new HashMap<>();

private class HandleRebalance implements ConsumerRebalanceListener {
public void onPartitionsAssigned(Collection<TopicPartition>
partitions) {
}

public void onPartitionsRevoked(Collection<TopicPartition>
partitions) {
System.out.println("Lost partitions in rebalance.
Committing current
offsets:" + currentOffsets);
consumer.commitSync(currentOffsets);
}
}

try {
consumer.subscribe(topics, new HandleRebalance());

while (true) {
ConsumerRecords<String, String> records =
consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
{
System.out.println("topic = %s, partition = %s, offset = %d,
customer = %s, country = %s\n",
record.topic(), record.partition(), record.offset(),
record.key(), record.value());
currentOffsets.put(new TopicPartition(record.topic(),
record.partition()), new
OffsetAndMetadata(record.offset()+1, "no metadata"));
}
consumer.commitAsync(currentOffsets, null);
}
} catch (WakeupException e) {
// 忽略异常,正在关闭消费者
} catch (Exception e) {
log.error("Unexpected error", e);
} finally {
try {
consumer.commitSync(currentOffsets);
} finally {
consumer.close();
System.out.println("Closed consumer and we are done");
}
}

反序列化器

生产者需要用序列化器将 Java 对象转换成字节数组再发送给 Kafka;同理,消费者需要用反序列化器将从 Kafka 接收到的字节数组转换成 Java 对象。

独立消费者

通常,会有多个 Kafka 消费者组成群组,关注一个主题。

但可能存在这样的场景:只需要一个消费者从一个主题的所有分区或某个特定的分区读取数据。这时,就不需要消费者群组和再均衡了,只需要把主题或分区分配给消费者,然后开始读取消息并提交偏移量。

如果是这样,就不需要订阅主题,取而代之的是为自己分配分区。一个消费者可以订阅主题(并加入消费者群组),或为自己分配分区,但不能同时做这两件事。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
List<PartitionInfo> partitionInfos = null;
partitionInfos = consumer.partitionsFor("topic");

if (partitionInfos != null) {
for (PartitionInfo partition : partitionInfos)
partitions.add(new TopicPartition(partition.topic(),
partition.partition()));
consumer.assign(partitions);

while (true) {
ConsumerRecords<String, String> records = consumer.poll(1000);

for (ConsumerRecord<String, String> record: records) {
System.out.printf("topic = %s, partition = %s, offset = %d,
customer = %s, country = %s\n",
record.topic(), record.partition(), record.offset(),
record.key(), record.value());
}
consumer.commitSync();
}
}

消费者的配置

  • bootstrap.servers - Broker 集群地址,格式:ip1:port,ip2:port…,不需要设定全部的集群地址,设置两个或者两个以上即可。
  • group.id - 消费者隶属的消费者组名称,如果为空会报异常,一般而言,这个参数要有一定的业务意义。
  • fetch.min.bytes - 消费者获取记录的最小字节数。Kafka 会等到有足够的数据时才返回消息给消费者,以降低负载。
  • fetch.max.wait.ms - Kafka 需要等待足够的数据才返回给消费者,如果一直没有足够的数据,消费者就会迟迟收不到消息。所以需要指定 Broker 的等待延迟,一旦超时,直接返回数据给消费者。
  • max.partition.fetch.bytes - 指定了服务器从每个分区返回给消费者的最大字节数。默认为 1 MB。
  • session.timeout.ms - 指定了消费者的心跳超时时间。如果消费者没有在有效时间内发送心跳给群组协调器,协调器会视消费者已经消亡,从而触发分区再均衡。默认为 3 秒。
  • auto.offset.reset - 指定了消费者在读取一个没有偏移量的分区或偏移量无效的情况下,该如何处理。
    • latest - 表示在偏移量无效时,消费者将从最新的记录开始读取分区记录。
    • earliest - 表示在偏移量无效时,消费者将从起始位置读取分区记录。
  • enable.auto.commit - 指定了是否自动提交消息偏移量,默认开启。
  • partition.assignment.strategy - 消费者的分区分配策略。
    • Range - 表示会将主题的若干个连续的分区分配给消费者。
    • RoundRobin - 表示会将主题的所有分区按照轮询方式分配给消费者。
  • client.id - 客户端标识。
  • max.poll.records - 用于控制单次能获取到的记录数量。
  • receive.buffer.bytes - 用于设置 Socket 接收消息缓冲区(SO_RECBUF)的大小,默认值为 64KB。如果设置为-1,则使用操作系统的默认值。
  • send.buffer.bytes - 用于设置 Socket 发送消息缓冲区(SO_SNDBUF)的大小,默认值为 128KB。与 receive.buffer.bytes 参数一样,如果设置为-1,则使用操作系统的默认值。

参考资料

Kafka 生产

生产者简介

不管是把 Kafka 作为消息队列系统、还是数据存储平台,总是需要一个可以向 Kafka 写入数据的生产者和一个可以从 Kafka 读取数据的消费者,或者是一个兼具两种角色的应用程序。

使用 Kafka 的场景很多,诉求也各有不同,主要有:是否允许丢失消息?是否接受重复消息?是否有严格的延迟和吞吐量要求?

不同的场景对于 Kafka 生产者 API 的使用和配置会有直接的影响。

生产者传输实体

Kafka Producer 发送的数据对象叫做 ProducerRecord ,它有 4 个关键参数:

  • Topic - 主题
  • Partition - 分区(非必填)
  • Key - 键(非必填)
  • Value - 值

生产者发送流程

Kafka 生产者发送消息流程:

(1)序列化 - 发送前,生产者要先把键和值序列化成字节数组,这样它们才能够在网络中传输。

(2)分区 - 数据被传给分区器。如果在 ProducerRecord 中已经指定了分区,那么分区器什么也不会做;否则,分区器会根据 ProducerRecord 的键来选择一个分区。选定分区后,生产者就知道该把消息发送给哪个主题的哪个分区。

(3)批次传输 - 接着,这条记录会被添加到一个记录批次中。这个批次中的所有消息都会被发送到相同的主题和分区上。有一个独立的线程负责将这些记录批次发送到相应 Broker 上。

  • 批次,就是一组消息,这些消息属于同一个主题和分区
  • 发送时,会把消息分成批次传输,如果每次只发送一个消息,会占用大量的网路开销。

(4)响应 - 服务器收到消息会返回一个响应。

  • 如果成功,则返回一个 RecordMetaData 对象,它包含了主题、分区、偏移量;
  • 如果失败,则返回一个错误。生产者在收到错误后,可以进行重试,重试次数可以在配置中指定。失败一定次数后,就返回错误消息。

img

生产者向 Broker 发送消息时是怎么确定向哪一个 Broker 发送消息?

  • 生产者会向任意 broker 发送一个元数据请求(MetadataRequest),获取到每一个分区对应的 Leader 信息,并缓存到本地。
  • 生产者在发送消息时,会指定 Partition 或者通过 key 得到到一个 Partition,然后根据 Partition 从缓存中获取相应的 Leader 信息。

img

生产者 API

Kafka 的 Java 生产者 API 主要的对象就是 KafkaProducer。通常我们开发一个生产者的步骤有 4 步。

  1. 构造生产者对象所需的参数对象。
  2. 利用第 1 步的参数对象,创建 KafkaProducer 对象实例。
  3. 使用 KafkaProducersend 方法发送消息。
  4. 调用 KafkaProducerclose 方法关闭生产者并释放各种系统资源。

创建生产者

Kafka 生产者核心配置:

  • bootstrap.servers - 指定了 Producer 启动时要连接的 Broker 地址。注:如果你指定了 1000 个 Broker 连接信息,那么,Producer 启动时就会首先创建与这 1000 个 Broker 的 TCP 连接。在实际使用过程中,并不建议把集群中所有的 Broker 信息都配置到 bootstrap.servers 中,通常你指定 3 ~ 4 台就足以了。因为 Producer 一旦连接到集群中的任一台 Broker,就能拿到整个集群的 Broker 信息,故没必要为 bootstrap.servers 指定所有的 Broker。
  • key.serializer - 键的序列化器。
  • value.serializer - 值的序列化器。
1
2
3
4
5
6
7
8
9
10
// 指定生产者的配置
final Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
// 设置 key 的序列化器
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 设置 value 的序列化器
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

// 使用配置初始化 Kafka 生产者
Producer<String, String> producer = new KafkaProducer<>(properties);

异步发送

直接发送消息,不关心消息是否到达。

这种方式吞吐量最高,但有小概率会丢失消息。

【示例】异步发送

1
2
3
4
5
6
7
ProducerRecord<String, String> record =
new ProducerRecord<>("CustomerCountry", "Precision Products", "France");
try {
producer.send(record);
} catch (Exception e) {
e.printStackTrace();
}

同步发送

返回一个 Future 对象,调用 get() 方法,会一直阻塞等待 Broker 返回结果。

这是一种可靠传输方式,但吞吐量最差。

【示例】同步发送

1
2
3
4
5
6
7
ProducerRecord<String, String> record =
new ProducerRecord<>("CustomerCountry", "Precision Products", "France");
try {
producer.send(record).get();
} catch (Exception e) {
e.printStackTrace();
}

异步响应发送

代码如下,异步方式相对于“发送并忽略返回”的方式的不同在于:在异步返回时可以执行一些操作,如:抛出异常、记录错误日志。

这是一个折中的方案,即兼顾吞吐量,也保证消息不丢失。

【示例】异步响应发送

首先,定义一个 callback:

1
2
3
4
5
6
7
8
private class DemoProducerCallback implements Callback {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null) {
e.printStackTrace();
}
}
}

然后,使用这个 callback:

1
2
3
ProducerRecord<String, String> record =
new ProducerRecord<>("CustomerCountry", "Biomedical Materials", "USA");
producer.send(record, new DemoProducerCallback());

关闭连接

调用 producer.close() 方法可以关闭 Kafka 生产者连接。

1
2
3
4
5
6
7
8
9
Producer<String, String> producer = new KafkaProducer<>(properties);
try {
producer.send(new ProducerRecord<>(topic, msg));
} catch (Exception e) {
e.printStackTrace();
} finally {
// 关闭连接
producer.close();
}

生产者的连接

Apache Kafka 的所有通信都是基于 TCP 的。无论是生产者、消费者,还是 Broker 之间的通信都是如此。

选用 TCP 连接是由于 TCP 本身提供的一些高级功能,如多路复用请求以及同时轮询多个连接的能力。

何时创建 TCP 连接

Kafka 生产者创建连接有三个时机:

(1)在创建 KafkaProducer 实例时,生产者应用会在后台创建并启动一个名为 Sender 的线程,该 Sender 线程开始运行时,首先会创建与 bootstrap.servers 中所有 Broker 的 TCP 连接。

(2)当 Producer 更新集群的元数据信息之后,如果发现与某些 Broker 当前没有连接,那么它就会创建一个 TCP 连接。

  • 场景一:当 Producer 尝试给一个不存在的主题发送消息时,Broker 会告诉 Producer 说这个主题不存在。此时 Producer 会发送 METADATA 请求给 Kafka 集群,去尝试获取最新的元数据信息。
  • 场景二:Producer 通过 metadata.max.age.ms 参数定期地去更新元数据信息。该参数的默认值是 300000,即 5 分钟,也就是说不管集群那边是否有变化,Producer 每 5 分钟都会强制刷新一次元数据以保证它是最及时的数据。

(3)当要发送消息时,Producer 发现尚不存在与目标 Broker 的连接,会创建一个 TCP 连接。

何时关闭 TCP 连接

Producer 端关闭 TCP 连接的方式有两种:一种是用户主动关闭;一种是 Kafka 自动关闭

主动关闭是指调用 producer.close() 方法来关闭生产者连接;甚至包括用户调用 kill -9 主动“杀掉”Producer 应用。

如果设置 Producer 端 connections.max.idle.ms 参数大于 0(默认为 9 分钟),意味着,在 connections.max.idle.ms 指定时间内,如果没有任何请求“流过”某个 TCP 连接,那么 Kafka 会主动帮你把该 TCP 连接关闭。如果设置该参数为 -1,TCP 连接将成为永久长连接。

值得注意的是,在第二种方式中,TCP 连接是在 Broker 端被关闭的,但其实这个 TCP 连接的发起方是客户端,因此在 TCP 看来,这属于被动关闭的场景,即 passive close。被动关闭的后果就是会产生大量的 CLOSE_WAIT 连接,因此 Producer 端或 Client 端没有机会显式地观测到此连接已被中断。

序列化

Kafka 内置了常用 Java 基础类型的序列化器,如:StringSerializerIntegerSerializerDoubleSerializer 等。

但如果要传输较为复杂的对象,推荐使用序列化性能更高的工具,如:Avro、Thrift、Protobuf 等。

使用方式是通过实现 org.apache.kafka.common.serialization.Serializer 接口来引入自定义的序列化器。

分区

什么是分区

Kafka 的数据结构采用三级结构,即:主题(Topic)、分区(Partition)、消息(Record)。

在 Kafka 中,任意一个 Topic 维护了一组 Partition 日志,如下所示:

img

每个 Partition 都是一个单调递增的、不可变的日志记录,以不断追加的方式写入数据。Partition 中的每条记录会被分配一个单调递增的 id 号,称为偏移量(Offset),用于唯一标识 Partition 内的每条记录。

为什么要分区

为什么 Kafka 的数据结构采用三级结构?

分区的作用就是提供负载均衡的能力,以实现系统的高伸缩性(Scalability)。

不同的分区能够被放置到不同节点的机器上,而数据的读写操作也都是针对分区这个粒度而进行的,这样每个节点的机器都能独立地执行各自分区的读写请求处理。并且,我们还可以通过添加新的机器节点来增加整体系统的吞吐量。

分区策略

所谓分区策略是决定生产者将消息发送到哪个分区的算法,也就是负载均衡算法。

前文中已经提到,Kafka 生产者发送消息使用的对象 ProducerRecord ,可以选填 Partition 和 Key。不过,大多数应用会用到 key。key 有两个作用:作为消息的附加信息;也可以用来决定消息该被写到 Topic 的哪个 Partition,拥有相同 key 的消息将被写入同一个 Partition。

如果 ProducerRecord 指定了 Partition,则分区器什么也不做,否则分区器会根据 key 选择一个 Partition 。

  • 没有 key 时的分发逻辑:每隔 topic.metadata.refresh.interval.ms 的时间,随机选择一个 partition。这个时间窗口内的所有记录发送到这个 partition。发送数据出错后会重新选择一个 partition。
  • 根据 key 分发:Kafka 的选择分区策略是:根据 key 求 hash 值,然后将 hash 值对 partition 数量求模。这里的关键点在于,同一个 key 总是被映射到同一个 Partition 上。所以,在选择分区时,Kafka 会使用 Topic 的所有 Partition ,而不仅仅是可用的 Partition。这意味着,如果写入数据的 Partition 是不可用的,那么就会出错

自定义分区策略

如果 Kafka 的默认分区策略无法满足实际需要,可以自定义分区策略。需要显式地配置生产者端的参数 partitioner.class。这个参数该怎么设定呢?

首先,要实现 org.apache.kafka.clients.producer.Partitioner 接口。这个接口定义了两个方法:partitionclose,通常只需要实现最重要的 partition 方法。我们来看看这个方法的方法签名:

1
int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);

这里的 topickeykeyBytesvaluevalueBytes 都属于消息数据,cluster 则是集群信息(比如当前 Kafka 集群共有多少主题、多少 Broker 等)。Kafka 给你这么多信息,就是希望让你能够充分地利用这些信息对消息进行分区,计算出它要被发送到哪个分区中。

接着,设置 partitioner.class 参数为自定义类的全限定名,那么生产者程序就会按照你的代码逻辑对消息进行分区。

负载均衡算法常见的有:

  • 随机算法
  • 轮询算法
  • 最小活跃数算法
  • 源地址哈希算法

可以根据实际需要去实现。

压缩

Kafka 的消息格式

目前,Kafka 共有两大类消息格式,社区分别称之为 V1 版本和 V2 版本。V2 版本是 Kafka 0.11.0.0 中正式引入的。

不论是哪个版本,Kafka 的消息层次都分为两层:消息集合(message set)以及消息(message)。一个消息集合中包含若干条日志项(record item),而日志项才是真正封装消息的地方。Kafka 底层的消息日志由一系列消息集合日志项组成。Kafka 通常不会直接操作具体的一条条消息,它总是在消息集合这个层面上进行写入操作。

那么社区引入 V2 版本的目的是什么呢?V2 版本主要是针对 V1 版本的一些弊端做了修正。

在 V1 版本中,每条消息都需要执行 CRC 校验。但有些情况下消息的 CRC 值是会发生变化的。比如在 Broker 端可能会对消息时间戳字段进行更新,那么重新计算之后的 CRC 值也会相应更新;再比如 Broker 端在执行消息格式转换时(主要是为了兼容老版本客户端程序),也会带来 CRC 值的变化。鉴于这些情况,再对每条消息都执行 CRC 校验就有点没必要了,不仅浪费空间还耽误 CPU 时间。

因此,在 V2 版本中,只对消息集合执行 CRC 校验。V2 版本还有一个和压缩息息相关的改进,就是保存压缩消息的方法发生了变化。之前 V1 版本中保存压缩消息的方法是把多条消息进行压缩然后保存到外层消息的消息体字段中;而 V2 版本的做法是对整个消息集合进行压缩。显然后者应该比前者有更好的压缩效果。

Kafka 的压缩流程

Kafka 的压缩流程,一言以概之——Producer 端压缩、Broker 端保持、Consumer 端解压缩。

压缩过程

在 Kafka 中,压缩可能发生在两个地方:生产者端和 Broker 端。

生产者程序中配置 compression.type 参数即表示启用指定类型的压缩算法。

【示例】开启 GZIP 的 Producer 对象

1
2
3
4
5
6
7
8
9
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 开启 GZIP 压缩
props.put("compression.type", "gzip");

Producer<String, String> producer = new KafkaProducer<>(props);

通常,Broker 从 Producer 端接收到消息后,不做任何处理。以下两种情况除外:

  • 情况一:Broker 端指定了和 Producer 端不同的压缩算法。显然,应该尽量避免这种情况。

  • 情况二:Broker 端发生了消息格式转换。所谓的消息格式转换,主要是为了兼容老版本的消费者程序。在一个生产环境中,Kafka 集群中同时保存多种版本的消息格式非常常见。为了兼容老版本的格式,Broker 端会对新版本消息执行向老版本格式的转换。这个过程中会涉及消息的解压缩和重新压缩。一般情况下这种消息格式转换对性能是有很大影响的,除了这里的压缩之外,它还让 Kafka 丧失了引以为豪的 Zero Copy 特性。

所谓零拷贝,说的是当数据在磁盘和网络进行传输时避免昂贵的内核态数据拷贝,从而实现快速的数据传输。因此如果 Kafka 享受不到这个特性的话,性能必然有所损失,所以尽量保证消息格式的统一吧,这样不仅可以避免不必要的解压缩 / 重新压缩,对提升其他方面的性能也大有裨益。

解压缩的过程

通常来说解压缩发生在消费者程序中,也就是说 Producer 发送压缩消息到 Broker 后,Broker 照单全收并原样保存起来。当 Consumer 程序请求这部分消息时,Broker 依然原样发送出去,当消息到达 Consumer 端后,由 Consumer 自行解压缩还原成之前的消息。

那么现在问题来了,Consumer 怎么知道这些消息是用何种压缩算法压缩的呢?其实答案就在消息中。Kafka 会将启用了哪种压缩算法封装进消息集合中,这样当 Consumer 读取到消息集合时,它自然就知道了这些消息使用的是哪种压缩算法。

压缩算法

在 Kafka 2.1.0 版本之前,Kafka 支持 3 种压缩算法:GZIP、Snappy 和 LZ4。从 2.1.0 开始,Kafka 正式支持 Zstandard 算法(简写为 zstd)。

在实际使用中,GZIP、Snappy、LZ4 甚至是 zstd 的表现各有千秋。但对于 Kafka 而言,它们的性能测试结果却出奇得一致,即在吞吐量方面:LZ4 > Snappy > zstd 和 GZIP;而在压缩比方面,zstd > LZ4 > GZIP > Snappy。

如果客户端机器 CPU 资源有很多富余,强烈建议开启 zstd 压缩,这样能极大地节省网络资源消耗

何时启用压缩

何时启用压缩是比较合适的时机呢?

压缩是在 Producer 端完成的工作,那么启用压缩的一个条件就是 Producer 程序运行机器上的 CPU 资源要很充足。如果 Producer 运行机器本身 CPU 已经消耗殆尽了,那么启用消息压缩无疑是雪上加霜,只会适得其反。

如果环境中带宽资源有限,那么也建议开启压缩。

幂等性

什么是幂等性

幂等(idempotent、idempotence)是一个数学与计算机学概念,指的是:一个幂等操作的特点是其任意多次执行所产生的影响均与一次执行的影响相同。

Kafka Producer 的幂等性

在 Kafka 中,Producer 默认不是幂等性的,但我们可以创建幂等性 Producer。它其实是 0.11.0.0 版本引入的新功能。在此之前,Kafka 向分区发送数据时,可能会出现同一条消息被发送了多次,导致消息重复的情况。在 0.11 之后,指定 Producer 幂等性的方法很简单,仅需要设置一个参数即可,即 props.put(“enable.idempotence”, ture),或 props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true)

enable.idempotence 被设置成 true 后,Producer 自动升级成幂等性 Producer,其他所有的代码逻辑都不需要改变。Kafka 自动帮你做消息的去重。底层具体的原理很简单,就是经典的用空间去换时间的优化思路,即在 Broker 端多保存一些字段。当 Producer 发送了具有相同字段值的消息后,Broker 能够自动知晓这些消息已经重复了,于是可以在后台默默地把它们“丢弃”掉。当然,实际的实现原理并没有这么简单,但你大致可以这么理解。

我们必须要了解幂等性 Producer 的作用范围:

  • 首先,**enable.idempotence 只能保证单分区上的幂等性**,即一个幂等性 Producer 能够保证某个主题的一个分区上不出现重复消息,它无法实现多个分区的幂等性。
  • 其次,它只能实现单会话上的幂等性,不能实现跨会话的幂等性。这里的会话,你可以理解为 Producer 进程的一次运行。当你重启了 Producer 进程之后,这种幂等性保证就丧失了。

如果想实现多分区以及多会话上的消息无重复,应该怎么做呢?答案就是事务(transaction)或者依赖事务型 Producer。这也是幂等性 Producer 和事务型 Producer 的最大区别!

PID 和 Sequence Number

为了实现 Producer 的幂等性,Kafka 引入了 Producer ID(即 PID)和 Sequence Number。

  • PID。每个新的 Producer 在初始化的时候会被分配一个唯一的 PID,这个 PID 对用户是不可见的。
  • Sequence Numbler。对于每个 PID,该 Producer 发送数据的每个 <Topic, Partition> 都对应一个从 0 开始单调递增的 Sequence Number。

Broker 端在缓存中保存了这 seq number,对于接收的每条消息,如果其序号比 Broker 缓存中序号大于 1 则接受它,否则将其丢弃。这样就可以实现了消息重复提交了。但是,只能保证单个 Producer 对于同一个 <Topic, Partition> 的 Exactly Once 语义。不能保证同一个 Producer 一个 topic 不同的 partion 幂等。

img

实现幂等之后:

img

生成 PID 的流程

在执行创建事务时,如下:

1
Producer<String, String> producer = new KafkaProducer<String, String>(props);

会创建一个 Sender,并启动线程,执行如下 run 方法,在 maybeWaitForProducerId()中生成一个 producerId,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
====================================
类名:Sender
====================================

void run(long now) {
if (transactionManager != null) {
try {
........
if (!transactionManager.isTransactional()) {
// 为idempotent producer生成一个producer id
maybeWaitForProducerId();
} else if (transactionManager.hasUnresolvedSequences() && !transactionManager.hasFatalError()) {
........

幂等性的应用实例

(1)配置属性

需要设置:

  • enable.idempotence,需要设置为 ture,此时就会默认把 acks 设置为 all,所以不需要再设置 acks 属性了。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 指定生产者的配置
final Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
// 设置 key 的序列化器
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 设置 value 的序列化器
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

// 开启幂等性
properties.put("enable.idempotence", true);
// 设置重试次数
properties.put("retries", 3);
//Reduce the no of requests less than 0
properties.put("linger.ms", 1);
// buffer.memory 控制生产者可用于缓冲的内存总量
properties.put("buffer.memory", 33554432);

// 使用配置初始化 Kafka 生产者
producer = new KafkaProducer<>(properties);

(2)发送消息

跟一般生产者一样,如下

1
2
3
4
5
6
7
public void produceIdempotMessage(String topic, String message) {
// 创建Producer
Producer producer = buildIdempotProducer();
// 发送消息
producer.send(new ProducerRecord<String, String>(topic, message));
producer.flush();
}

此时,因为我们并没有配置 transaction.id 属性,所以不能使用事务相关 API,如下

1
producer.initTransactions();

否则会出现如下错误:

1
2
3
4
Exception in thread “main” java.lang.IllegalStateException: Transactional method invoked on a non-transactional producer.
at org.apache.kafka.clients.producer.internals.TransactionManager.ensureTransactional(TransactionManager.java:777)
at org.apache.kafka.clients.producer.internals.TransactionManager.initializeTransactions(TransactionManager.java:202)
at org.apache.kafka.clients.producer.KafkaProducer.initTransactions(KafkaProducer.java:544)

Kafka 事务

Kafka 的事务概念是指一系列的生产者生产消息和消费者提交偏移量的操作在一个事务,或者说是是一个原子操作),同时成功或者失败

消息可靠性保障,由低到高为:

  • 最多一次(at most once):消息可能会丢失,但绝不会被重复发送。
  • 至少一次(at least once):消息不会丢失,但有可能被重复发送。
  • 精确一次(exactly once):消息不会丢失,也不会被重复发送。

Kafka 支持事务功能主要是为了实现精确一次处理语义的,而精确一次处理是实现流处理的基石。

事务

Kafka 自 0.11 版本开始提供了对事务的支持,目前主要是在 read committed 隔离级别上做事情。它能保证多条消息原子性地写入到目标分区,同时也能保证 Consumer 只能看到事务成功提交的消息

事务型 Producer

事务型 Producer 能够保证将消息原子性地写入到多个分区中。这批消息要么全部写入成功,要么全部失败。另外,事务型 Producer 也不惧进程的重启。Producer 重启回来后,Kafka 依然保证它们发送消息的精确一次处理。

事务属性实现前提是幂等性,即在配置事务属性 transaction.id 时,必须还得配置幂等性;但是幂等性是可以独立使用的,不需要依赖事务属性。

在事务属性之前先引入了生产者幂等性,它的作用为:

  • 生产者多次发送消息可以封装成一个原子操作,要么都成功,要么失败。
  • consumer-transform-producer 模式下,因为消费者提交偏移量出现问题,导致重复消费。需要将这个模式下消费者提交偏移量操作和生产者一系列生成消息的操作封装成一个原子操作。

消费者提交偏移量导致重复消费消息的场景:消费者在消费消息完成提交便宜量 o2 之前挂掉了(假设它最近提交的偏移量是 o1),此时执行再均衡时,其它消费者会重复消费消息(o1 到 o2 之间的消息)。

事务操作的 API

Producer 提供了 initTransactions, beginTransaction, sendOffsets, commitTransaction, abortTransaction 五个事务方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/**
* 初始化事务。需要注意的有:
* 1、前提
* 需要保证transation.id属性被配置。
* 2、这个方法执行逻辑是:
* (1)Ensures any transactions initiated by previous instances of the producer with the same
* transactional.id are completed. If the previous instance had failed with a transaction in
* progress, it will be aborted. If the last transaction had begun completion,
* but not yet finished, this method awaits its completion.
* (2)Gets the internal producer id and epoch, used in all future transactional
* messages issued by the producer.
*
*/
public void initTransactions();

/**
* 开启事务
*/
public void beginTransaction() throws ProducerFencedException ;

/**
* 为消费者提供的在事务内提交偏移量的操作
*/
public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
String consumerGroupId) throws ProducerFencedException ;

/**
* 提交事务
*/
public void commitTransaction() throws ProducerFencedException;

/**
* 放弃事务,类似回滚事务的操作
*/
public void abortTransaction() throws ProducerFencedException ;

Kafka 事务相关配置

使用 kafka 的事务 api 时的一些注意事项:

  • 需要消费者的自动模式设置为 false,并且不能子再手动的进行执行 consumer#commitSync 或者 consumer#commitAsyc
  • 设置 Producer 端参数 transctional.id。最好为其设置一个有意义的名字。
  • 和幂等性 Producer 一样,开启 enable.idempotence = true。如果配置了 transaction.id,则此时 enable.idempotence 会被设置为 true
  • 消费者需要配置事务隔离级别 isolation.level。在 consume-trnasform-produce 模式下使用事务时,必须设置为 READ_COMMITTED
    • read_uncommitted:这是默认值,表明 Consumer 能够读取到 Kafka 写入的任何消息,不论事务型 Producer 提交事务还是终止事务,其写入的消息都可以读取。很显然,如果你用了事务型 Producer,那么对应的 Consumer 就不要使用这个值。
    • read_committed:表明 Consumer 只会读取事务型 Producer 成功提交事务写入的消息。当然了,它也能看到非事务型 Producer 写入的所有消息。

Kafka 事务应用示例

只有生成操作

创建一个事务,在这个事务操作中,只有生成消息操作。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/**
* 在一个事务只有生产消息操作
*/
public void onlyProduceInTransaction() {
Producer producer = buildProducer();

// 1.初始化事务
producer.initTransactions();

// 2.开启事务
producer.beginTransaction();

try {
// 3.kafka写操作集合
// 3.1 do业务逻辑

// 3.2 发送消息
producer.send(new ProducerRecord<String, String>("test", "transaction-data-1"));

producer.send(new ProducerRecord<String, String>("test", "transaction-data-2"));
// 3.3 do其他业务逻辑,还可以发送其他topic的消息。

// 4.事务提交
producer.commitTransaction();


} catch (Exception e) {
// 5.放弃事务
producer.abortTransaction();
}

}

创建生产者,代码如下,需要:

  • 配置 transactional.id 属性
  • 配置 enable.idempotence 属性
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
/**
* 需要:
* 1、设置transactional.id
* 2、设置enable.idempotence
* @return
*/
private Producer buildProducer() {

// create instance for properties to access producer configs
Properties props = new Properties();

// bootstrap.servers是Kafka集群的IP地址。多个时,使用逗号隔开
props.put("bootstrap.servers", "localhost:9092");

// 设置事务id
props.put("transactional.id", "first-transactional");

// 设置幂等性
props.put("enable.idempotence",true);

//Set acknowledgements for producer requests.
props.put("acks", "all");

//If the request fails, the producer can automatically retry,
props.put("retries", 1);

//Specify buffer size in config,这里不进行设置这个属性,如果设置了,还需要执行producer.flush()来把缓存中消息发送出去
//props.put("batch.size", 16384);

//Reduce the no of requests less than 0
props.put("linger.ms", 1);

//The buffer.memory controls the total amount of memory available to the producer for buffering.
props.put("buffer.memory", 33554432);

// Kafka消息是以键值对的形式发送,需要设置key和value类型序列化器
props.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");

props.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");


Producer<String, String> producer = new KafkaProducer<String, String>(props);

return producer;
}

消费-生产并存(consume-transform-produce)

在一个事务中,既有生产消息操作又有消费消息操作,即常说的 Consume-tansform-produce 模式。如下实例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
/**
* 在一个事务内,即有生产消息又有消费消息
*/
public void consumeTransferProduce() {
// 1.构建上产者
Producer producer = buildProducer();
// 2.初始化事务(生成productId),对于一个生产者,只能执行一次初始化事务操作
producer.initTransactions();

// 3.构建消费者和订阅主题
Consumer consumer = buildConsumer();
consumer.subscribe(Arrays.asList("test"));
while (true) {
// 4.开启事务
producer.beginTransaction();

// 5.1 接受消息
ConsumerRecords<String, String> records = consumer.poll(500);

try {
// 5.2 do业务逻辑;
System.out.println("customer Message---");
Map<TopicPartition, OffsetAndMetadata> commits = Maps.newHashMap();
for (ConsumerRecord<String, String> record : records) {
// 5.2.1 读取消息,并处理消息。print the offset,key and value for the consumer records.
System.out.printf("offset = %d, key = %s, value = %s\n",
record.offset(), record.key(), record.value());

// 5.2.2 记录提交的偏移量
commits.put(new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset()));


// 6.生产新的消息。比如外卖订单状态的消息,如果订单成功,则需要发送跟商家结转消息或者派送员的提成消息
producer.send(new ProducerRecord<String, String>("test", "data2"));
}

// 7.提交偏移量
producer.sendOffsetsToTransaction(commits, "group0323");

// 8.事务提交
producer.commitTransaction();

} catch (Exception e) {
// 7.放弃事务
producer.abortTransaction();
}
}
}

创建消费者代码,需要:

  • 将配置中的自动提交属性(auto.commit)进行关闭
  • 而且在代码里面也不能使用手动提交 commitSync( )或者 commitAsync( )
  • 设置 isolation.level
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
* 需要:
* 1、关闭自动提交 enable.auto.commit
* 2、isolation.level为
* @return
*/
public Consumer buildConsumer() {
Properties props = new Properties();
// bootstrap.servers是Kafka集群的IP地址。多个时,使用逗号隔开
props.put("bootstrap.servers", "localhost:9092");
// 消费者群组
props.put("group.id", "group0323");
// 设置隔离级别
props.put("isolation.level","read_committed");
// 关闭自动提交
props.put("enable.auto.commit", "false");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer
<String, String>(props);

return consumer;
}

只有消费操作

创建一个事务,在这个事务操作中,只有生成消息操作,如下代码。这种操作其实没有什么意义,跟使用手动提交效果一样,无法保证消费消息操作和提交偏移量操作在一个事务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
/**
* 在一个事务只有消息操作
*/
public void onlyConsumeInTransaction() {
Producer producer = buildProducer();

// 1.初始化事务
producer.initTransactions();

// 2.开启事务
producer.beginTransaction();

// 3.kafka读消息的操作集合
Consumer consumer = buildConsumer();
while (true) {
// 3.1 接受消息
ConsumerRecords<String, String> records = consumer.poll(500);

try {
// 3.2 do业务逻辑;
System.out.println("customer Message---");
Map<TopicPartition, OffsetAndMetadata> commits = Maps.newHashMap();
for (ConsumerRecord<String, String> record : records) {
// 3.2.1 处理消息 print the offset,key and value for the consumer records.
System.out.printf("offset = %d, key = %s, value = %s\n",
record.offset(), record.key(), record.value());

// 3.2.2 记录提交偏移量
commits.put(new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset()));
}

// 4.提交偏移量
producer.sendOffsetsToTransaction(commits, "group0323");

// 5.事务提交
producer.commitTransaction();

} catch (Exception e) {
// 6.放弃事务
producer.abortTransaction();
}
}

}

生产者的配置

更详尽的生产者配置可以参考:Kafka 生产者官方配置说明

以下为生产者主要配置参数清单:

  • acks:指定了必须有多少个分区副本收到消息,生产者才会认为消息写入是成功的。默认为 acks=1
    • acks=0 如果设置为 0,则 Producer 不会等待服务器的反馈。该消息会被立刻添加到 socket buffer 中并认为已经发送完成。在这种情况下,服务器是否收到请求是没法保证的,并且参数retries也不会生效(因为客户端无法获得失败信息)。每个记录返回的 offset 总是被设置为-1。
    • acks=1 如果设置为 1,leader 节点会将记录写入本地日志,并且在所有 follower 节点反馈之前就先确认成功。在这种情况下,如果 leader 节点在接收记录之后,并且在 follower 节点复制数据完成之前产生错误,则这条记录会丢失。
    • acks=all 如果设置为 all,这就意味着 leader 节点会等待所有同步中的副本确认之后再确认这条记录是否发送完成。只要至少有一个同步副本存在,记录就不会丢失。这种方式是对请求传递的最有效保证。acks=-1 与 acks=all 是等效的。
  • buffer.memory:用来设置 Producer 缓冲区大小。
  • compression.type:Producer 生成数据时可使用的压缩类型。默认值是 none(即不压缩)。可配置的压缩类型包括:nonegzipsnappylz4zstd。压缩是针对批处理的所有数据,所以批处理的效果也会影响压缩比(更多的批处理意味着更好的压缩)。
  • retries:用来设置发送失败的重试次数。
  • batch.size:用来设置一个批次可占用的内存大小。
  • linger.ms:用来设置 Producer 在发送批次前的等待时间。
  • client.id:Kafka 服务器用它来识别消息源,可以是任意字符串。
  • max.in.flight.requests.per.connection:用来设置 Producer 在收到服务器响应前可以发送多少个消息。
  • timeout.ms:用来设置 Broker 等待同步副本返回消息确认的时间,与 acks 的配置相匹配。
  • request.timeout.ms:Producer 在发送数据时等待服务器返回响应的时间。
  • metadata.fetch.timeout.ms:Producer 在获取元数据时(如:分区的 Leader 是谁)等待服务器返回响应的时间。
  • max.block.ms:该配置控制 KafkaProducer.send()KafkaProducer.partitionsFor() 允许被阻塞的时长。这些方法可能因为缓冲区满了或者元数据不可用而被阻塞。用户提供的序列化程序或分区程序的阻塞将不会被计算到这个超时。
  • max.request.size:请求的最大字节数。
  • receieve.buffer.bytes:TCP 接收缓冲区的大小。
  • send.buffer.bytes:TCP 发送缓冲区的大小。

参考资料

深入理解 Java String 类型

String 类型可能是 Java 中应用最频繁的引用类型,但它的性能问题却常常被忽略。高效的使用字符串,可以提升系统的整体性能。当然,要做到高效使用字符串,需要深入了解其特性。

String 的不可变性

我们先来看下 String 的定义:

1
2
3
4
public final class String
implements java.io.Serializable, Comparable<String>, CharSequence {
/** The value is used for character storage. */
private final char value[];

String 类被 final 关键字修饰,表示不可继承 String

String 类的数据存储于 char[] 数组,这个数组被 final 关键字修饰,表示 String 对象不可被更改

为什么 Java 要这样设计?

(1)保证 String 对象安全性。避免 String 被篡改。

(2)保证 hash 值不会频繁变更

(3)可以实现字符串常量池。通常有两种创建字符串对象的方式,一种是通过字符串常量的方式创建,如 String str="abc"; 另一种是字符串变量通过 new 形式的创建,如 String str = new String("abc")

使用第一种方式创建字符串对象时,JVM 首先会检查该对象是否在字符串常量池中,如果在,就返回该对象引用,否则新的字符串将在常量池中被创建。这种方式可以减少同一个值的字符串对象的重复创建,节约内存。

String str = new String("abc") 这种方式,首先在编译类文件时,"abc" 常量字符串将会放入到常量结构中,在类加载时,"abc" 将会在常量池中创建;其次,在调用 new 时,JVM 命令将会调用 String 的构造函数,同时引用常量池中的 "abc" 字符串,在堆内存中创建一个 String 对象;最后,str 将引用 String 对象。

String 的性能考量

字符串拼接

字符串常量的拼接,编译器会将其优化为一个常量字符串

【示例】字符串常量拼接

1
2
3
4
5
6
public static void main(String[] args) {
// 本行代码在 class 文件中,会被编译器直接优化为:
// String str = "abc";
String str = "a" + "b" + "c";
System.out.println("str = " + str);
}

字符串变量的拼接,编译器会优化成 StringBuilder 的方式

【示例】字符串变量的拼接

1
2
3
4
5
6
7
8
public static void main(String[] args) {
String str = "";
for(int i=0; i<1000; i++) {
// 本行代码会被编译器优化为:
// str = (new StringBuilder(String.valueOf(str))).append(i).toString();
str = str + i;
}
}

但是,每次循环都会生成一个新的 StringBuilder 实例,同样也会降低系统的性能。

字符串拼接的正确方案:

  • 如果需要使用字符串拼接,应该优先考虑 StringBuilderappend 方法替代使用 +
  • 如果在并发编程中,String 对象的拼接涉及到线程安全,可以使用 StringBuffer。但是要注意,由于 StringBuffer 是线程安全的,涉及到锁竞争,所以从性能上来说,要比 StringBuilder 差一些。

字符串分割

Stringsplit() 方法使用正则表达式实现其强大的分割功能。而正则表达式的性能是非常不稳定的,使用不恰当会引起回溯问题,很可能导致 CPU 居高不下。

所以,应该慎重使用 split() 方法,可以考虑用 String.indexOf() 方法代替 split() 方法完成字符串的分割。如果实在无法满足需求,你就在使用 Split() 方法时,对回溯问题加以重视就可以了。

String.intern

在每次赋值的时候使用 Stringintern 方法,如果常量池中有相同值,就会重复使用该对象,返回对象引用,这样一开始的对象就可以被回收掉

在字符串常量中,默认会将对象放入常量池;在字符串变量中,对象是会创建在堆内存中,同时也会在常量池中创建一个字符串对象,复制到堆内存对象中,并返回堆内存对象引用。

如果调用 intern 方法,会去查看字符串常量池中是否有等于该对象的字符串,如果没有,就在常量池中新增该对象,并返回该对象引用;如果有,就返回常量池中的字符串引用。堆内存中原有的对象由于没有引用指向它,将会通过垃圾回收器回收。

【示例】

1
2
3
4
5
6
7
8
9
10
public class SharedLocation {

private String city;
private String region;
private String countryCode;
}

SharedLocation sharedLocation = new SharedLocation();
sharedLocation.setCity(messageInfo.getCity().intern()); sharedLocation.setCountryCode(messageInfo.getRegion().intern());
sharedLocation.setRegion(messageInfo.getCountryCode().intern());

使用 intern 方法需要注意:一定要结合实际场景。因为常量池的实现是类似于一个 HashTable 的实现方式,HashTable 存储的数据越大,遍历的时间复杂度就会增加。如果数据过大,会增加整个字符串常量池的负担。

String、StringBuffer、StringBuilder 有什么区别

String 是 Java 语言非常基础和重要的类,提供了构造和管理字符串的各种基本逻辑。它是典型的 Immutable 类,被声明成为 final class,所有属性也都是 final 的。也由于它的不可变性,类似拼接、裁剪字符串等动作,都会产生新的 String 对象。由于字符串操作的普遍性,所以相关操作的效率往往对应用性能有明显影响。

StringBuffer 是为解决上面提到拼接产生太多中间对象的问题而提供的一个类,我们可以用 append 或者 add 方法,把字符串添加到已有序列的末尾或者指定位置。StringBuffer 是一个线程安全的可修改字符序列。StringBuffer 的线程安全是通过在各种修改数据的方法上用 synchronized 关键字修饰实现的。

StringBuilder 是 Java 1.5 中新增的,在能力上和 StringBuffer 没有本质区别,但是它去掉了线程安全的部分,有效减小了开销,是绝大部分情况下进行字符串拼接的首选。

StringBufferStringBuilder 底层都是利用可修改的(char,JDK 9 以后是 byte)数组,二者都继承了 AbstractStringBuilder,里面包含了基本操作,区别仅在于最终的方法是否加了 synchronized。构建时初始字符串长度加 16(这意味着,如果没有构建对象时输入最初的字符串,那么初始值就是 16)。我们如果确定拼接会发生非常多次,而且大概是可预计的,那么就可以指定合适的大小,避免很多次扩容的开销。扩容会产生多重开销,因为要抛弃原有数组,创建新的(可以简单认为是倍数)数组,还要进行 arraycopy

**除非有线程安全的需要,不然一般都使用 StringBuilder**。

参考资料

Java 正则从入门到精通

关键词:Pattern、Matcher、捕获与非捕获、反向引用、零宽断言、贪婪与懒惰、元字符、DFA、NFA

正则简介

正则表达式是什么

正则表达式(Regular Expression)是一个用正则符号写出的公式,程序对这个公式进行语法分析,建立一个语法分析树,再根据这个分析树结合正则表达式的引擎生成执行程序(这个执行程序我们把它称作状态机,也叫状态自动机),用于字符匹配。

如何学习正则

正则表达式是一个强大的文本匹配工具,但是它的规则很复杂,理解起来较为困难,容易让人望而生畏。

刚接触正则时,我看了一堆正则的语义说明,但是仍然不明所以。后来,我多接触一些正则的应用实例,渐渐有了感觉,再结合语义说明,终有领悟。我觉得正则表达式和武侠修练武功差不多,应该先练招式,再练心法。如果一开始就直接看正则的规则,保证你会懵逼。当你熟悉基本招式(正则基本使用案例)后,也该修炼修炼心法(正则语法)了。真正的高手不能只靠死记硬背那么几招把式。就像张三丰教张无忌太极拳一样,领悟心法,融会贯通,少侠你就可以无招胜有招,成为传说中的绝世高手。

以上闲话可归纳为一句:学习正则应该从实例去理解规则。

正则工具类

JDK 中的 java.util.regex 包提供了对正则表达式的支持。

java.util.regex 有三个核心类:

  • Pattern 类:Pattern 是一个正则表达式的编译表示。
  • Matcher 类:Matcher 是对输入字符串进行解释和匹配操作的引擎。
  • PatternSyntaxException:PatternSyntaxException 是一个非强制异常类,它表示一个正则表达式模式中的语法错误。

注:需要格外注意一点,在 Java 中使用反斜杠”\“时必须写成 "\\"。所以本文的代码出现形如 String regex = "\\$\\{.*?\\}" 其实就是 \$\{.\*?\}

Pattern 类

Pattern类没有公共构造方法。要创建一个Pattern对象,你必须首先调用其静态方法compile,加载正则规则字符串,然后返回一个 Pattern 对象。

Pattern类一样,Matcher类也没有公共构造方法。你需要调用Pattern对象的matcher方法来获得一个Matcher对象。

【示例】Pattern 和 Matcher 的初始化

1
2
Pattern p = Pattern.compile(regex);
Matcher m = p.matcher(content);

Matcher 类

Matcher 类可以说是 java.util.regex 中的核心类,它有三类功能:校验、查找、替换。

校验

为了校验文本是否与正则规则匹配,Matcher 提供了以下几个返回值为 boolean 的方法。

序号 方法及说明
1 **public boolean lookingAt() ** 尝试将从区域开头开始的输入序列与该模式匹配。
2 **public boolean find() **尝试查找与该模式匹配的输入序列的下一个子序列。
3 public boolean find(int start)重置此匹配器,然后尝试查找匹配该模式、从指定索引开始的输入序列的下一个子序列。
4 **public boolean matches() **尝试将整个区域与模式匹配。

如果你傻傻分不清上面的查找方法有什么区别,那么下面一个例子就可以让你秒懂。

【示例】lookingAt、find、matches

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public static void main(String[] args) {
checkLookingAt("hello", "helloworld");
checkLookingAt("world", "helloworld");

checkFind("hello", "helloworld");
checkFind("world", "helloworld");

checkMatches("hello", "helloworld");
checkMatches("world", "helloworld");
checkMatches("helloworld", "helloworld");
}

private static void checkLookingAt(String regex, String content) {
Pattern p = Pattern.compile(regex);
Matcher m = p.matcher(content);
if (m.lookingAt()) {
System.out.println(content + "\tlookingAt: " + regex);
} else {
System.out.println(content + "\tnot lookingAt: " + regex);
}
}

private static void checkFind(String regex, String content) {
Pattern p = Pattern.compile(regex);
Matcher m = p.matcher(content);
if (m.find()) {
System.out.println(content + "\tfind: " + regex);
} else {
System.out.println(content + "\tnot find: " + regex);
}
}

private static void checkMatches(String regex, String content) {
Pattern p = Pattern.compile(regex);
Matcher m = p.matcher(content);
if (m.matches()) {
System.out.println(content + "\tmatches: " + regex);
} else {
System.out.println(content + "\tnot matches: " + regex);
}
}

输出:

1
2
3
4
5
6
7
helloworld	lookingAt: hello
helloworld not lookingAt: world
helloworld find: hello
helloworld find: world
helloworld not matches: hello
helloworld not matches: world
helloworld matches: helloworld

说明

regex = "world" 表示的正则规则是以 world 开头的字符串,regex = "hello"regex = "helloworld" 也是同理。

  • lookingAt方法从头部开始,检查 content 字符串是否有子字符串于正则规则匹配。
  • find方法检查 content 字符串是否有子字符串于正则规则匹配,不管字符串所在位置。
  • matches方法检查 content 字符串整体是否与正则规则匹配。

查找

为了查找文本匹配正则规则的位置,Matcher提供了以下方法:

序号 方法及说明
1 **public int start() **返回以前匹配的初始索引。
2 public int start(int group) 返回在以前的匹配操作期间,由给定组所捕获的子序列的初始索引
3 **public int end()**返回最后匹配字符之后的偏移量。
4 **public int end(int group)**返回在以前的匹配操作期间,由给定组所捕获子序列的最后字符之后的偏移量。
5 **public String group()**返回前一个符合匹配条件的子序列。
6 **public String group(int group)**返回指定的符合匹配条件的子序列。

【示例】使用 start()、end()、group() 查找所有匹配正则条件的子序列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public static void main(String[] args) {
final String regex = "world";
final String content = "helloworld helloworld";
Pattern p = Pattern.compile(regex);
Matcher m = p.matcher(content);
System.out.println("content: " + content);

int i = 0;
while (m.find()) {
i++;
System.out.println("[" + i + "th] found");
System.out.print("start: " + m.start() + ", ");
System.out.print("end: " + m.end() + ", ");
System.out.print("group: " + m.group() + "\n");
}
}

输出

1
2
3
4
5
content: helloworld helloworld
[1th] found
start: 5, end: 10, group: world
[2th] found
start: 16, end: 21, group: world

说明

例子很直白,不言自明了吧。

替换

替换方法是替换输入字符串里文本的方法:

序号 方法及说明
1 **public Matcher appendReplacement(StringBuffer sb, String replacement)**实现非终端添加和替换步骤。
2 **public StringBuffer appendTail(StringBuffer sb)**实现终端添加和替换步骤。
3 **public String replaceAll(String replacement) ** 替换模式与给定替换字符串相匹配的输入序列的每个子序列。
4 public String replaceFirst(String replacement) 替换模式与给定替换字符串匹配的输入序列的第一个子序列。
5 **public static String quoteReplacement(String s)**返回指定字符串的字面替换字符串。这个方法返回一个字符串,就像传递给 Matcher 类的 appendReplacement 方法一个字面字符串一样工作。

【示例】replaceFirst 和 replaceAll

1
2
3
4
5
6
7
8
9
10
11
12
public static void main(String[] args) {
String regex = "can";
String replace = "can not";
String content = "I can because I think I can.";

Pattern p = Pattern.compile(regex);
Matcher m = p.matcher(content);

System.out.println("content: " + content);
System.out.println("replaceFirst: " + m.replaceFirst(replace));
System.out.println("replaceAll: " + m.replaceAll(replace));
}

输出

1
2
3
content: I can because I think I can.
replaceFirst: I can not because I think I can.
replaceAll: I can not because I think I can not.

说明

replaceFirst:替换第一个匹配正则规则的子序列。

replaceAll:替换所有匹配正则规则的子序列。

【示例】appendReplacement、appendTail 和 replaceAll

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public static void main(String[] args) {
String regex = "can";
String replace = "can not";
String content = "I can because I think I can.";
StringBuffer sb = new StringBuffer();
StringBuffer sb2 = new StringBuffer();

System.out.println("content: " + content);
Pattern p = Pattern.compile(regex);
Matcher m = p.matcher(content);
while (m.find()) {
m.appendReplacement(sb, replace);
}
System.out.println("appendReplacement: " + sb);
m.appendTail(sb);
System.out.println("appendTail: " + sb);
}

输出

1
2
3
content: I can because I think I can.
appendReplacement: I can not because I think I can not
appendTail: I can not because I think I can not.

说明

从输出结果可以看出,appendReplacementappendTail方法组合起来用,功能和replaceAll是一样的。

如果你查看replaceAll的源码,会发现其内部就是使用appendReplacementappendTail方法组合来实现的。

【示例】quoteReplacement 和 replaceAll,解决特殊字符替换问题

1
2
3
4
5
6
7
8
9
10
11
12
public static void main(String[] args) {
String regex = "\\$\\{.*?\\}";
String replace = "${product}";
String content = "product is ${productName}.";

Pattern p = Pattern.compile(regex);
Matcher m = p.matcher(content);
String replaceAll = m.replaceAll(replace);

System.out.println("content: " + content);
System.out.println("replaceAll: " + replaceAll);
}

输出

1
2
3
4
5
6
7
8
9
10
Exception in thread "main" java.lang.IllegalArgumentException: No group with name {product}
at java.util.regex.Matcher.appendReplacement(Matcher.java:849)
at java.util.regex.Matcher.replaceAll(Matcher.java:955)
at org.zp.notes.javase.regex.RegexDemo.wrongMethod(RegexDemo.java:42)
at org.zp.notes.javase.regex.RegexDemo.main(RegexDemo.java:18)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)

说明

String regex = "\\$\\{.*?\\}";表示匹配类似${name}这样的字符串。由于${}都是特殊字符,需要用反义字符\来修饰才能被当做一个字符串字符来处理。

上面的例子是想将 ${productName} 替换为 ${product} ,然而replaceAll方法却将传入的字符串中的$当做特殊字符来处理了。结果产生异常。

如何解决这个问题?

JDK1.5 引入了quoteReplacement方法。它可以用来转换特殊字符。其实源码非常简单,就是判断字符串中如果有\$,就为它加一个转义字符\

我们对上面的代码略作调整:

m.replaceAll(replace)改为m.replaceAll(Matcher.quoteReplacement(replace)),新代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
public static void main(String[] args) {
String regex = "\\$\\{.*?\\}";
String replace = "${product}";
String content = "product is ${productName}.";

Pattern p = Pattern.compile(regex);
Matcher m = p.matcher(content);
String replaceAll = m.replaceAll(Matcher.quoteReplacement(replace));

System.out.println("content: " + content);
System.out.println("replaceAll: " + replaceAll);
}

输出

1
2
content: product is ${productName}.
replaceAll: product is ${product}.

说明

字符串中如果有\$,不能被正常解析的问题解决。

元字符

元字符(metacharacters)就是正则表达式中具有特殊意义的专用字符。

基本元字符

正则表达式的元字符难以记忆,很大程度上是因为有很多为了简化表达而出现的等价字符。而实际上最基本的元字符,并没有那么多。对于大部分的场景,基本元字符都可以搞定。让我们从一个个实例出发,由浅入深的去体会正则的奥妙。

多选(|

【示例】匹配一个确定的字符串

1
checkMatches("abc", "abc");

如果要匹配一个确定的字符串,非常简单,如例 1 所示。但是,如果你不确定要匹配的字符串,希望有多个选择,怎么办?答案是:使用元字符| ,它的含义是或。

【示例】匹配多个可选的字符串

1
2
3
4
5
6
7
8
9
// 测试正则表达式字符:|
Assert.assertTrue(checkMatches("yes|no", "yes"));
Assert.assertTrue(checkMatches("yes|no", "no"));
Assert.assertFalse(checkMatches("yes|no", "right"));

// 输出
// yes matches: yes|no
// no matches: yes|no
// right not matches: yes|no

分组(()

如果你希望表达式由多个子表达式组成,你可以使用 ()

【示例】匹配组合字符串

1
2
3
4
5
6
7
8
9
10
Assert.assertTrue(checkMatches("(play|end)(ing|ed)", "ended"));
Assert.assertTrue(checkMatches("(play|end)(ing|ed)", "ending"));
Assert.assertTrue(checkMatches("(play|end)(ing|ed)", "playing"));
Assert.assertTrue(checkMatches("(play|end)(ing|ed)", "played"));

// 输出
// ended matches: (play|end)(ing|ed)
// ending matches: (play|end)(ing|ed)
// playing matches: (play|end)(ing|ed)
// played matches: (play|end)(ing|ed)

指定单字符有效范围([]

前面展示了如何匹配字符串,但是很多时候你需要精确的匹配一个字符,这时可以使用[]

【示例】字符在指定范围

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 测试正则表达式字符:[]
Assert.assertTrue(checkMatches("[abc]", "b")); // 字符只能是a、b、c
Assert.assertTrue(checkMatches("[a-z]", "m")); // 字符只能是a - z
Assert.assertTrue(checkMatches("[A-Z]", "O")); // 字符只能是A - Z
Assert.assertTrue(checkMatches("[a-zA-Z]", "K")); // 字符只能是a - z和A - Z
Assert.assertTrue(checkMatches("[a-zA-Z]", "k"));
Assert.assertTrue(checkMatches("[0-9]", "5")); // 字符只能是0 - 9

// 输出
// b matches: [abc]
// m matches: [a-z]
// O matches: [A-Z]
// K matches: [a-zA-Z]
// k matches: [a-zA-Z]
// 5 matches: [0-9]

指定单字符无效范围( [^]

【示例】字符不能在指定范围

如果需要匹配一个字符的逆操作,即字符不能在指定范围,可以使用[^]

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 测试正则表达式字符:[^]
Assert.assertFalse(checkMatches("[^abc]", "b")); // 字符不能是a、b、c
Assert.assertFalse(checkMatches("[^a-z]", "m")); // 字符不能是a - z
Assert.assertFalse(checkMatches("[^A-Z]", "O")); // 字符不能是A - Z
Assert.assertFalse(checkMatches("[^a-zA-Z]", "K")); // 字符不能是a - z和A - Z
Assert.assertFalse(checkMatches("[^a-zA-Z]", "k"));
Assert.assertFalse(checkMatches("[^0-9]", "5")); // 字符不能是0 - 9

// 输出
// b not matches: [^abc]
// m not matches: [^a-z]
// O not matches: [^A-Z]
// K not matches: [^a-zA-Z]
// k not matches: [^a-zA-Z]
// 5 not matches: [^0-9]

限制字符数量({}

如果想要控制字符出现的次数,可以使用 {}

字符 描述
{n} n 是一个非负整数。匹配确定的 n 次。
{n,} n 是一个非负整数。至少匹配 n 次。
{n,m} m 和 n 均为非负整数,其中 n <= m。最少匹配 n 次且最多匹配 m 次。

【示例】限制字符出现次数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// {n}: n 是一个非负整数。匹配确定的 n 次。
checkMatches("ap{1}", "a");
checkMatches("ap{1}", "ap");
checkMatches("ap{1}", "app");
checkMatches("ap{1}", "apppppppppp");

// {n,}: n 是一个非负整数。至少匹配 n 次。
checkMatches("ap{1,}", "a");
checkMatches("ap{1,}", "ap");
checkMatches("ap{1,}", "app");
checkMatches("ap{1,}", "apppppppppp");

// {n,m}: m 和 n 均为非负整数,其中 n <= m。最少匹配 n 次且最多匹配 m 次。
checkMatches("ap{2,5}", "a");
checkMatches("ap{2,5}", "ap");
checkMatches("ap{2,5}", "app");
checkMatches("ap{2,5}", "apppppppppp");

// 输出
// a not matches: ap{1}
// ap matches: ap{1}
// app not matches: ap{1}
// apppppppppp not matches: ap{1}
// a not matches: ap{1,}
// ap matches: ap{1,}
// app matches: ap{1,}
// apppppppppp matches: ap{1,}
// a not matches: ap{2,5}
// ap not matches: ap{2,5}
// app matches: ap{2,5}
// apppppppppp not matches: ap{2,5}

转义字符(/

如果想要查找元字符本身,你需要使用转义符,使得正则引擎将其视作一个普通字符,而不是一个元字符去处理。

1
2
3
4
5
6
* 的转义字符:\*
+ 的转义字符:\+
? 的转义字符:\?
^ 的转义字符:\^
$ 的转义字符:\$
. 的转义字符:\.

如果是转义符 \ 本身,你需要使用 \\

指定表达式字符串的开始(^)和结尾($

如果希望匹配的字符串必须以特定字符串开头,可以使用 ^

注意:请特别留意,这里的 ^ 一定要和 [^] 中的 ^ 区分。

【示例】限制字符串头部

1
2
3
4
5
6
Assert.assertTrue(checkMatches("^app[a-z]{0,}", "apple")); // 字符串必须以app开头
Assert.assertFalse(checkMatches("^app[a-z]{0,}", "aplause"));

// 输出
// apple matches: ^app[a-z]{0,}
// aplause not matches: ^app[a-z]{0,}

如果希望匹配的字符串必须以特定字符串结尾,可以使用 $

【示例】限制字符串尾部

1
2
3
4
5
6
Assert.assertTrue(checkMatches("[a-z]{0,}ing$", "playing")); // 字符串必须以ing结尾
Assert.assertFalse(checkMatches("[a-z]{0,}ing$", "long"));

// 输出
// playing matches: [a-z]{0,}ing$
// long not matches: [a-z]{0,}ing$

等价字符

等价字符,顾名思义,就是对于基本元字符表达的一种简化(等价字符的功能都可以通过基本元字符来实现)。

在没有掌握基本元字符之前,可以先不用理会,因为很容易把人绕晕。

等价字符的好处在于简化了基本元字符的写法。

表示某一类型字符的等价字符

下表中的等价字符都表示某一类型的字符。

字符 描述
. 匹配除“\n”之外的任何单个字符。
\d 匹配一个数字字符。等价于[0-9]。
\D 匹配一个非数字字符。等价于[^0-9]。
\w 匹配包括下划线的任何单词字符。类似但不等价于“[A-Za-z0-9_]”,这里的单词字符指的是 Unicode 字符集。
\W 匹配任何非单词字符。
\s 匹配任何不可见字符,包括空格、制表符、换页符等等。等价于[ \f\n\r\t\v]。
\S 匹配任何可见字符。等价于[ \f\n\r\t\v]。

【示例】基本等价字符的用法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// 匹配除“\n”之外的任何单个字符
Assert.assertTrue(checkMatches(".{1,}", "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789_"));
Assert.assertTrue(checkMatches(".{1,}", "~!@#$%^&*()+`-=[]{};:<>,./?|\\"));
Assert.assertFalse(checkMatches(".", "\n"));
Assert.assertFalse(checkMatches("[^\n]", "\n"));

// 匹配一个数字字符。等价于[0-9]
Assert.assertTrue(checkMatches("\\d{1,}", "0123456789"));
// 匹配一个非数字字符。等价于[^0-9]
Assert.assertFalse(checkMatches("\\D{1,}", "0123456789"));

// 匹配包括下划线的任何单词字符。类似但不等价于“[A-Za-z0-9_]”,这里的单词字符指的是Unicode字符集
Assert.assertTrue(checkMatches("\\w{1,}", "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789_"));
Assert.assertFalse(checkMatches("\\w{1,}", "~!@#$%^&*()+`-=[]{};:<>,./?|\\"));
// 匹配任何非单词字符
Assert.assertFalse(checkMatches("\\W{1,}", "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789_"));
Assert.assertTrue(checkMatches("\\W{1,}", "~!@#$%^&*()+`-=[]{};:<>,./?|\\"));

// 匹配任何不可见字符,包括空格、制表符、换页符等等。等价于[ \f\n\r\t\v]
Assert.assertTrue(checkMatches("\\s{1,}", " \f\r\n\t"));
// 匹配任何可见字符。等价于[^ \f\n\r\t\v]
Assert.assertFalse(checkMatches("\\S{1,}", " \f\r\n\t"));

// 输出
// ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789_ matches: .{1,}
// ~!@#$%^&*()+`-=[]{};:<>,./?|\\ matches: .{1,}
// \n not matches: .
// \n not matches: [^\n]
// 0123456789 matches: \\d{1,}
// 0123456789 not matches: \\D{1,}
// ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789_ matches: \\w{1,}
// ~!@#$%^&*()+`-=[]{};:<>,./?|\\ not matches: \\w{1,}
// ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789_ not matches: \\W{1,}
// ~!@#$%^&*()+`-=[]{};:<>,./?|\\ matches: \\W{1,}
// \f\r\n\t matches: \\s{1,}
// \f\r\n\t not matches: \\S{1,}

限制字符数量的等价字符

在基本元字符章节中,已经介绍了限制字符数量的基本元字符 - {}

此外,还有 *+? 这个三个为了简化写法而出现的等价字符,我们来认识一下。

字符 描述
* 匹配前面的子表达式零次或多次。等价于{0,}。
+ 匹配前面的子表达式一次或多次。等价于{1,}。
? 匹配前面的子表达式零次或一次。等价于 {0,1}。

案例 限制字符数量的等价字符

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// *: 匹配前面的子表达式零次或多次。* 等价于{0,}。
checkMatches("ap*", "a");
checkMatches("ap*", "ap");
checkMatches("ap*", "app");
checkMatches("ap*", "apppppppppp");

// +: 匹配前面的子表达式一次或多次。+ 等价于 {1,}。
checkMatches("ap+", "a");
checkMatches("ap+", "ap");
checkMatches("ap+", "app");
checkMatches("ap+", "apppppppppp");

// ?: 匹配前面的子表达式零次或一次。? 等价于 {0,1}。
checkMatches("ap?", "a");
checkMatches("ap?", "ap");
checkMatches("ap?", "app");
checkMatches("ap?", "apppppppppp");

// 输出
// a matches: ap*
// ap matches: ap*
// app matches: ap*
// apppppppppp matches: ap*
// a not matches: ap+
// ap matches: ap+
// app matches: ap+
// apppppppppp matches: ap+
// a matches: ap?
// ap matches: ap?
// app not matches: ap?
// apppppppppp not matches: ap?

元字符优先级顺序

正则表达式从左到右进行计算,并遵循优先级顺序,这与算术表达式非常类似。

下表从最高到最低说明了各种正则表达式运算符的优先级顺序:

运算符 说明
\ 转义符
()(?:)(?=)[] 括号和中括号
*+?{n}{n,}{n,m} 限定符
^$*任何字符任何字符* 定位点和序列
` `

字符具有高于替换运算符的优先级,使得 m|food 匹配 mfood 。若要匹配 moodfood ,请使用括号创建子表达式,从而产生 (m|f)ood

分组构造

在基本元字符章节,提到了 () 字符可以用来对表达式分组。实际上分组还有更多复杂的用法。

所谓分组构造,是用来描述正则表达式的子表达式,用于捕获字符串中的子字符串。

捕获与非捕获

下表为分组构造中的捕获和非捕获分类。

表达式 描述 捕获或非捕获
(exp) 匹配的子表达式 捕获
(?<name>exp) 命名的反向引用 捕获
(?:exp) 非捕获组 非捕获
(?=exp) 零宽度正预测先行断言 非捕获
(?!exp) 零宽度负预测先行断言 非捕获
(?<=exp) 零宽度正回顾后发断言 非捕获
(?<!exp) 零宽度负回顾后发断言 非捕获

注:Java 正则引擎不支持平衡组。

反向引用

带编号的反向引用

带编号的反向引用使用以下语法:\number

其中number 是正则表达式中捕获组的序号位置。 例如,\4 匹配第四个捕获组的内容。 如果正则表达式模式中未定义number,则将发生分析错误

【示例】匹配重复的单词和紧随每个重复的单词的单词(不命名子表达式)

1
2
3
4
5
6
7
8
// (\w+)\s\1\W(\w+) 匹配重复的单词和紧随每个重复的单词的单词
Assert.assertTrue(findAll("(\\w+)\\s\\1\\W(\\w+)",
"He said that that was the the correct answer.") > 0);

// 输出
// regex = (\w+)\s\1\W(\w+), content: He said that that was the the correct answer.
// [1th] start: 8, end: 21, group: that that was
// [2th] start: 22, end: 37, group: the the correct

说明:

  • (\w+):匹配一个或多个单词字符。
  • \s:与空白字符匹配。
  • \1:匹配第一个组,即(\w+)。
  • \W:匹配包括空格和标点符号的一个非单词字符。 这样可以防止正则表达式模式匹配从第一个捕获组的单词开头的单词。

命名的反向引用

命名后向引用通过使用下面的语法进行定义:\k<name >

【示例】匹配重复的单词和紧随每个重复的单词的单词(命名子表达式)

1
2
3
4
5
6
7
8
// (?<duplicateWord>\w+)\s\k<duplicateWord>\W(?<nextWord>\w+) 匹配重复的单词和紧随每个重复的单词的单词
Assert.assertTrue(findAll("(?<duplicateWord>\\w+)\\s\\k<duplicateWord>\\W(?<nextWord>\\w+)",
"He said that that was the the correct answer.") > 0);

// 输出
// regex = (?<duplicateWord>\w+)\s\k<duplicateWord>\W(?<nextWord>\w+), content: He said that that was the the correct answer.
// [1th] start: 8, end: 21, group: that that was
// [2th] start: 22, end: 37, group: the the correct

说明:

  • (?<duplicateWord>\w+):匹配一个或多个单词字符。 命名此捕获组 duplicateWord。
  • \s: 与空白字符匹配。
  • \k<duplicateWord>:匹配名为 duplicateWord 的捕获的组。
  • \W:匹配包括空格和标点符号的一个非单词字符。 这样可以防止正则表达式模式匹配从第一个捕获组的单词开头的单词。
  • (?<nextWord>\w+):匹配一个或多个单词字符。 命名此捕获组 nextWord。

非捕获组

(?:exp) 表示当一个限定符应用到一个组,但组捕获的子字符串并非所需时,通常会使用非捕获组构造。

【示例】匹配以.结束的语句。

1
2
3
4
5
6
// 匹配由句号终止的语句。
Assert.assertTrue(findAll("(?:\\b(?:\\w+)\\W*)+\\.", "This is a short sentence. Never end") > 0);

// 输出
// regex = (?:\b(?:\w+)\W*)+\., content: This is a short sentence. Never end
// [1th] start: 0, end: 25, group: This is a short sentence.

零宽断言

用于查找在某些内容(但并不包括这些内容)之前或之后的东西,也就是说它们像\b,^,$那样用于指定一个位置,这个位置应该满足一定的条件(即断言),因此它们也被称为零宽断言。

表达式 描述
(?=exp) 匹配 exp 前面的位置
(?<=exp) 匹配 exp 后面的位置
(?!exp) 匹配后面跟的不是 exp 的位置
(?<!exp) 匹配前面不是 exp 的位置

匹配 exp 前面的位置

(?=exp) 表示输入字符串必须匹配子表达式中的正则表达式模式,尽管匹配的子字符串未包含在匹配结果中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// \b\w+(?=\sis\b) 表示要捕获is之前的单词
Assert.assertTrue(findAll("\\b\\w+(?=\\sis\\b)", "The dog is a Malamute.") > 0);
Assert.assertFalse(findAll("\\b\\w+(?=\\sis\\b)", "The island has beautiful birds.") > 0);
Assert.assertFalse(findAll("\\b\\w+(?=\\sis\\b)", "The pitch missed home plate.") > 0);
Assert.assertTrue(findAll("\\b\\w+(?=\\sis\\b)", "Sunday is a weekend day.") > 0);

// 输出
// regex = \b\w+(?=\sis\b), content: The dog is a Malamute.
// [1th] start: 4, end: 7, group: dog
// regex = \b\w+(?=\sis\b), content: The island has beautiful birds.
// not found
// regex = \b\w+(?=\sis\b), content: The pitch missed home plate.
// not found
// regex = \b\w+(?=\sis\b), content: Sunday is a weekend day.
// [1th] start: 0, end: 6, group: Sunday

说明:

  • \b:在单词边界处开始匹配。
  • \w+:匹配一个或多个单词字符。
  • (?=\sis\b):确定单词字符是否后接空白字符和字符串“is”,其在单词边界处结束。 如果如此,则匹配成功。

匹配 exp 后面的位置

(?<=exp) 表示子表达式不得在输入字符串当前位置左侧出现,尽管子表达式未包含在匹配结果中。零宽度正回顾后发断言不会回溯。

1
2
3
4
5
6
7
// (?<=\b20)\d{2}\b 表示要捕获以20开头的数字的后面部分
Assert.assertTrue(findAll("(?<=\\b20)\\d{2}\\b", "2010 1999 1861 2140 2009") > 0);

// 输出
// regex = (?<=\b20)\d{2}\b, content: 2010 1999 1861 2140 2009
// [1th] start: 2, end: 4, group: 10
// [2th] start: 22, end: 24, group: 09

说明:

  • \d{2}:匹配两个十进制数字。
  • {?<=\b20):如果两个十进制数字的字边界以小数位数“20”开头,则继续匹配。
  • \b:在单词边界处结束匹配。

匹配后面跟的不是 exp 的位置

(?!exp) 表示输入字符串不得匹配子表达式中的正则表达式模式,尽管匹配的子字符串未包含在匹配结果中。

【示例】捕获未以“un”开头的单词

1
2
3
4
5
6
7
8
9
// \b(?!un)\w+\b 表示要捕获未以“un”开头的单词
Assert.assertTrue(findAll("\\b(?!un)\\w+\\b", "unite one unethical ethics use untie ultimate") > 0);

// 输出
// regex = \b(?!un)\w+\b, content: unite one unethical ethics use untie ultimate
// [1th] start: 6, end: 9, group: one
// [2th] start: 20, end: 26, group: ethics
// [3th] start: 27, end: 30, group: use
// [4th] start: 37, end: 45, group: ultimate

说明:

  • \b:在单词边界处开始匹配。
  • (?!un):确定接下来的两个的字符是否为“un”。 如果没有,则可能匹配。
  • \w+:匹配一个或多个单词字符。
  • \b:在单词边界处结束匹配。

匹配前面不是 exp 的位置

(?<!exp) 表示子表达式不得在输入字符串当前位置的左侧出现。 但是,任何不匹配子表达式 的子字符串不包含在匹配结果中。

【示例】捕获任意工作日

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// (?<!(Saturday|Sunday) )\b\w+ \d{1,2}, \d{4}\b 表示要捕获任意工作日(即周一到周五)
Assert.assertTrue(findAll("(?<!(Saturday|Sunday) )\\b\\w+ \\d{1,2}, \\d{4}\\b", "Monday February 1, 2010") > 0);
Assert.assertTrue(findAll("(?<!(Saturday|Sunday) )\\b\\w+ \\d{1,2}, \\d{4}\\b", "Wednesday February 3, 2010") > 0);
Assert.assertFalse(findAll("(?<!(Saturday|Sunday) )\\b\\w+ \\d{1,2}, \\d{4}\\b", "Saturday February 6, 2010") > 0);
Assert.assertFalse(findAll("(?<!(Saturday|Sunday) )\\b\\w+ \\d{1,2}, \\d{4}\\b", "Sunday February 7, 2010") > 0);
Assert.assertTrue(findAll("(?<!(Saturday|Sunday) )\\b\\w+ \\d{1,2}, \\d{4}\\b", "Monday, February 8, 2010") > 0);

// 输出
// regex = (?<!(Saturday|Sunday) )\b\w+ \d{1,2}, \d{4}\b, content: Monday February 1, 2010
// [1th] start: 7, end: 23, group: February 1, 2010
// regex = (?<!(Saturday|Sunday) )\b\w+ \d{1,2}, \d{4}\b, content: Wednesday February 3, 2010
// [1th] start: 10, end: 26, group: February 3, 2010
// regex = (?<!(Saturday|Sunday) )\b\w+ \d{1,2}, \d{4}\b, content: Saturday February 6, 2010
// not found
// regex = (?<!(Saturday|Sunday) )\b\w+ \d{1,2}, \d{4}\b, content: Sunday February 7, 2010
// not found
// regex = (?<!(Saturday|Sunday) )\b\w+ \d{1,2}, \d{4}\b, content: Monday, February 8, 2010
// [1th] start: 8, end: 24, group: February 8, 2010

贪婪与懒惰

当正则表达式中包含能接受重复的限定符时,通常的行为是(在使整个表达式能得到匹配的前提下)匹配尽可能多的字符。以这个表达式为例:a.*b,它将会匹配最长的以 a 开始,以 b 结束的字符串。如果用它来搜索 aabab 的话,它会匹配整个字符串 aabab。这被称为贪婪匹配。

有时,我们更需要懒惰匹配,也就是匹配尽可能少的字符。前面给出的限定符都可以被转化为懒惰匹配模式,只要在它后面加上一个问号?。这样.*?就意味着匹配任意数量的重复,但是在能使整个匹配成功的前提下使用最少的重复。

表达式 描述
*? 重复任意次,但尽可能少重复
+? 重复 1 次或更多次,但尽可能少重复
?? 重复 0 次或 1 次,但尽可能少重复
{n,m}? 重复 n 到 m 次,但尽可能少重复
{n,}? 重复 n 次以上,但尽可能少重复

【示例】Java 正则中贪婪与懒惰的示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// 贪婪匹配
Assert.assertTrue(findAll("a\\w*b", "abaabaaabaaaab") > 0);

// 懒惰匹配
Assert.assertTrue(findAll("a\\w*?b", "abaabaaabaaaab") > 0);
Assert.assertTrue(findAll("a\\w+?b", "abaabaaabaaaab") > 0);
Assert.assertTrue(findAll("a\\w??b", "abaabaaabaaaab") > 0);
Assert.assertTrue(findAll("a\\w{0,4}?b", "abaabaaabaaaab") > 0);
Assert.assertTrue(findAll("a\\w{3,}?b", "abaabaaabaaaab") > 0);

// 输出
// regex = a\w*b, content: abaabaaabaaaab
// [1th] start: 0, end: 14, group: abaabaaabaaaab
// regex = a\w*?b, content: abaabaaabaaaab
// [1th] start: 0, end: 2, group: ab
// [2th] start: 2, end: 5, group: aab
// [3th] start: 5, end: 9, group: aaab
// [4th] start: 9, end: 14, group: aaaab
// regex = a\w+?b, content: abaabaaabaaaab
// [1th] start: 0, end: 5, group: abaab
// [2th] start: 5, end: 9, group: aaab
// [3th] start: 9, end: 14, group: aaaab
// regex = a\w??b, content: abaabaaabaaaab
// [1th] start: 0, end: 2, group: ab
// [2th] start: 2, end: 5, group: aab
// [3th] start: 6, end: 9, group: aab
// [4th] start: 11, end: 14, group: aab
// regex = a\w{0,4}?b, content: abaabaaabaaaab
// [1th] start: 0, end: 2, group: ab
// [2th] start: 2, end: 5, group: aab
// [3th] start: 5, end: 9, group: aaab
// [4th] start: 9, end: 14, group: aaaab
// regex = a\w{3,}?b, content: abaabaaabaaaab
// [1th] start: 0, end: 5, group: abaab
// [2th] start: 5, end: 14, group: aaabaaaab

说明:

本例中代码展示的是使用不同贪婪或懒惰策略去查找字符串 abaabaaabaaaab 中匹配a 开头,以 b 结尾的所有子字符串。请从输出结果中,细细体味使用不同的贪婪或懒惰策略,对于匹配子字符串有什么影响。

正则附录

匹配正则字符串的方法

由于正则表达式中很多元字符本身就是转义字符,在 Java 字符串的规则中不会被显示出来。

为此,可以使用一个工具类org.apache.commons.lang3.StringEscapeUtils来做特殊处理,使得转义字符可以打印。这个工具类提供的都是静态方法,从方法命名大致也可以猜出用法,这里不多做说明。

如果你了解 maven,可以直接引入依赖

1
2
3
4
5
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>${commons-lang3.version}</version>
</dependency>

【示例】本文为了展示正则匹配规则用到的方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
private boolean checkMatches(String regex, String content) {
Pattern p = Pattern.compile(regex);
Matcher m = p.matcher(content);
boolean flag = m.matches();
if (m.matches()) {
System.out.println(StringEscapeUtils.escapeJava(content) + "\tmatches: " + StringEscapeUtils.escapeJava(regex));
} else {
System.out.println(StringEscapeUtils.escapeJava(content) + "\tnot matches: " + StringEscapeUtils.escapeJava(regex));
}
return flag;
}

public int findAll(String regex, String content) {
Pattern p = Pattern.compile(regex);
Matcher m = p.matcher(content);
System.out.println("regex = " + regex + ", content: " + content);

int count = 0;
while (m.find()) {
count++;
System.out.println("[" + count + "th] " + "start: " + m.start() + ", end: " + m.end()
+ ", group: " + m.group());
}
if (0 == count) {
System.out.println("not found");
}
return count;
}

速查元字符字典

为了方便快查正则的元字符含义,在本节根据元字符的功能集中罗列正则的各种元字符。

限定符

字符 描述
* 匹配前面的子表达式零次或多次。例如,zo* 能匹配 “z” 以及 “zoo”。* 等价于{0,}。
+ 匹配前面的子表达式一次或多次。例如,’zo+’ 能匹配 “zo” 以及 “zoo”,但不能匹配 “z”。+ 等价于 {1,}。
? 匹配前面的子表达式零次或一次。例如,”do(es)?” 可以匹配 “do” 或 “does” 中的”do” 。? 等价于 {0,1}。
{n} n 是一个非负整数。匹配确定的 n 次。例如,’o{2}’ 不能匹配 “Bob” 中的 ‘o’,但是能匹配 “food” 中的两个 o。
{n,} n 是一个非负整数。至少匹配 n 次。例如,’o{2,}’ 不能匹配 “Bob” 中的 ‘o’,但能匹配 “foooood” 中的所有 o。’o{1,}’ 等价于 ‘o+’。’o{0,}’ 则等价于 ‘o*‘。
{n,m} m 和 n 均为非负整数,其中 n <= m。最少匹配 n 次且最多匹配 m 次。例如,”o{1,3}” 将匹配 “fooooood” 中的前三个 o。’o{0,1}’ 等价于 ‘o?’。请注意在逗号和两个数之间不能有空格。

定位符

字符 描述
^ 匹配输入字符串开始的位置。如果设置了 RegExp 对象的 Multiline 属性,^ 还会与 \n 或 \r 之后的位置匹配。
$ 匹配输入字符串结尾的位置。如果设置了 RegExp 对象的 Multiline 属性,$ 还会与 \n 或 \r 之前的位置匹配。
\b 匹配一个字边界,即字与空格间的位置。
\B 非字边界匹配。

非打印字符

字符 描述
\cx 匹配由 x 指明的控制字符。例如, \cM 匹配一个 Control-M 或回车符。x 的值必须为 A-Z 或 a-z 之一。否则,将 c 视为一个原义的 ‘c’ 字符。
\f 匹配一个换页符。等价于 \x0c 和 \cL。
\n 匹配一个换行符。等价于 \x0a 和 \cJ。
\r 匹配一个回车符。等价于 \x0d 和 \cM。
\s 匹配任何空白字符,包括空格、制表符、换页符等等。等价于 [ \f\n\r\t\v]。
\S 匹配任何非空白字符。等价于 [ \f\n\r\t\v]。
\t 匹配一个制表符。等价于 \x09 和 \cI。
\v 匹配一个垂直制表符。等价于 \x0b 和 \cK。

分组

表达式 描述
(exp) 匹配的子表达式。()中的内容就是子表达式。
(?<name>exp) 命名的子表达式(反向引用)。
(?:exp) 非捕获组,表示当一个限定符应用到一个组,但组捕获的子字符串并非所需时,通常会使用非捕获组构造。
(?=exp) 匹配 exp 前面的位置。
(?<=exp) 匹配 exp 后面的位置。
(?!exp) 匹配后面跟的不是 exp 的位置。
(?<!exp) 匹配前面不是 exp 的位置。

特殊符号

字符 描述
\ 将下一个字符标记为或特殊字符、或原义字符、或向后引用、或八进制转义符。例如, ‘n’ 匹配字符 ‘n’。’\n’ 匹配换行符。序列 ‘\‘ 匹配 “",而 ‘(‘ 则匹配 “(“。
| 指明两项之间的一个选择。
[] 匹配方括号范围内的任意一个字符。形式如:[xyz]、[^xyz]、[a-z]、[^a-z]、[x,y,z]

正则实战

虽然本系列洋洋洒洒的大谈特谈正则表达式。但是我还是要在这里建议,如果一个正则表达式没有经过充分测试,还是要谨慎使用。

正则是把双刃剑,它可以为你节省大量的代码行。但是由于它不易阅读,维护起来可是头疼的哦(你需要一个字符一个字符的去理解)。

最实用的正则

校验中文

校验字符串中只能有中文字符(不包括中文标点符号)。中文字符的 Unicode 编码范围是 \u4e00\u9fa5

如有兴趣,可以参考百度百科-Unicode

1
^[\u4e00-\u9fa5]+$
  • 匹配: 春眠不觉晓
  • 不匹配:春眠不觉晓,

校验身份证号码

身份证为 15 位或 18 位。15 位是第一代身份证。从 1999 年 10 月 1 日起,全国实行公民身份证号码制度,居民身份证编号由原 15 位升至 18 位。

  • 15 位身份证:由 15 位数字组成。排列顺序从左至右依次为:六位数字地区码;六位数字出生日期;三位顺序号,其中 15 位男为单数,女为双数。
  • 18 位身份证:由十七位数字本体码和一位数字校验码组成。排列顺序从左至右依次为:六位数字地区码;八位数字出生日期;三位数字顺序码和一位数字校验码(也可能是 X)。

身份证号含义详情请见:百度百科-居民身份证号码

地区码(6 位)

1
(1[1-5]|2[1-3]|3[1-7]|4[1-3]|5[0-4]|6[1-5])\d{4}

出生日期(8 位)

注:下面的是 18 位身份证的有效出生日期,如果是 15 位身份证,只要将第一个\d{4}改为\d{2}即可。

1
((\d{4}((0[13578]|1[02])(0[1-9]|[12]\d|3[01])|(0[13456789]|1[012])(0[1-9]|[12]\d|30)|02(0[1-9]|1\d|2[0-8])))|([02468][048]|[13579][26])0229)

15 位有效身份证

1
^((1[1-5]|2[1-3]|3[1-7]|4[1-3]|5[0-4]|6[1-5])\d{4})((\d{2}((0[13578]|1[02])(0[1-9]|[12]\d|3[01])|(0[13456789]|1[012])(0[1-9]|[12]\d|30)|02(0[1-9]|1\d|2[0-8])))|([02468][048]|[13579][26])0229)(\d{3})$
  • 匹配:110001700101031

  • 不匹配:110001701501031

18 位有效身份证

1
^((1[1-5]|2[1-3]|3[1-7]|4[1-3]|5[0-4]|6[1-5])\d{4})((\d{4}((0[13578]|1[02])(0[1-9]|[12]\d|3[01])|(0[13456789]|1[012])(0[1-9]|[12]\d|30)|02(0[1-9]|1\d|2[0-8])))|([02468][048]|[13579][26])0229)(\d{3}(\d|X))$
  • 匹配:110001199001010310 | 11000019900101015X

  • 不匹配:990000199001010310 | 110001199013010310

校验有效用户名、密码

描述:长度为 6-18 个字符,允许输入字母、数字、下划线,首字符必须为字母。

1
^[a-zA-Z]\w{5,17}$

校验邮箱

描述:不允许使用 IP 作为域名,如 : hello@154.145.68.12

@符号前的邮箱用户和.符号前的域名(domain)必须满足以下条件:

  • 字符只能是英文字母、数字、下划线_.-
  • 首字符必须为字母或数字;
  • _.- 不能连续出现。

域名的根域只能为字母,且至少为两个字符。

1
^[A-Za-z0-9](([_\.\-]?[a-zA-Z0-9]+)*)@([A-Za-z0-9]+)(([\.\-]?[a-zA-Z0-9]+)*)\.([A-Za-z]{2,})$

校验 URL

描述:校验 URL。支持 http、https、ftp、ftps。

1
^(ht|f)(tp|tps)\://[a-zA-Z0-9\-\.]+\.([a-zA-Z]{2,3})?(/\S*)?$

校验时间

描述:校验时间。时、分、秒必须是有效数字,如果数值不是两位数,十位需要补零。

1
^([0-1][0-9]|[2][0-3]):([0-5][0-9])$
  • 匹配:00:00:00 | 23:59:59 | 17:06:30

  • 不匹配:17:6:30 | 24:16:30

校验日期

描述:校验日期。日期满足以下条件:

  • 格式 yyyy-MM-dd 或 yyyy-M-d
  • 连字符可以没有或是“-”、“/”、“.”之一
  • 闰年的二月可以有 29 日;而平年不可以。
  • 一、三、五、七、八、十、十二月为 31 日。四、六、九、十一月为 30 日。
1
^(?:(?!0000)[0-9]{4}([-/.]?)(?:(?:0?[1-9]|1[0-2])\1(?:0?[1-9]|1[0-9]|2[0-8])|(?:0?[13-9]|1[0-2])\1(?:29|30)|(?:0?[13578]|1[02])\1(?:31))|(?:[0-9]{2}(?:0[48]|[2468][048]|[13579][26])|(?:0[48]|[2468][048]|[13579][26])00)([-/.]?)0?2\2(?:29))$
  • 匹配:2016/1/1 | 2016/01/01 | 20160101 | 2016-01-01 | 2016.01.01 | 2000-02-29
  • 不匹配:2001-02-29 | 2016/12/32 | 2016/6/31 | 2016/13/1 | 2016/0/1

校验中国手机号码

描述:中国手机号码正确格式:11 位数字。

移动有 16 个号段:134、135、136、137、138、139、147、150、151、152、157、158、159、182、187、188。其中 147、157、188 是 3G 号段,其他都是 2G 号段。联通有 7 种号段:130、131、132、155、156、185、186。其中 186 是 3G(WCDMA)号段,其余为 2G 号段。电信有 4 个号段:133、153、180、189。其中 189 是 3G 号段(CDMA2000),133 号段主要用作无线网卡号。总结:13 开头手机号 0-9;15 开头手机号 0-3、5-9;18 开头手机号 0、2、5-9。

此外,中国在国际上的区号为 86,所以手机号开头有+86、86 也是合法的。

以上信息来源于 百度百科-手机号

1
^((\+)?86\s*)?((13[0-9])|(15([0-3]|[5-9]))|(18[0,2,5-9]))\d{8}$
  • 匹配:+86 18012345678 | 86 18012345678 | 15812345678

  • 不匹配:15412345678 | 12912345678 | 180123456789

校验中国固话号码

描述:固话号码,必须加区号(以 0 开头)。
3 位有效区号:010、020~029,固话位数为 8 位。
4 位有效区号:03xx 开头到 09xx,固话位数为 7。

如果想了解更详细的信息,请参考 百度百科-电话区号

1
^(010|02[0-9])(\s|-)\d{8}|(0[3-9]\d{2})(\s|-)\d{7}$
  • 匹配:010-12345678 | 010 12345678 | 0512-1234567 | 0512 1234567

  • 不匹配:1234567 | 12345678

校验 IPv4 地址

描述:IP 地址是一个 32 位的二进制数,通常被分割为 4 个“8 位二进制数”(也就是 4 个字节)。IP 地址通常用“点分十进制”表示成(a.b.c.d)的形式,其中,a,b,c,d 都是 0~255 之间的十进制整数。

1
^([01]?\d\d?|2[0-4]\d|25[0-5])\.([01]?\d\d?|2[0-4]\d|25[0-5])\.([01]?\d\d?|2[0-4]\d|25[0-5])\.([01]?\d\d?|2[0-4]\d|25[0-5])$
  • 匹配:0.0.0.0 | 255.255.255.255 | 127.0.0.1

  • 不匹配:10.10.10 | 10.10.10.256

校验 IPv6 地址

描述:IPv6 的 128 位地址通常写成 8 组,每组为四个十六进制数的形式。

IPv6 地址可以表示为以下形式:

显然,IPv6 地址的表示方式很复杂。你也可以参考:

百度百科-IPv6

Stack overflow 上的 IPv6 正则表达高票答案

1
(([0-9a-fA-F]{1,4}:){7,7}[0-9a-fA-F]{1,4}|([0-9a-fA-F]{1,4}:){1,7}:|([0-9a-fA-F]{1,4}:){1,6}:[0-9a-fA-F]{1,4}|([0-9a-fA-F]{1,4}:){1,5}(:[0-9a-fA-F]{1,4}){1,2}|([0-9a-fA-F]{1,4}:){1,4}(:[0-9a-fA-F]{1,4}){1,3}|([0-9a-fA-F]{1,4}:){1,3}(:[0-9a-fA-F]{1,4}){1,4}|([0-9a-fA-F]{1,4}:){1,2}(:[0-9a-fA-F]{1,4}){1,5}|[0-9a-fA-F]{1,4}:((:[0-9a-fA-F]{1,4}){1,6})|:((:[0-9a-fA-F]{1,4}){1,7}|:)|fe80:(:[0-9a-fA-F]{0,4}){0,4}%[0-9a-zA-Z]{1,}|::(ffff(:0{1,4}){0,1}:){0,1}((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])|([0-9a-fA-F]{1,4}:){1,4}:((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9]))
  • 匹配:1:2:3:4:5:6:7:8 | 1:: | 1::8 | 1::6:7:8 | 1::5:6:7:8 | 1::4:5:6:7:8 | 1::3:4:5:6:7:8 | ::2:3:4:5:6:7:8 | 1:2:3:4:5:6:7:: | 1:2:3:4:5:6::8 | 1:2:3:4:5::8 | 1:2:3:4::8 | 1:2:3::8 | 1:2::8 | 1::8 | ::8 | fe80::7:8%1 | ::255.255.255.255 | 2001:db8:3:4::192.0.2.33 | 64:ff9b::192.0.2.33

  • 不匹配:1.2.3.4.5.6.7.8 | 1::2::3

特定字符

  • 匹配长度为 3 的字符串:^.{3}$
  • 匹配由 26 个英文字母组成的字符串:^[A-Za-z]+$
  • 匹配由 26 个大写英文字母组成的字符串:^[A-Z]+$
  • 匹配由 26 个小写英文字母组成的字符串:^[a-z]+$
  • 匹配由数字和 26 个英文字母组成的字符串:^[A-Za-z0-9]+$
  • 匹配由数字、26 个英文字母或者下划线组成的字符串:^\w+$

特定数字

  • 匹配正整数:^[1-9]\d*$
  • 匹配负整数:^-[1-9]\d*$
  • 匹配整数:^(-?[1-9]\d*)|0$
  • 匹配正浮点数:^[1-9]\d*\.\d+|0\.\d+$
  • 匹配负浮点数:^-([1-9]\d*\.\d*|0\.\d*[1-9]\d*)$
  • 匹配浮点数:^-?([1-9]\d*\.\d*|0\.\d*[1-9]\d*|0?\.0+|0)$

正则表达式的性能

目前实现正则表达式引擎的方式有两种:DFA 自动机(Deterministic Final Automata 确定有限状态自动机)和 NFA 自动机(Non deterministic Finite Automaton 非确定有限状态自动机)。对比来看,构造 DFA 自动机的代价远大于 NFA 自动机,但 DFA 自动机的执行效率高于 NFA 自动机。

假设一个字符串的长度是 n,如果用 DFA 自动机作为正则表达式引擎,则匹配的时间复杂度为 O(n);如果用 NFA 自动机作为正则表达式引擎,由于 NFA 自动机在匹配过程中存在大量的分支和回溯,假设 NFA 的状态数为 s,则该匹配算法的时间复杂度为 O(ns)。

NFA 自动机的优势是支持更多功能。例如,捕获 group、环视、占有优先量词等高级功能。这些功能都是基于子表达式独立进行匹配,因此在编程语言里,使用的正则表达式库都是基于 NFA 实现的。

NFA 自动机的回溯

用 NFA 自动机实现的比较复杂的正则表达式,在匹配过程中经常会引起回溯问题。大量的回溯会长时间地占用 CPU,从而带来系统性能开销。

1
2
text=“abbc”
regex=“ab{1,3}c”

这个例子匹配目的是:匹配以 a 开头,以 c 结尾,中间有 1-3 个 b 字符的字符串。NFA 自动机对其解析的过程是这样的:

  • 读取正则表达式第一个匹配符 a 和字符串第一个字符 a 进行比较,a 对 a,匹配。
  • 然后,读取正则表达式第二个匹配符 b{1,3} 和字符串的第二个字符 b 进行比较,匹配。但因为 b{1,3} 表示 1-3 个 b 字符串,NFA 自动机又具有贪婪特性,所以此时不会继续读取正则表达式的下一个匹配符,而是依旧使用 b{1,3} 和字符串的第三个字符 b 进行比较,结果还是匹配。
  • 接着继续使用 b{1,3} 和字符串的第四个字符 c 进行比较,发现不匹配了,此时就会发生回溯,已经读取的字符串第四个字符 c 将被吐出去,指针回到第三个字符 b 的位置。
  • 那么发生回溯以后,匹配过程怎么继续呢?程序会读取正则表达式的下一个匹配符 c,和字符串中的第四个字符 c 进行比较,结果匹配,结束。

如何避免回溯

贪婪模式(Greedy)

顾名思义,就是在数量匹配中,如果单独使用 +、 ? 、* 或{min,max} 等量词,正则表达式会匹配尽可能多的内容。

例如,上边那个例子:

1
2
text=“abbc”
regex=“ab{1,3}c”

就是在贪婪模式下,NFA 自动机读取了最大的匹配范围,即匹配 3 个 b 字符。匹配发生了一次失败,就引起了一次回溯。如果匹配结果是“abbbc”,就会匹配成功。

1
2
text=“abbbc”
regex=“ab{1,3}c”

懒惰模式(Reluctant)

在该模式下,正则表达式会尽可能少地重复匹配字符。如果匹配成功,它会继续匹配剩余的字符串。

例如,在上面例子的字符后面加一个“?”,就可以开启懒惰模式。

1
2
text=“abc”
regex=“ab{1,3}?c”

匹配结果是“abc”,该模式下 NFA 自动机首先选择最小的匹配范围,即匹配 1 个 b 字符,因此就避免了回溯问题。

独占模式(Possessive)

同贪婪模式一样,独占模式一样会最大限度地匹配更多内容;不同的是,在独占模式下,匹配失败就会结束匹配,不会发生回溯问题。

还是上边的例子,在字符后面加一个“+”,就可以开启独占模式。

1
2
text=“abbc”
regex=“ab{1,3}+bc”

结果是不匹配,结束匹配,不会发生回溯问题。

讲到这里,你应该非常清楚了,避免回溯的方法就是:使用懒惰模式和独占模式。

正则表达式的优化

少用贪婪模式,多用独占模式

贪婪模式会引起回溯问题,可以使用独占模式来避免回溯。

减少分支选择

分支选择类型 (X|Y|Z) 的正则表达式会降低性能,我们在开发的时候要尽量减少使用。如果一定要用,我们可以通过以下几种方式来优化:

  • 首先,我们需要考虑选择的顺序,将比较常用的选择项放在前面,使它们可以较快地被匹配;
  • 其次,我们可以尝试提取共用模式,例如,将 (abcd|abef) 替换为 ab(cd|ef),后者匹配速度较快,因为 NFA 自动机会尝试匹配 ab,如果没有找到,就不会再尝试任何选项;
  • 最后,如果是简单的分支选择类型,我们可以用三次 index 代替 (X|Y|Z),如果测试的话,你就会发现三次 index 的效率要比 (X|Y|Z) 高出一些。

减少捕获嵌套

  • 捕获组是指把正则表达式中,子表达式匹配的内容保存到以数字编号或显式命名的数组中,方便后面引用。一般一个 () 就是一个捕获组,捕获组可以进行嵌套。
  • 非捕获组则是指参与匹配却不进行分组编号的捕获组,其表达式一般由 (?:exp) 组成。

在正则表达式中,每个捕获组都有一个编号,编号 0 代表整个匹配到的内容。我们可以看下面的例子:

1
2
3
4
5
6
7
8
9
10
11
12
public static void main(String[] args) {
String text = "<input high=\"20\" weight=\"70\">test</input>";
String reg="(<input.*?>)(.*?)(</input>)";
Pattern p = Pattern.compile(reg);
Matcher m = p.matcher(text);
while(m.find()) {
System.out.println(m.group(0));// 整个匹配到的内容
System.out.println(m.group(1));//(<input.*?>)
System.out.println(m.group(2));//(.*?)
System.out.println(m.group(3));//(</input>)
}
}

运行结果:

1
2
3
4
<input high=\"20\" weight=\"70\">test</input>
<input high=\"20\" weight=\"70\">
test
</input>

如果你并不需要获取某一个分组内的文本,那么就使用非捕获分组。例如,使用“(?:X)”代替“(X)”,我们再看下面的例子:

1
2
3
4
5
6
7
8
9
10
public static void main(String[] args) {
String text = "<input high=\"20\" weight=\"70\">test</input>";
String reg="(?:<input.*?>)(.*?)(?:</input>)";
Pattern p = Pattern.compile(reg);
Matcher m = p.matcher(text);
while(m.find()) {
System.out.println(m.group(0));// 整个匹配到的内容
System.out.println(m.group(1));//(.*?)
}
}

运行结果:

1
2
<input high=\"20\" weight=\"70\">test</input>
test

综上可知:减少不需要获取的分组,可以提高正则表达式的性能。

参考资料

Java 并发之内存模型

Java 内存模型(Java Memory Model),简称 JMM。Java 内存模型的目标是为了解决由可见性和有序性导致的并发安全问题。Java 内存模型通过 屏蔽各种硬件和操作系统的内存访问差异,以实现让 Java 程序在各种平台下都能达到一致的内存访问效果

物理内存模型

物理机遇到的并发问题与虚拟机中的情况有不少相似之处,物理机对并发的处理方案对于虚拟机的实现也有相当大的参考意义。

硬件处理效率存在很大差异

技术在进步,CPU、内存、I/O 设备的性能也在不断提高。但是,始终存在一个核心矛盾:CPU、内存、I/O 设备存在很大的速度差异 - CPU 远快于内存,内存远快于 I/O 设备。木桶短板理论告诉我们:一只木桶能装多少水,取决于最短的那块木板。同理,程序整体性能取决于最慢的操作(即 I/O 操作),所以单方面提高 CPU、内存的性能是无效的。

为了合理利用 CPU 的高性能,平衡这三者的速度差异,计算机体系机构、操作系统、编译程序都做出了贡献,主要体现为:

  • CPU 增加了缓存,以均衡与 CPU 内存的速度差异;
  • 编译程序优化指令执行次序,使得缓存能够得到更加合理地利用。
  • 操作系统增加了进程、线程,以分时复用 CPU,进而均衡 CPU 与 I/O 的速度差异;

缓存导致的可见性问题,编译优化带来的有序性问题,线程切换带来的原子性问题。

缓存一致性

高速缓存解决了 硬件效率问题,但是引入了一个新的问题:缓存一致性(Cache Coherence)

在多处理器系统中,每个处理器都有自己的高速缓存,而它们又共享同一主内存。当多个处理器的运算任务都涉及同一块主内存区域时,将可能导致各自的缓存数据不一致。

为了解决缓存一致性问题,需要各个处理器访问缓存时都遵循一些协议,在读写时要根据协议来进行操作

指令重排序

为了使缓存得到更加合理地使用,计算机在执行程序代码的时候,会对指令进行重排序。

什么是指令重排序? 简单来说就是系统在执行代码的时候并不一定是按照你写的代码的顺序依次执行。

常见的指令重排序有下面 2 种情况:

  • 编译器优化重排:编译器(包括 JVM、JIT 编译器等)在不改变单线程程序语义的前提下,重新安排语句的执行顺序。
  • 指令并行重排:现代处理器采用了指令级并行技术 (Instruction-Level Parallelism,ILP) 来将多条指令重叠执行。如果不存在数据依赖性,处理器可以改变语句对应机器指令的执行顺序。

另外,内存系统也会有“重排序”,但又不是真正意义上的重排序。在 JMM 里表现为主存和本地内存的内容可能不一致,进而导致程序在多线程下执行可能出现问题。

Java 源代码会经历 编译器优化重排 —> 指令并行重排 —> 内存系统重排 的过程,最终才变成操作系统可执行的指令序列。

指令重排序可以保证串行语义一致,但是没有义务保证多线程间的语义也一致 ,所以在多线程下,指令重排序可能会导致一些问题。

对于编译器优化重排和处理器的指令重排序(指令并行重排和内存系统重排都属于是处理器级别的指令重排序),处理该问题的方式不一样。

  • 对于编译器,通过禁止特定类型的编译器重排序的方式来禁止重排序。
  • 对于处理器,通过插入内存屏障(Memory Barrier,或有时叫做内存栅栏,Memory Fence)的方式来禁止特定类型的处理器重排序。

Java 内存模型

内存模型 这个概念。我们可以理解为:在特定的操作协议下,对特定的内存或高速缓存进行读写访问的过程抽象。不同架构的物理计算机可以有不一样的内存模型,JVM 也有自己的内存模型。

JVM 中试图定义一种 Java 内存模型(Java Memory Model, JMM)来屏蔽各种硬件和操作系统的内存访问差异,以实现让 Java 程序 在各种平台下都能达到一致的内存访问效果

Java 并发简介 中已经介绍了,并发安全需要满足可见性、有序性、原子性。其中,导致可见性的原因是缓存,导致有序性的原因是编译优化。那解决可见性、有序性最直接的办法就是禁用缓存和编译优化 。但这么做,性能就堪忧了。

合理的方案应该是按需禁用缓存以及编译优化。那么,如何做到呢?,Java 内存模型规范了 JVM 如何提供按需禁用缓存和编译优化的方法。具体来说,这些方法包括 volatilesynchronizedfinal 三个关键字,以及 Happens-Before 规则

主内存和工作内存

JMM 的主要目标是 定义程序中各个变量的访问规则,即在虚拟机中将变量存储到内存和从内存中取出变量这样的底层细节。此处的变量(Variables)与 Java 编程中所说的变量有所区别,它包括了实例字段、静态字段和构成数值对象的元素,但不包括局部变量与方法参数,因为后者是线程私有的,不会被共享,自然就不会存在竞争问题。为了获得较好的执行效能,JMM 并没有限制执行引擎使用处理器的特定寄存器或缓存来和主存进行交互,也没有限制即使编译器进行调整代码执行顺序这类优化措施。

JMM 规定了所有的变量都存储在主内存(Main Memory)中

每条线程还有自己的工作内存(Working Memory),工作内存中保留了该线程使用到的变量的主内存的副本。工作内存是 JMM 的一个抽象概念,并不真实存在,它涵盖了缓存,写缓冲区,寄存器以及其他的硬件和编译器优化。

线程对变量的所有操作都必须在工作内存中进行,而不能直接读写主内存中的变量。不同的线程间也无法直接访问对方工作内存中的变量,线程间变量值的传递均需要通过主内存来完成

说明:

这里说的主内存、工作内存与 Java 内存区域中的堆、栈、方法区等不是同一个层次的内存划分。

JMM 内存操作的问题

类似于物理内存模型面临的问题,JMM 存在以下两个问题:

  • 工作内存数据一致性 - 各个线程操作数据时会保存使用到的主内存中的共享变量副本,当多个线程的运算任务都涉及同一个共享变量时,将导致各自的的共享变量副本不一致。如果真的发生这种情况,数据同步回主内存以谁的副本数据为准? Java 内存模型主要通过一系列的数据同步协议、规则来保证数据的一致性。
  • 指令重排序优化 - Java 中重排序通常是编译器或运行时环境为了优化程序性能而采取的对指令进行重新排序执行的一种手段。重排序分为两类:编译期重排序和运行期重排序,分别对应编译时和运行时环境。 同样的,指令重排序不是随意重排序,它需要满足以下两个条件:
    • 在单线程环境下不能改变程序运行的结果。即时编译器(和处理器)需要保证程序能够遵守 as-if-serial 属性。通俗地说,就是在单线程情况下,要给程序一个顺序执行的假象。即经过重排序的执行结果要与顺序执行的结果保持一致。
    • 存在数据依赖关系的不允许重排序。
    • 多线程环境下,如果线程处理逻辑之间存在依赖关系,有可能因为指令重排序导致运行结果与预期不同。

内存间交互操作

JMM 定义了 8 个操作来完成主内存和工作内存之间的交互操作。JVM 实现时必须保证下面介绍的每种操作都是 原子的(对于 double 和 long 型的变量来说,load、store、read、和 write 操作在某些平台上允许有例外 )。

  • lock (锁定) - 作用于主内存的变量,它把一个变量标识为一条线程独占的状态。
  • unlock (解锁) - 作用于主内存的变量,它把一个处于锁定状态的变量释放出来,释放后的变量才可以被其他线程锁定。
  • read (读取) - 作用于主内存的变量,它把一个变量的值从主内存传输到线程的工作内存中,以便随后的 load 动作使用。
  • write (写入) - 作用于主内存的变量,它把 store 操作从工作内存中得到的变量的值放入主内存的变量中。
  • load (载入) - 作用于工作内存的变量,它把 read 操作从主内存中得到的变量值放入工作内存的变量副本中。
  • use (使用) - 作用于工作内存的变量,它把工作内存中一个变量的值传递给执行引擎,每当虚拟机遇到一个需要使用到变量的值得字节码指令时就会执行这个操作。
  • assign (赋值) - 作用于工作内存的变量,它把一个从执行引擎接收到的值赋给工作内存的变量,每当虚拟机遇到一个给变量赋值的字节码指令时执行这个操作。
  • store (存储) - 作用于工作内存的变量,它把工作内存中一个变量的值传送到主内存中,以便随后 write 操作使用。

如果要把一个变量从主内存中复制到工作内存,就需要按序执行 readload 操作;如果把变量从工作内存中同步回主内存中,就需要按序执行 storewrite 操作。但 Java 内存模型只要求上述操作必须按顺序执行,而没有保证必须是连续执行。

JMM 还规定了上述 8 种基本操作,需要满足以下规则:

  • read 和 load 必须成对出现store 和 write 必须成对出现。即不允许一个变量从主内存读取了但工作内存不接受,或从工作内存发起回写了但主内存不接受的情况出现。
  • 不允许一个线程丢弃它的最近 assign 的操作,即变量在工作内存中改变了之后必须把变化同步到主内存中。
  • 不允许一个线程无原因的(没有发生过任何 assign 操作)把数据从工作内存同步回主内存中
  • 一个新的变量只能在主内存中诞生,不允许在工作内存中直接使用一个未被初始化(load 或 assign )的变量。换句话说,就是对一个变量实施 use 和 store 操作之前,必须先执行过了 load 或 assign 操作。
  • 一个变量在同一个时刻只允许一条线程对其进行 lock 操作,但 lock 操作可以被同一条线程重复执行多次,多次执行 lock 后,只有执行相同次数的 unlock 操作,变量才会被解锁。所以 lock 和 unlock 必须成对出现
  • 如果对一个变量执行 lock 操作,将会清空工作内存中此变量的值,在执行引擎使用这个变量前,需要重新执行 load 或 assign 操作初始化变量的值。
  • 如果一个变量事先没有被 lock 操作锁定,则不允许对它执行 unlock 操作,也不允许去 unlock 一个被其他线程锁定的变量。
  • 对一个变量执行 unlock 操作之前,必须先把此变量同步到主内存中(执行 store 和 write 操作)

并发安全特性

并发最重要的问题是并发安全问题。所谓并发安全,是指保证程序的正确性,使得并发处理结果符合预期。

并发安全需要保证几个基本特性:

  • 可见性 - 是一个线程修改了某个共享变量,其状态能够立即被其他线程知晓,通常被解释为将线程本地状态反映到主内存上,volatile 就是负责保证可见性的。
  • 有序性 - 是保证线程内串行语义,避免指令重排等。
  • 原子性 - 简单说就是相关操作不会中途被其他线程干扰,一般通过互斥机制(加锁:sychronizedLock)实现。

而这三大特性,归根结底,是为了实现多线程的 数据一致性,使得程序在多线程并发,指令重排序优化的环境中能如预期运行。上文介绍了 Java 内存交互的 8 种基本操作,它们都保证可见性、有序性、原子性。

原子性

原子性即一个操作或者多个操作,要么全部执行(执行的过程不会被任何因素打断),要么就都不执行。即使在多个线程一起执行的时候,一个操作一旦开始,就不会被其他线程所干扰。

在 Java 中,为了保证原子性,提供了两个高级的字节码指令 monitorentermonitorexit。这两个字节码,在 Java 中对应的关键字就是 synchronized

因此,在 Java 中可以使用 synchronized 来保证方法和代码块内的操作是原子性的。

可见性

可见性是指当多个线程访问同一个变量时,一个线程修改了这个变量的值,其他线程能够立即看得到修改的值

JMM 是通过 “变量修改后将新值同步回主内存变量读取前从主内存刷新变量值” 这种依赖主内存作为传递媒介的方式来实现的。

Java 实现多线程可见性的方式有:

  • volatile
  • synchronized
  • final

有序性

有序性规则表现在以下两种场景:线程内和线程间

  • 线程内 - 从某个线程的角度看方法的执行,指令会按照一种叫“串行”(as-if-serial)的方式执行,此种方式已经应用于顺序编程语言。
  • 线程间 - 这个线程“观察”到其他线程并发地执行非同步的代码时,由于指令重排序优化,任何代码都有可能交叉执行。唯一起作用的约束是:对于同步方法,同步块(synchronized 关键字修饰)以及 volatile 字段的操作仍维持相对有序。

在 Java 中,可以使用 synchronizedvolatile 来保证多线程之间操作的有序性。实现方式有所区别:

  • volatile 关键字会禁止指令重排序。
  • synchronized 关键字通过互斥保证同一时刻只允许一条线程操作。

Happens-Before

JMM 为程序中所有的操作定义了一个偏序关系,称之为 先行发生原则(Happens-Before)Happens-Before 是指 前面一个操作的结果对后续操作是可见的

1978 年,Lamport 在论文 Time, Clocks, and the Ordering of Events in a Distributed System译文解读 )中第一次提出了 Happens-Before,阐述了偏序关系(partial ordering)、逻辑时钟(Logical Clocks)概念,提出解决分布式系统中区分事件发生的时序问题的方法。Happens-Before 的语义是一种因果关系:如果 A 事件是导致 B 事件的起因,那么 A 事件一定是先于(Happens-Before)B 事件发生的。

Happens-Before 非常重要,它是判断数据是否存在竞争、线程是否安全的主要依据,依靠这个原则,我们可以通过几条规则一揽子地解决并发环境下两个操作间是否可能存在冲突的所有问题。

  • 程序顺序规则 - 在一个线程中,按照程序顺序,前面的操作 Happens-Before 于后续的任意操作。
  • 锁定规则 - 一个 unLock 操作 Happens-Before 于后面对同一个锁的 lock 操作。
  • volatile 变量规则 - 对一个 volatile 变量的写操作 Happens-Before 于后面对这个变量的读操作。
  • 线程启动规则 - Thread 对象的 start() 方法 Happens-Before 于此线程的每个一个动作。
  • 线程终止规则 - 线程中所有的操作都 Happens-Before 于线程的终止检测,我们可以通过 Thread.join() 方法是否结束、Thread.isAlive() 的返回值手段检测到线程已经终止执行。
  • 线程中断规则 - 对线程 interrupt() 方法的调用 Happens-Before 于被中断线程的代码检测到中断事件的发生,可以通过 Thread.interrupted() 方法检测到是否有中断发生。
  • 对象终结规则 - 一个对象的初始化完成 Happens-Before 于它的 finalize() 方法的开始。
  • 传递性 - 如果 A Happens-Before B,且 B Happens-Before C,那么 A Happens-Before C。

内存屏障

Java 中如何保证底层操作的有序性和可见性?可以通过内存屏障(memory barrier)。

内存屏障是被插入两个 CPU 指令之间的一种指令,用来禁止处理器指令发生重排序(像屏障一样),从而保障有序性的。另外,为了达到屏障的效果,它也会使处理器写入、读取值之前,将主内存的值写入高速缓存,清空无效队列,从而保障可见性

举个例子:

1
2
3
4
5
6
7
8
Store1;
Store2;
Load1;
StoreLoad; //内存屏障
Store3;
Load2;
Load3;
复制代码

对于上面的一组 CPU 指令(Store 表示写入指令,Load 表示读取指令),StoreLoad 屏障之前的 Store 指令无法与 StoreLoad 屏障之后的 Load 指令进行交换位置,即重排序。但是 StoreLoad 屏障之前和之后的指令是可以互换位置的,即 Store1 可以和 Store2 互换,Load2 可以和 Load3 互换。

常见有 4 种屏障

  • LoadLoad 屏障 - 对于这样的语句 Load1; LoadLoad; Load2,在 Load2 及后续读取操作要读取的数据被访问前,保证 Load1 要读取的数据被读取完毕。
  • StoreStore 屏障 - 对于这样的语句 Store1; StoreStore; Store2,在 Store2 及后续写入操作执行前,保证 Store1 的写入操作对其它处理器可见。
  • LoadStore 屏障 - 对于这样的语句 Load1; LoadStore; Store2,在 Store2 及后续写入操作被执行前,保证 Load1 要读取的数据被读取完毕。
  • StoreLoad 屏障 - 对于这样的语句 Store1; StoreLoad; Load2,在 Load2 及后续所有读取操作执行前,保证 Store1 的写入对所有处理器可见。它的开销是四种屏障中最大的(冲刷写缓冲器,清空无效化队列)。在大多数处理器的实现中,这个屏障是个万能屏障,兼具其它三种内存屏障的功能。

Java 中对内存屏障的使用在一般的代码中不太容易见到,常见的有 volatilesynchronized 关键字修饰的代码块(后面再展开介绍),还可以通过 Unsafe 这个类来使用内存屏障。

img

Synchronized 内存语义

synchronized 是 Java 中的关键字,是 利用锁的机制来实现互斥同步的

synchronized 可以保证在同一个时刻,只有一个线程可以执行某个方法或者某个代码块

  • 线程释放锁时内存语义:JMM 会把该线程对应的工作内存中的共享变量刷新到主内存中
  • 线程获取锁时内存语义:JMM 会把该线程对应的工作内存置为无效

如果不需要 LockReadWriteLock 所提供的高级同步特性,应该优先考虑使用 synchronized ,理由如下:

  • Java 1.6 以后,synchronized 做了大量的优化,其性能已经与 LockReadWriteLock 基本上持平。从趋势来看,Java 未来仍将继续优化 synchronized ,而不是 ReentrantLock
  • ReentrantLock 是 Oracle JDK 的 API,在其他版本的 JDK 中不一定支持;而 synchronized 是 JVM 的内置特性,所有 JDK 版本都提供支持。

synchronized 的应用

synchronized 有 3 种应用方式:

  • 同步实例方法 - 对于普通同步方法,锁是当前实例对象
  • 同步静态方法 - 对于静态同步方法,锁是当前类的 Class 对象
  • 同步代码块 - 对于同步方法块,锁是 synchonized 括号里配置的对象

【示例】synchronized 的使用语法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
class Test {

// 修饰成员方法
synchronized void sync1() {
// 临界区
}

// 修饰静态方法
synchronized static void sync2() {
// 临界区
}

// 对象锁
Object obj = new Object();
// 修饰代码块,使用对象锁
void sync3() {
synchronized (obj) {
// 临界区
}
}

// 修饰代码块,使用类锁(Class)
void sync4() {
synchronized (Test.class) {
//临界区
}
}

}

用 synchronized 实现线程安全的计数器

我们先来看一个简单的示例,这段代码维护了一个计数器变量 count,并通过 get() 和 add() 分别实现了读写方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
@NotThreadSafe
public class NotThreadSafeCounter {

private static int count = 0;

public int get() {
return count;
}

public void add() {
count++;
}

public static void main(String[] args) throws InterruptedException {
final int MAX = 100000;
NotThreadSafeCounter instance = new NotThreadSafeCounter();
Thread t1 = new Thread(() -> {
for (int i = 0; i < MAX; i++) {
instance.add();
}
});
Thread t2 = new Thread(() -> {
for (int i = 0; i < MAX; i++) {
instance.add();
}
});
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println("count = " + instance.get());
}

}
// 输出:
// count = 117626
//

启动两个线程并行执行,期望最终值为 200000,但实际值为小于 200000 的随机数字。显然,上面的示例是线程不安全的。究其原因,在于 count++ 不是原子操作,不满足并发安全的原子性要求。

要解决此问题,可以用 synchronized 修饰方法, synchronized 可以保证同一时刻只有一个线程执行临界区的代码。

我们针对上面的示例来进行改造,将 add() 方法用 synchronized 修饰,如下所示。这下是不是就可以高枕无忧了呢?

1
2
3
4
5
6
7
8
9
10
11
12
13
@NotThreadSafe
public class NotThreadSafeCounter2 {

private static int count = 0;

public int get() {
return count;
}

public synchronized void add() {
count++;
}
}

首先,add() 方法本身是线程安全的。但是,这个示例忽略了 get() 方法。因为 get() 方法未加锁,一个线程调用 add() 方法后,无法保证另一个线程调用 get() 时能立刻获取到更新后的结果,不满足并发安全的可见性要求。

如何彻底解决 get() 并发不安全的问题呢?很简单,就是 get() 方法也用 synchronized 修饰一下。最终的线程安全示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
@ThreadSafe
public class ThreadSafeCounter {

private int count = 0;

public synchronized long get() {
return count;
}

public synchronized void add() {
count++;
}
}

静态 synchronized 方法和非静态 synchronized 是否互斥

静态方法的同步是指同步在该方法所在的类对象上。因为在 JVM 中一个类只能对应一个类对象,所以同时只允许一个线程执行同一个类中的静态同步方法。

静态 synchronized 方法和非静态 synchronized 方法之间的调用互斥么?

答案是:不互斥,但可能存在并发问题!如果一个线程 A 调用一个实例对象的非静态 synchronized 方法;而线程 B 需要调用这个实例对象所属类的静态 synchronized 方法,是允许的,不会发生互斥现象,因为访问静态 synchronized 方法占用的锁是当前类的锁,而访问非静态 synchronized 方法占用的锁是当前实例对象锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
@ThreadSafe
public class ThreadSafeCounter2 {

private static int count = 0;

public synchronized long get() {
return count;
}

public synchronized static void add() {
count++;
}
}

上面这段代码实际上是用两个锁保护同一个资源。这个受保护的资源就是静态变量 count,两个锁分别是 this 和 ThreadSafeCounter2.class。我们可以用下面这幅图来形象描述这个关系。由于临界区 get() 和 add() 是用两个锁保护的,因此这两个临界区没有互斥关系,临界区 add() 对 value 的修改对临界区 get() 也没有可见性保证,这就导致并发问题了。

用 synchronized 保护多个资源

【示例】错误示例

1
2
3
4
5
6
7
8
9
10
11
class Account {
private int balance;
// 转账
synchronized void transfer(
Account target, int amt){
if (this.balance > amt) {
this.balance -= amt;
target.balance += amt;
}
}
}

在这段代码中,临界区内有两个资源,分别是转出账户的余额 this.balance 和转入账户的余额 target.balance,并且用的是一把锁 this,符合我们前面提到的,多个资源可以用一把锁来保护,这看上去完全正确呀。真的是这样吗?可惜,这个方案仅仅是看似正确,为什么呢?

问题就出在 this 这把锁上,this 这把锁可以保护自己的余额 this.balance,却保护不了别人的余额 target.balance,就像你不能用自家的锁来保护别人家的资产,也不能用自己的票来保护别人的座位一样。

应该保证使用的锁能覆盖所有受保护资源

【示例】正确姿势

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
class Account {
private Object lock;
private int balance;
private Account();
// 创建 Account 时传入同一个 lock 对象
public Account(Object lock) {
this.lock = lock;
}
// 转账
void transfer(Account target, int amt){
// 此处检查所有对象共享的锁
synchronized(lock) {
if (this.balance > amt) {
this.balance -= amt;
target.balance += amt;
}
}
}
}

这个办法确实能解决问题,但是有点小瑕疵,它要求在创建 Account 对象的时候必须传入同一个对象,如果创建 Account 对象时,传入的 lock 不是同一个对象,那可就惨了,会出现锁自家门来保护他家资产的荒唐事。在真实的项目场景中,创建 Account 对象的代码很可能分散在多个工程中,传入共享的 lock 真的很难。

上面的方案缺乏实践的可行性,我们需要更好的方案。还真有,就是用 Account.class 作为共享的锁。Account.class 是所有 Account 对象共享的,而且这个对象是 Java 虚拟机在加载 Account 类的时候创建的,所以我们不用担心它的唯一性。使用 Account.class 作为共享的锁,我们就无需在创建 Account 对象时传入了,代码更简单。

【示例】正确姿势

1
2
3
4
5
6
7
8
9
10
11
12
class Account {
private int balance;
// 转账
void transfer(Account target, int amt){
synchronized(Account.class) {
if (this.balance > amt) {
this.balance -= amt;
target.balance += amt;
}
}
}
}

synchronized 的原理

synchronized 代码块是由一对 monitorentermonitorexit 指令实现的,Monitor 对象是同步的基本实现单元。在 Java 6 之前,Monitor 的实现完全是依靠操作系统内部的互斥锁,因为需要进行用户态到内核态的切换,所以同步操作是一个无差别的重量级操作。

如果 synchronized 明确制定了对象参数,那就是这个对象的引用;如果没有明确指定,那就根据 synchronized 修饰的是实例方法还是静态方法,去对对应的对象实例或 Class 对象来作为锁对象。

synchronized 同步块对同一线程来说是可重入的,不会出现锁死问题。

synchronized 同步块是互斥的,即已进入的线程执行完成前,会阻塞其他试图进入的线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public void foo(Object lock) {
synchronized (lock) {
lock.hashCode();
}
}
// 上面的 Java 代码将编译为下面的字节码
public void foo(java.lang.Object);
Code:
0: aload_1
1: dup
2: astore_2
3: monitorenter
4: aload_1
5: invokevirtual java/lang/Object.hashCode:()I
8: pop
9: aload_2
10: monitorexit
11: goto 19
14: astore_3
15: aload_2
16: monitorexit
17: aload_3
18: athrow
19: return
Exception table:
from to target type
4 11 14 any
14 17 14 any

synchronized 在修饰同步代码块时,是由 monitorentermonitorexit 指令来实现同步的。进入 monitorenter 指令后,线程将持有 Monitor 对象,退出 monitorenter 指令后,线程将释放该 Monitor 对象。

synchronized 修饰同步方法时,会设置一个 ACC_SYNCHRONIZED 标志。当方法调用时,调用指令将会检查该方法是否被设置 ACC_SYNCHRONIZED 访问标志。如果设置了该标志,执行线程将先持有 Monitor 对象,然后再执行方法。在该方法运行期间,其它线程将无法获取到该 Mointor 对象,当方法执行完成后,再释放该 Monitor 对象。

每个对象实例都会有一个 MonitorMonitor 可以和对象一起创建、销毁。Monitor 是由 ObjectMonitor 实现,而 ObjectMonitor 是由 C++ 的 ObjectMonitor.hpp 文件实现。

当多个线程同时访问一段同步代码时,多个线程会先被存放在 EntryList 集合中,处于 block 状态的线程,都会被加入到该列表。接下来当线程获取到对象的 Monitor 时,Monitor 是依靠底层操作系统的 Mutex Lock 来实现互斥的,线程申请 Mutex 成功,则持有该 Mutex,其它线程将无法获取到该 Mutex。

如果线程调用 wait() 方法,就会释放当前持有的 Mutex,并且该线程会进入 WaitSet 集合中,等待下一次被唤醒。如果当前线程顺利执行完方法,也将释放 Mutex。

synchronized 的优化

Java 1.6 以后,synchronized 做了大量的优化,其性能已经与 LockReadWriteLock 基本上持平

在 JDK1.6 JVM 中,对象实例在堆内存中被分为了三个部分:对象头、实例数据和对齐填充。其中 Java 对象头由 Mark Word、指向类的指针以及数组长度三部分组成。

Mark Word 记录了对象和锁有关的信息。Mark Word 在 64 位 JVM 中的长度是 64bit,我们可以一起看下 64 位 JVM 的存储结构是怎么样的。如下图所示:

img

锁升级功能主要依赖于 Mark Word 中的锁标志位和释放偏向锁标志位,synchronized 同步锁就是从偏向锁开始的,随着竞争越来越激烈,偏向锁升级到轻量级锁,最终升级到重量级锁。

Java 1.6 引入了偏向锁和轻量级锁,从而让 synchronized 拥有了四个状态:

  • 无锁状态(unlocked)
  • 偏向锁状态(biasble)
  • 轻量级锁状态(lightweight locked)
  • 重量级锁状态(inflated)

当 JVM 检测到不同的竞争状况时,会自动切换到适合的锁实现。

当没有竞争出现时,默认会使用偏向锁。JVM 会利用 CAS 操作(compare and swap),在对象头上的 Mark Word 部分设置线程 ID,以表示这个对象偏向于当前线程,所以并不涉及真正的互斥锁。这样做的假设是基于在很多应用场景中,大部分对象生命周期中最多会被一个线程锁定,使用偏斜锁可以降低无竞争开销。

如果有另外的线程试图锁定某个已经被偏斜过的对象,JVM 就需要撤销(revoke)偏向锁,并切换到轻量级锁实现。轻量级锁依赖 CAS 操作 Mark Word 来试图获取锁,如果重试成功,就使用普通的轻量级锁;否则,进一步升级为重量级锁。

偏向锁

偏向锁的思想是偏向于第一个获取锁对象的线程,这个线程在之后获取该锁就不再需要进行同步操作,甚至连 CAS 操作也不再需要

img

轻量级锁

轻量级锁是相对于传统的重量级锁而言,它 使用 CAS 操作来避免重量级锁使用互斥量的开销。对于绝大部分的锁,在整个同步周期内都是不存在竞争的,因此也就不需要都使用互斥量进行同步,可以先采用 CAS 操作进行同步,如果 CAS 失败了再改用互斥量进行同步。

当尝试获取一个锁对象时,如果锁对象标记为 0|01,说明锁对象的锁未锁定(unlocked)状态。此时虚拟机在当前线程的虚拟机栈中创建 Lock Record,然后使用 CAS 操作将对象的 Mark Word 更新为 Lock Record 指针。如果 CAS 操作成功了,那么线程就获取了该对象上的锁,并且对象的 Mark Word 的锁标记变为 00,表示该对象处于轻量级锁状态。

img

锁消除 / 锁粗化

除了锁升级优化,Java 还使用了编译器对锁进行优化。

锁消除

锁消除是指对于被检测出不可能存在竞争的共享数据的锁进行消除

JIT 编译器在动态编译同步块的时候,借助了一种被称为逃逸分析的技术,来判断同步块使用的锁对象是否只能够被一个线程访问,而没有被发布到其它线程。

确认是的话,那么 JIT 编译器在编译这个同步块的时候不会生成 synchronized 所表示的锁的申请与释放的机器码,即消除了锁的使用。在 Java7 之后的版本就不需要手动配置了,该操作可以自动实现。

对于一些看起来没有加锁的代码,其实隐式的加了很多锁。例如下面的字符串拼接代码就隐式加了锁:

1
2
3
public static String concatString(String s1, String s2, String s3) {
return s1 + s2 + s3;
}

String 是一个不可变的类,编译器会对 String 的拼接自动优化。在 Java 1.5 之前,会转化为 StringBuffer 对象的连续 append() 操作:

1
2
3
4
5
6
7
public static String concatString(String s1, String s2, String s3) {
StringBuffer sb = new StringBuffer();
sb.append(s1);
sb.append(s2);
sb.append(s3);
return sb.toString();
}

每个 append() 方法中都有一个同步块。虚拟机观察变量 sb,很快就会发现它的动态作用域被限制在 concatString() 方法内部。也就是说,sb 的所有引用永远不会逃逸到 concatString() 方法之外,其他线程无法访问到它,因此可以进行消除。

锁粗化

锁粗化同理,就是在 JIT 编译器动态编译时,如果发现几个相邻的同步块使用的是同一个锁实例,那么 JIT 编译器将会把这几个同步块合并为一个大的同步块,从而避免一个线程“反复申请、释放同一个锁“所带来的性能开销。

如果一系列的连续操作都对同一个对象反复加锁和解锁,频繁的加锁操作就会导致性能损耗。

上一节的示例代码中连续的 append() 方法就属于这类情况。如果虚拟机探测到由这样的一串零碎的操作都对同一个对象加锁,将会把加锁的范围扩展(粗化)到整个操作序列的外部。对于上一节的示例代码就是扩展到第一个 append() 操作之前直至最后一个 append() 操作之后,这样只需要加锁一次就可以了。

自旋锁

互斥同步进入阻塞状态的开销都很大,应该尽量避免。在许多应用中,共享数据的锁定状态只会持续很短的一段时间。自旋锁的思想是让一个线程在请求一个共享数据的锁时执行忙循环(自旋)一段时间,如果在这段时间内能获得锁,就可以避免进入阻塞状态。

自旋锁虽然能避免进入阻塞状态从而减少开销,但是它需要进行忙循环操作占用 CPU 时间,它只适用于共享数据的锁定状态很短的场景。

在 Java 1.6 中引入了自适应的自旋锁。自适应意味着自旋的次数不再固定了,而是由前一次在同一个锁上的自旋次数及锁的拥有者的状态来决定。

synchronized 的误区

示例摘自:极客时间教程 - Java 业务开发常见错误 100 例

synchronized 使用范围不当导致的错误

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class Interesting {

volatile int a = 1;
volatile int b = 1;

public static void main(String[] args) {
Interesting interesting = new Interesting();
new Thread(() -> interesting.add()).start();
new Thread(() -> interesting.compare()).start();
}

public synchronized void add() {
log.info("add start");
for (int i = 0; i < 10000; i++) {
a++;
b++;
}
log.info("add done");
}

public void compare() {
log.info("compare start");
for (int i = 0; i < 10000; i++) {
//a 始终等于 b 吗?
if (a < b) {
log.info("a:{},b:{},{}", a, b, a > b);
//最后的 a>b 应该始终是 false 吗?
}
}
log.info("compare done");
}

}

【输出】

1
2
3
4
16:05:25.541 [Thread-0] INFO io.github.dunwu.javacore.concurrent.sync.synchronized 使用范围不当 - add start
16:05:25.544 [Thread-0] INFO io.github.dunwu.javacore.concurrent.sync.synchronized 使用范围不当 - add done
16:05:25.544 [Thread-1] INFO io.github.dunwu.javacore.concurrent.sync.synchronized 使用范围不当 - compare start
16:05:25.544 [Thread-1] INFO io.github.dunwu.javacore.concurrent.sync.synchronized 使用范围不当 - compare done

之所以出现这种错乱,是因为两个线程是交错执行 add 和 compare 方法中的业务逻辑,而且这些业务逻辑不是原子性的:a++ 和 b++ 操作中可以穿插在 compare 方法的比较代码中;更需要注意的是,a<b 这种比较操作在字节码层面是加载 a、加载 b 和比较三步,代码虽然是一行但也不是原子性的。

所以,正确的做法应该是,为 add 和 compare 都加上方法锁,确保 add 方法执行时,compare 无法读取 a 和 b:

1
2
public synchronized void add()
public synchronized void compare()

所以,使用锁解决问题之前一定要理清楚,我们要保护的是什么逻辑,多线程执行的情况又是怎样的。

synchronized 保护对象不对导致的错误

加锁前要清楚锁和被保护的对象是不是一个层面的。

静态字段属于类,类级别的锁才能保护;而非静态字段属于类实例,实例级别的锁就可以保护。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public class synchronized 错误使用示例 2 {

public static void main(String[] args) {
synchronized 错误使用示例 2 demo = new synchronized 错误使用示例 2();
System.out.println(demo.wrong(1000000));
System.out.println(demo.right(1000000));
}

public int wrong(int count) {
Data.reset();
IntStream.rangeClosed(1, count).parallel().forEach(i -> new Data().wrong());
return Data.getCounter();
}

public int right(int count) {
Data.reset();
IntStream.rangeClosed(1, count).parallel().forEach(i -> new Data().right());
return Data.getCounter();
}

private static class Data {

@Getter
private static int counter = 0;
private static Object locker = new Object();

public static int reset() {
counter = 0;
return counter;
}

public synchronized void wrong() {
counter++;
}

public void right() {
synchronized (locker) {
counter++;
}
}

}

}

wrong 方法中试图对一个静态对象加对象级别的 synchronized 锁,并不能保证线程安全。

锁粒度导致的问题

要尽可能的缩小加锁的范围,这可以提高并发吞吐。

如果精细化考虑了锁应用范围后,性能还无法满足需求的话,我们就要考虑另一个维度的粒度问题了,即:区分读写场景以及资源的访问冲突,考虑使用悲观方式的锁还是乐观方式的锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public class synchronized 锁粒度不当 {

public static void main(String[] args) {
Demo demo = new Demo();
demo.wrong();
demo.right();
}

private static class Demo {

private List<Integer> data = new ArrayList<>();

private void slow() {
try {
TimeUnit.MILLISECONDS.sleep(10);
} catch (InterruptedException e) {
}
}

public int wrong() {
long begin = System.currentTimeMillis();
IntStream.rangeClosed(1, 1000).parallel().forEach(i -> {
synchronized (this) {
slow();
data.add(i);
}
});
log.info("took:{}", System.currentTimeMillis() - begin);
return data.size();
}

public int right() {
long begin = System.currentTimeMillis();
IntStream.rangeClosed(1, 1000).parallel().forEach(i -> {
slow();
synchronized (data) {
data.add(i);
}
});
log.info("took:{}", System.currentTimeMillis() - begin);
return data.size();
}

}

}

volatile 内存语义

volatile 是 JVM 提供的 最轻量级的同步机制

volatile 修饰的变量,具备以下特性:

  • 线程可见性
  • 禁止指令重排序
  • 不保证原子性

线程安全需要具备:可见性、有序性、原子性。然而,volatile 不保证原子性,因此它不能彻底地保证线程安全。这也正如其命名,volatile 的中文意思是不稳定的,易变的。

保证线程可见性

这里的可见性是指当一条线程修改了 volatile 变量的值,新值对于其他线程来说是可以立即得知的。而普通变量不能做到这一点,普通变量的值在线程间传递均需要通过主内存来完成。

线程写 volatile 变量的过程:

  1. 改变线程工作内存中 volatile 变量副本的值
  2. 将改变后的副本的值从工作内存刷新到主内存

线程读 volatile 变量的过程:

  1. 从主内存中读取 volatile 变量的最新值到线程的工作内存中
  2. 从工作内存中读取 volatile 变量的副本

注意:保证可见性不等同于 volatile 变量保证并发操作的安全性

在不符合以下两点的场景中,仍然要通过枷锁来保证原子性:

  • 运算结果并不依赖变量的当前值,或者能够确保只有单一的线程修改变量的值。
  • 变量不需要与其他状态变量共同参与不变约束。

但是如果多个线程同时把更新后的变量值同时刷新回主内存,可能导致得到的值不是预期结果:

举个例子: 定义 volatile int count = 0,2 个线程同时执行 count++ 操作,每个线程都执行 500 次,最终结果小于 1000,原因是每个线程执行 count++ 需要以下 3 个步骤:

  1. 线程从主内存读取最新的 count 的值
  2. 执行引擎把 count 值加 1,并赋值给线程工作内存
  3. 线程工作内存把 count 值保存到主内存 有可能某一时刻 2 个线程在步骤 1 读取到的值都是 100,执行完步骤 2 得到的值都是 101,最后刷新了 2 次 101 保存到主内存。

禁止指令重排序

观察加入 volatile 关键字和没有加入 volatile 关键字时所生成的汇编代码发现,加入 volatile 关键字时,会多出一个 lock 前缀指令

lock 前缀指令实际上相当于一个内存屏障(也成内存栅栏),内存屏障会提供 3 个功能:

  • 它确保指令重排序时不会把其后面的指令排到内存屏障之前的位置,也不会把前面的指令排到内存屏障的后面;即在执行到内存屏障这句指令时,在它前面的操作已经全部完成;
  • 它会强制将对缓存的修改操作立即写入主存;
  • 如果是写操作,它会导致其他 CPU 中对应的缓存行无效。

在 Java 中,Unsafe 类提供了三个开箱即用的内存屏障相关的方法,屏蔽了操作系统底层的差异:

1
2
3
public native void loadFence();
public native void storeFence();
public native void fullFence();

理论上来说,你通过这个三个方法也可以实现和 volatile 禁止重排序一样的效果,只是会麻烦一些。

下面我以一个常见的面试题为例讲解一下 volatile 关键字禁止指令重排序的效果。

【示例】双重校验锁实现单例模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class Singleton {

private volatile static Singleton instance;

private Singleton() { }

public static Singleton getInstance() {
if (instance == null) {
synchronized (Singleton.class) {
if (instance == null) {
instance = new Singleton();
}
}
}
return instance;
}

}

instance 采用 volatile 关键字修饰也是很有必要的, instance = new Singleton(); 这段代码其实是分为三步执行:

  1. instance 分配内存空间
  2. 初始化 instance
  3. instance 指向分配的内存地址

但是由于 JVM 具有指令重排的特性,执行顺序有可能变成 1->3->2。指令重排在单线程环境下不会出现问题,但是在多线程环境下会导致一个线程获得还没有初始化的实例。例如,线程 T1 执行了 1 和 3,此时 T2 调用 getInstance() 后发现 instance 不为空,因此返回 instance,但此时 instance 还未被初始化。

volatile 不保证原子性

线程安全需要具备:可见性、有序性、原子性。然而,**volatile 不保证原子性,所以决定了它不能彻底地保证线程安全**。

我们通过下面的代码即可证明:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class VolatileAtomicityDemo {
public volatile static int inc = 0;

public void increase() {
inc++;
}

public static void main(String[] args) throws InterruptedException {
ExecutorService threadPool = Executors.newFixedThreadPool(5);
VolatileAtomicityDemo volatileAtomicityDemo = new VolatileAtomicityDemo();
for (int i = 0; i < 5; i++) {
threadPool.execute(() -> {
for (int j = 0; j < 500; j++) {
volatileAtomicityDemo.increase();
}
});
}
// 等待 1.5 秒,保证上面程序执行完成
Thread.sleep(1500);
System.out.println(inc);
threadPool.shutdown();
}
}

正常情况下,运行上面的代码理应输出 2500。但你真正运行了上面的代码之后,你会发现每次输出结果都小于 2500

为什么会出现这种情况呢?不是说好了,volatile 可以保证变量的可见性嘛!

也就是说,如果 volatile 能保证 inc++ 操作的原子性的话。每个线程中对 inc 变量自增完之后,其他线程可以立即看到修改后的值。5 个线程分别进行了 500 次操作,那么最终 inc 的值应该是 5*500=2500。

很多人会误认为自增操作 inc++ 是原子性的,实际上,inc++ 其实是一个复合操作,包括三步:

  1. 读取 inc 的值。
  2. 对 inc 加 1。
  3. 将 inc 的值写回内存。

volatile 是无法保证这三个操作是具有原子性的,有可能导致下面这种情况出现:

  1. 线程 1 对 inc 进行读取操作之后,还未对其进行修改。线程 2 又读取了 inc 的值并对其进行修改(+1),再将 inc 的值写回内存。
  2. 线程 2 操作完毕后,线程 1 对 inc 的值进行修改(+1),再将 inc 的值写回内存。

这也就导致两个线程分别对 inc 进行了一次自增操作后,inc 实际上只增加了 1。

其实,如果想要保证上面的代码运行正确也非常简单,利用 synchronizedLock 或者 AtomicInteger 都可以。

::: tabs#线程安全的计数器

@tab synchronized 改进

使用 synchronized 改进:

1
2
3
public synchronized void increase() {
inc++;
}

@tab AtomicInteger 改进

使用 AtomicInteger 改进:

1
2
3
4
5
public AtomicInteger inc = new AtomicInteger();

public void increase() {
inc.getAndIncrement();
}

@tab ReentrantLock 改进

使用 ReentrantLock 改进:

1
2
3
4
5
6
7
8
9
Lock lock = new ReentrantLock();
public void increase() {
lock.lock();
try {
inc++;
} finally {
lock.unlock();
}
}

:::

volatile 的应用

如果 volatile 变量修饰符使用恰当的话,它比 synchronized 的使用和执行成本更低,因为它不会引起线程上下文的切换和调度。但是,**volatile 无法替代 synchronized ,因为 volatile 无法保证操作的原子性**。

volatilesynchronized 的区别在于:

  • volatile 本质是在告诉 jvm 当前变量在寄存器(工作内存)中的值是不确定的,需要从主存中读取;synchronized 则是锁定当前变量,只有当前线程可以访问该变量,其他线程被阻塞住。
  • volatile 仅能修饰变量;synchronized 可以修饰方法和代码块。
  • volatile 仅能实现变量的修改可见性,不能保证原子性;而 synchronized 则可以保证变量的修改可见性和原子性
  • volatile 不会造成线程的阻塞;synchronized 可能会造成线程的阻塞。
  • volatile 标记的变量不会被编译器优化;synchronized 标记的变量可以被编译器优化。

通常来说,使用 volatile 必须具备以下 2 个条件

  • 对变量的写操作不依赖于当前值
  • 该变量没有包含在具有其他变量的表达式中

::: tabs#volatile 的应用

@tab 状态标记量

【示例】状态标记量

1
2
3
4
5
6
7
8
9
volatile boolean flag = false;

while(!flag) {
doSomething();
}

public void setFlag() {
flag = true;
}

@tab 双重锁实现线程安全的单例模式

【示例】双重锁实现线程安全的单例模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class Singleton {
private volatile static Singleton instance = null;

private Singleton() {}

public static Singleton getInstance() {
if(instance==null) {
synchronized (Singleton.class) {
if(instance==null)
instance = new Singleton();
}
}
return instance;
}
}

:::

final 内存语义

我们知道,final 成员变量必须在声明的时候初始化或者在构造器中初始化,否则就会报编译错误。 final 关键字的可见性是指:final 字段一旦在声明时或构造器中初始化完成,则其他线程无需同步就能正确看见字段值。这是因为一旦初始化完成,final 变量的值立刻回写到主内存。

long 和 double 的特殊规则

JMM 要求 lockunlockreadloadassignusestorewrite 这 8 种操作都具有原子性,但是对于 64 位的数据类型(long 和 double),在模型中特别定义相对宽松的规定:允许虚拟机将没有被 volatile 修饰的 64 位数据的读写操作分为 2 次 32 位的操作来进行,即允许虚拟机可选择不保证 64 位数据类型的 loadstorereadwrite 这 4 个操作的原子性。由于这种非原子性,有可能导致其他线程读到同步未完成的“32 位的半个变量”的值。

不过实际开发中,Java 内存模型强烈建议虚拟机把 64 位数据的读写实现为具有原子性,目前各种平台下的商用虚拟机都选择把 64 位数据的读写操作作为原子操作来对待,因此我们在编写代码时一般不需要把用到的 longdouble 变量专门声明为 volatile

参考资料

Java 容器之 Stream

Stream 简介

在 Java8 中,Collection 新增了两个流方法,分别是 stream()parallelStream()

Stream 相当于高级版的 Iterator,他可以通过 Lambda 表达式对集合进行各种非常便利、高效的聚合操作(Aggregate Operation),或者大批量数据操作 (Bulk Data Operation)。

Stream 操作分类

官方将 Stream 中的操作分为两大类:中间操作(Intermediate operations)和终结操作(Terminal operations)。

中间操作又可以分为无状态(Stateless)与有状态(Stateful)操作,前者是指元素的处理不受之前元素的影响,后者是指该操作只有拿到所有元素之后才能继续下去。

终结操作又可以分为短路(Short-circuiting)与非短路(Unshort-circuiting)操作,前者是指遇到某些符合条件的元素就可以得到最终结果,后者是指必须处理完所有元素才能得到最终结果。

Stream 源码实现

img

BaseStreamStream 是最顶层的接口类。BaseStream 主要定义了流的基本接口方法,例如,spliterator、isParallel 等;Stream 则定义了一些流的常用操作方法,例如,map、filter 等。

Sink 接口是定义每个 Stream 操作之间关系的协议,他包含 begin()end()cancellationRequested()accpt() 四个方法。ReferencePipeline 最终会将整个 Stream 流操作组装成一个调用链,而这条调用链上的各个 Stream 操作的上下关系就是通过 Sink 接口协议来定义实现的。

ReferencePipeline 是一个结构类,他通过定义内部类组装了各种操作流。他定义了 HeadStatelessOpStatefulOp 三个内部类,实现了 BaseStreamStream 的接口方法。Head 类主要用来定义数据源操作,在初次调用 names.stream() 方法时,会加载 Head 对象,此时为加载数据源操作;接着加载的是中间操作,分别为无状态中间操作 StatelessOp 对象和有状态操作 StatefulOp 对象,此时的 Stage 并没有执行,而是通过 AbstractPipeline 生成了一个中间操作 Stage 链表;当我们调用终结操作时,会生成一个最终的 Stage,通过这个 Stage 触发之前的中间操作,从最后一个 Stage 开始,递归产生一个 Sink 链。

Stream 并行处理

Stream 处理数据的方式有两种,串行处理和并行处理。

4. 参考资料

Java I/O 之 简介

IO 即 Input/Output(输入和输出),指的是:计算机内存与外部设备之间拷贝数据的过程。由于 CPU 访问内存的速度远远高于外部设备,因此 CPU 是先把外部设备的数据读到内存里,然后再进行处理。

UNIX I/O 模型

UNIX 系统下的 I/O 模型有 5 种:

  • 同步阻塞 I/O
  • 同步非阻塞 I/O
  • I/O 多路复用
  • 信号驱动 I/O
  • 异步 I/O

如何去理解 UNIX I/O 模型,大致有以下两个维度:

  • 区分同步或异步(synchronous/asynchronous)。简单来说,同步是一种可靠的有序运行机制,当我们进行同步操作时,后续的任务是等待当前调用返回,才会进行下一步;而异步则相反,其他任务不需要等待当前调用返回,通常依靠事件、回调等机制来实现任务间次序关系。
  • 区分阻塞与非阻塞(blocking/non-blocking)。在进行阻塞操作时,当前线程会处于阻塞状态,无法从事其他任务,只有当条件就绪才能继续,比如 ServerSocket 新连接建立完毕,或数据读取、写入操作完成;而非阻塞则是不管 IO 操作是否结束,直接返回,相应操作在后台继续处理。

不能一概而论认为同步或阻塞就是低效,具体还要看应用和系统特征。

对于一个网络 I/O 通信过程,比如网络数据读取,会涉及两个对象,一个是调用这个 I/O 操作的用户线程,另外一个就是操作系统内核。一个进程的地址空间分为用户空间和内核空间,用户线程不能直接访问内核空间。

当用户线程发起 I/O 操作后,网络数据读取操作会经历两个步骤:

  • 用户线程等待内核将数据从网卡拷贝到内核空间。
  • 内核将数据从内核空间拷贝到用户空间。

各种 I/O 模型的区别就是:它们实现这两个步骤的方式是不一样的。

同步阻塞 I/O

用户线程发起 read 调用后就阻塞了,让出 CPU。内核等待网卡数据到来,把数据从网卡拷贝到内核空间,接着把数据拷贝到用户空间,再把用户线程叫醒。

img

同步非阻塞 I/O

用户线程不断的发起 read 调用,数据没到内核空间时,每次都返回失败,直到数据到了内核空间,这一次 read 调用后,在等待数据从内核空间拷贝到用户空间这段时间里,线程还是阻塞的,等数据到了用户空间再把线程叫醒。

img

I/O 多路复用

用户线程的读取操作分成两步了,线程先发起 select 调用,目的是问内核数据准备好了吗?等内核把数据准备好了,用户线程再发起 read 调用。在等待数据从内核空间拷贝到用户空间这段时间里,线程还是阻塞的。那为什么叫 I/O 多路复用呢?因为一次 select 调用可以向内核查多个数据通道(Channel)的状态,所以叫多路复用。

img

信号驱动 I/O

首先开启 Socket 的信号驱动 I/O 功能,并安装一个信号处理函数,进程继续运行并不阻塞。当数据准备好时,进程会收到一个 SIGIO 信号,可以在信号处理函数中调用 I/O 操作函数处理数据。信号驱动式 I/O 模型的优点是我们在数据报到达期间进程不会被阻塞,我们只要等待信号处理函数的通知即可

异步 I/O

用户线程发起 read 调用的同时注册一个回调函数,read 立即返回,等内核将数据准备好后,再调用指定的回调函数完成处理。在这个过程中,用户线程一直没有阻塞。

img

Java I/O 模型

在 Java 中,主要支持三种 IO 模型:

  • BIO(blocking IO)
  • NIO(non-blocking IO)
  • AIO(Asynchronous IO)

BIO(blocking IO)

BIO(blocking IO) 是同步阻塞 IO 模型。指的主要是传统的 java.io 包,它基于流模型实现。BIO 的数据传输采用同步、阻塞的方式,也就是说,在读取输入流或者写入输出流时,在读、写动作完成之前,线程会一直阻塞在那里。

如果要让 BIO 通信模型 能够同时处理多个客户端请求,就必须使用多线程(主要原因是socket.accept()socket.read()socket.write() 涉及的三个主要函数都是同步阻塞的),但会造成不必要的线程开销。不过可以通过 线程池机制 改善,线程池还可以让线程的创建和回收成本相对较低。

即使可以用线程池略微优化,但是会消耗宝贵的线程资源,并且在百万级并发场景下也撑不住。如果并发访问量增加会导致线程数急剧膨胀可能会导致线程堆栈溢出、创建新线程失败等问题,最终导致进程宕机或者僵死,不能对外提供服务。

NIO(non-blocking IO)

JDK 4 引入了 NIO,源码在 java.nio 包中。NIO(non-blocking IO) 属于 I/O 多路复用模型。NIO 提供了 ChannelSelectorBuffer 等新的抽象,可以构建多路复用的、同步非阻塞 IO 程序,同时提供了更接近操作系统底层的高性能数据操作方式。

NIO 具有以下优点:

  • 使用缓冲区优化读写流 - NIO 与传统 I/O 不同,它是基于块(Block)的,它以块为基本单位处理数据。在 NIO 中,最为重要的两个组件是缓冲区(Buffer)和通道(Channel)。Buffer 是一块连续的内存块,是 NIO 读写数据的缓冲。Buffer 可以将文件一次性读入内存再做后续处理,而传统的方式是边读文件边处理数据。Channel 表示缓冲数据的源头或者目的地,它用于读取缓冲或者写入数据,是访问缓冲的接口。
  • 使用 DirectBuffer 减少内存复制 - NIO 还提供了一个可以直接访问物理内存的类 DirectBuffer。普通的 Buffer 分配的是 JVM 堆内存,而 DirectBuffer 是直接分配物理内存。数据要输出到外部设备,必须先从用户空间复制到内核空间,再复制到输出设备,而 DirectBuffer 则是直接将步骤简化为从内核空间复制到外部设备,减少了数据拷贝。
  • 优化 I/O,避免阻塞 - 传统 I/O 的数据读写是在用户空间和内核空间来回复制,而内核空间的数据是通过操作系统层面的 I/O 接口从磁盘读取或写入。NIO 的 Channel 有自己的处理器,可以完成内核空间和磁盘之间的 I/O 操作。在 NIO 中,我们读取和写入数据都要通过 Channel,由于 Channel 是双向的,所以读、写可以同时进行。

AIO(Asynchronous IO)

AIO(Asynchronous IO) 即异步非阻塞 IO,指的是 JDK7 中,对 NIO 有了进一步的改进,也称为 NIO2,引入了异步非阻塞 IO 方式。异步 IO 操作基于事件和回调机制,可以简单理解为,应用操作直接返回,而不会阻塞在那里,当后台处理完成,操作系统会通知相应线程进行后续工作。

参考资料