Dunwu Blog

大道至简,知易行难

Tomcat 容器

Tomcat 实现热部署和热加载

  • 热加载的实现方式是 Web 容器启动一个后台线程,定期检测类文件的变化,如果有变化,就重新加载类,在这个过程中不会清空 Session ,一般用在开发环境。
  • 热部署原理类似,也是由后台线程定时检测 Web 应用的变化,但它会重新加载整个 Web 应用。这种方式会清空 Session,比热加载更加干净、彻底,一般用在生产环境。

Tomcat 通过开启后台线程,使得各个层次的容器组件都有机会完成一些周期性任务。Tomcat 是基于 ScheduledThreadPoolExecutor 实现周期性任务的:

1
2
3
4
5
bgFuture = exec.scheduleWithFixedDelay(
new ContainerBackgroundProcessor(),// 要执行的 Runnable
backgroundProcessorDelay, // 第一次执行延迟多久
backgroundProcessorDelay, // 之后每次执行间隔多久
TimeUnit.SECONDS); // 时间单位

第一个参数就是要周期性执行的任务类 ContainerBackgroundProcessor,它是一个 Runnable,同时也是 ContainerBase 的内部类,ContainerBase 是所有容器组件的基类,我们来回忆一下容器组件有哪些,有 Engine、Host、Context 和 Wrapper 等,它们具有父子关系。

ContainerBackgroundProcessor 实现

我们接来看 ContainerBackgroundProcessor 具体是如何实现的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
protected class ContainerBackgroundProcessor implements Runnable {

@Override
public void run() {
// 请注意这里传入的参数是 " 宿主类 " 的实例
processChildren(ContainerBase.this);
}

protected void processChildren(Container container) {
try {
//1. 调用当前容器的 backgroundProcess 方法。
container.backgroundProcess();

//2. 遍历所有的子容器,递归调用 processChildren,
// 这样当前容器的子孙都会被处理
Container[] children = container.findChildren();
for (int i = 0; i < children.length; i++) {
// 这里请你注意,容器基类有个变量叫做 backgroundProcessorDelay,如果大于 0,表明子容器有自己的后台线程,无需父容器来调用它的 processChildren 方法。
if (children[i].getBackgroundProcessorDelay() <= 0) {
processChildren(children[i]);
}
}
} catch (Throwable t) { ... }

上面的代码逻辑也是比较清晰的,首先 ContainerBackgroundProcessor 是一个 Runnable,它需要实现 run 方法,它的 run 很简单,就是调用了 processChildren 方法。这里有个小技巧,它把“宿主类”,也就是ContainerBase 的类实例当成参数传给了 run 方法

而在 processChildren 方法里,就做了两步:调用当前容器的 backgroundProcess 方法,以及递归调用子孙的 backgroundProcess 方法。请你注意 backgroundProcess 是 Container 接口中的方法,也就是说所有类型的容器都可以实现这个方法,在这个方法里完成需要周期性执行的任务。

这样的设计意味着什么呢?我们只需要在顶层容器,也就是 Engine 容器中启动一个后台线程,那么这个线程不但会执行 Engine 容器的周期性任务,它还会执行所有子容器的周期性任务

backgroundProcess 方法

上述代码都是在基类 ContainerBase 中实现的,那具体容器类需要做什么呢?其实很简单,如果有周期性任务要执行,就实现 backgroundProcess 方法;如果没有,就重用基类 ContainerBase 的方法。ContainerBase 的 backgroundProcess 方法实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public void backgroundProcess() {

//1. 执行容器中 Cluster 组件的周期性任务
Cluster cluster = getClusterInternal();
if (cluster != null) {
cluster.backgroundProcess();
}

//2. 执行容器中 Realm 组件的周期性任务
Realm realm = getRealmInternal();
if (realm != null) {
realm.backgroundProcess();
}

//3. 执行容器中 Valve 组件的周期性任务
Valve current = pipeline.getFirst();
while (current != null) {
current.backgroundProcess();
current = current.getNext();
}

//4. 触发容器的 " 周期事件 ",Host 容器的监听器 HostConfig 就靠它来调用
fireLifecycleEvent(Lifecycle.PERIODIC_EVENT, null);
}

从上面的代码可以看到,不仅每个容器可以有周期性任务,每个容器中的其他通用组件,比如跟集群管理有关的 Cluster 组件、跟安全管理有关的 Realm 组件都可以有自己的周期性任务。

我在前面的专栏里提到过,容器之间的链式调用是通过 Pipeline-Valve 机制来实现的,从上面的代码你可以看到容器中的 Valve 也可以有周期性任务,并且被 ContainerBase 统一处理。

请你特别注意的是,在 backgroundProcess 方法的最后,还触发了容器的“周期事件”。我们知道容器的生命周期事件有初始化、启动和停止等,那“周期事件”又是什么呢?它跟生命周期事件一样,是一种扩展机制,你可以这样理解:

又一段时间过去了,容器还活着,你想做点什么吗?如果你想做点什么,就创建一个监听器来监听这个“周期事件”,事件到了我负责调用你的方法。

总之,有了 ContainerBase 中的后台线程和 backgroundProcess 方法,各种子容器和通用组件不需要各自弄一个后台线程来处理周期性任务,这样的设计显得优雅和整洁。

Tomcat 热加载

有了 ContainerBase 的周期性任务处理“框架”,作为具体容器子类,只需要实现自己的周期性任务就行。而 Tomcat 的热加载,就是在 Context 容器中实现的。Context 容器的 backgroundProcess 方法是这样实现的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public void backgroundProcess() {

//WebappLoader 周期性的检查 WEB-INF/classes 和 WEB-INF/lib 目录下的类文件
Loader loader = getLoader();
if (loader != null) {
loader.backgroundProcess();
}

//Session 管理器周期性的检查是否有过期的 Session
Manager manager = getManager();
if (manager != null) {
manager.backgroundProcess();
}

// 周期性的检查静态资源是否有变化
WebResourceRoot resources = getResources();
if (resources != null) {
resources.backgroundProcess();
}

// 调用父类 ContainerBase 的 backgroundProcess 方法
super.backgroundProcess();
}

从上面的代码我们看到 Context 容器通过 WebappLoader 来检查类文件是否有更新,通过 Session 管理器来检查是否有 Session 过期,并且通过资源管理器来检查静态资源是否有更新,最后还调用了父类 ContainerBase 的 backgroundProcess 方法。

这里我们要重点关注,WebappLoader 是如何实现热加载的,它主要是调用了 Context 容器的 reload 方法,而 Context 的 reload 方法比较复杂,总结起来,主要完成了下面这些任务:

  1. 停止和销毁 Context 容器及其所有子容器,子容器其实就是 Wrapper,也就是说 Wrapper 里面 Servlet 实例也被销毁了。
  2. 停止和销毁 Context 容器关联的 Listener 和 Filter。
  3. 停止和销毁 Context 下的 Pipeline 和各种 Valve。
  4. 停止和销毁 Context 的类加载器,以及类加载器加载的类文件资源。
  5. 启动 Context 容器,在这个过程中会重新创建前面四步被销毁的资源。

在这个过程中,类加载器发挥着关键作用。一个 Context 容器对应一个类加载器,类加载器在销毁的过程中会把它加载的所有类也全部销毁。Context 容器在启动过程中,会创建一个新的类加载器来加载新的类文件。

在 Context 的 reload 方法里,并没有调用 Session 管理器的 distroy 方法,也就是说这个 Context 关联的 Session 是没有销毁的。你还需要注意的是,Tomcat 的热加载默认是关闭的,你需要在 conf 目录下的 Context.xml 文件中设置 reloadable 参数来开启这个功能,像下面这样:

1
<Context reloadable="true"/>

Tomcat 热部署

我们再来看看热部署,热部署跟热加载的本质区别是,热部署会重新部署 Web 应用,原来的 Context 对象会整个被销毁掉,因此这个 Context 所关联的一切资源都会被销毁,包括 Session。

那么 Tomcat 热部署又是由哪个容器来实现的呢?应该不是由 Context,因为热部署过程中 Context 容器被销毁了,那么这个重担就落在 Host 身上了,因为它是 Context 的父容器。

跟 Context 不一样,Host 容器并没有在 backgroundProcess 方法中实现周期性检测的任务,而是通过监听器 HostConfig 来实现的,HostConfig 就是前面提到的“周期事件”的监听器,那“周期事件”达到时,HostConfig 会做什么事呢?

1
2
3
4
5
6
public void lifecycleEvent(LifecycleEvent event) {
// 执行 check 方法。
if (event.getType().equals(Lifecycle.PERIODIC_EVENT)) {
check();
}
}

它执行了 check 方法,我们接着来看 check 方法里做了什么。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
protected void check() {

if (host.getAutoDeploy()) {
// 检查这个 Host 下所有已经部署的 Web 应用
DeployedApplication[] apps =
deployed.values().toArray(new DeployedApplication[0]);

for (int i = 0; i < apps.length; i++) {
// 检查 Web 应用目录是否有变化
checkResources(apps[i], false);
}

// 执行部署
deployApps();
}
}

其实 HostConfig 会检查 webapps 目录下的所有 Web 应用:

  • 如果原来 Web 应用目录被删掉了,就把相应 Context 容器整个销毁掉。
  • 是否有新的 Web 应用目录放进来了,或者有新的 WAR 包放进来了,就部署相应的 Web 应用。

因此 HostConfig 做的事情都是比较“宏观”的,它不会去检查具体类文件或者资源文件是否有变化,而是检查 Web 应用目录级别的变化。

Tomcat 的类加载机制

Tomcat 的自定义类加载器 WebAppClassLoader 打破了双亲委派机制,它首先自己尝试去加载某个类,如果找不到再代理给父类加载器,其目的是优先加载 Web 应用自己定义的类。具体实现就是重写 ClassLoader 的两个方法:findClass 和 loadClass。

findClass 方法

我们先来看看 findClass 方法的实现,为了方便理解和阅读,我去掉了一些细节:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public Class<?> findClass(String name) throws ClassNotFoundException {
...

Class<?> clazz = null;
try {
//1. 先在 Web 应用目录下查找类
clazz = findClassInternal(name);
} catch (RuntimeException e) {
throw e;
}

if (clazz == null) {
try {
//2. 如果在本地目录没有找到,交给父加载器去查找
clazz = super.findClass(name);
} catch (RuntimeException e) {
throw e;
}

//3. 如果父类也没找到,抛出 ClassNotFoundException
if (clazz == null) {
throw new ClassNotFoundException(name);
}

return clazz;
}

在 findClass 方法里,主要有三个步骤:

  1. 先在 Web 应用本地目录下查找要加载的类。
  2. 如果没有找到,交给父加载器去查找,它的父加载器就是上面提到的系统类加载器 AppClassLoader。
  3. 如何父加载器也没找到这个类,抛出 ClassNotFound 异常。

loadClass 方法

接着我们再来看 Tomcat 类加载器的 loadClass 方法的实现,同样我也去掉了一些细节:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
public Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {

synchronized (getClassLoadingLock(name)) {

Class<?> clazz = null;

//1. 先在本地 cache 查找该类是否已经加载过
clazz = findLoadedClass0(name);
if (clazz != null) {
if (resolve)
resolveClass(clazz);
return clazz;
}

//2. 从系统类加载器的 cache 中查找是否加载过
clazz = findLoadedClass(name);
if (clazz != null) {
if (resolve)
resolveClass(clazz);
return clazz;
}

// 3. 尝试用 ExtClassLoader 类加载器类加载,为什么?
ClassLoader javaseLoader = getJavaseClassLoader();
try {
clazz = javaseLoader.loadClass(name);
if (clazz != null) {
if (resolve)
resolveClass(clazz);
return clazz;
}
} catch (ClassNotFoundException e) {
// Ignore
}

// 4. 尝试在本地目录搜索 class 并加载
try {
clazz = findClass(name);
if (clazz != null) {
if (resolve)
resolveClass(clazz);
return clazz;
}
} catch (ClassNotFoundException e) {
// Ignore
}

// 5. 尝试用系统类加载器 (也就是 AppClassLoader) 来加载
try {
clazz = Class.forName(name, false, parent);
if (clazz != null) {
if (resolve)
resolveClass(clazz);
return clazz;
}
} catch (ClassNotFoundException e) {
// Ignore
}
}

//6. 上述过程都加载失败,抛出异常
throw new ClassNotFoundException(name);
}

loadClass 方法稍微复杂一点,主要有六个步骤:

  1. 先在本地 Cache 查找该类是否已经加载过,也就是说 Tomcat 的类加载器是否已经加载过这个类。
  2. 如果 Tomcat 类加载器没有加载过这个类,再看看系统类加载器是否加载过。
  3. 如果都没有,就让ExtClassLoader去加载,这一步比较关键,目的防止 Web 应用自己的类覆盖 JRE 的核心类。因为 Tomcat 需要打破双亲委派机制,假如 Web 应用里自定义了一个叫 Object 的类,如果先加载这个 Object 类,就会覆盖 JRE 里面的那个 Object 类,这就是为什么 Tomcat 的类加载器会优先尝试用 ExtClassLoader 去加载,因为 ExtClassLoader 会委托给 BootstrapClassLoader 去加载,BootstrapClassLoader 发现自己已经加载了 Object 类,直接返回给 Tomcat 的类加载器,这样 Tomcat 的类加载器就不会去加载 Web 应用下的 Object 类了,也就避免了覆盖 JRE 核心类的问题。
  4. 如果 ExtClassLoader 加载器加载失败,也就是说 JRE 核心类中没有这类,那么就在本地 Web 应用目录下查找并加载。
  5. 如果本地目录下没有这个类,说明不是 Web 应用自己定义的类,那么由系统类加载器去加载。这里请你注意,Web 应用是通过Class.forName调用交给系统类加载器的,因为Class.forName的默认加载器就是系统类加载器。
  6. 如果上述加载过程全部失败,抛出 ClassNotFound 异常。

从上面的过程我们可以看到,Tomcat 的类加载器打破了双亲委派机制,没有一上来就直接委托给父加载器,而是先在本地目录下加载,为了避免本地目录下的类覆盖 JRE 的核心类,先尝试用 JVM 扩展类加载器 ExtClassLoader 去加载。那为什么不先用系统类加载器 AppClassLoader 去加载?很显然,如果是这样的话,那就变成双亲委派机制了,这就是 Tomcat 类加载器的巧妙之处。

Tomcat 实现应用隔离

Tomcat 作为 Web 容器,需要解决以下问题:

  1. 如果在 Tomcat 中运行了两个 Web 应用程序,两个 Web 应用中有同名的 Servlet,但是功能不同,Tomcat 需要同时加载和管理这两个同名的 Servlet 类,保证它们不会冲突,因此 Web 应用之间的类需要隔离。
  2. 两个 Web 应用都依赖同一个第三方的 JAR 包,比如 Spring,那 Spring 的 JAR 包被加载到内存后,Tomcat 要保证这两个 Web 应用能够共享,也就是说 Spring 的 JAR 包只被加载一次,否则随着依赖的第三方 JAR 包增多,JVM 的内存会膨胀。
  3. 需要隔离 Tomcat 本身的类和 Web 应用的类。

img

WebAppClassLoader

针对第一个问题:

如果使用 JVM 默认 AppClassLoader 来加载 Web 应用,AppClassLoader 只能加载一个 Servlet 类,在加载第二个同名 Servlet 类时,AppClassLoader 会返回第一个 Servlet 类的 Class 实例,这是因为在 AppClassLoader 看来,同名的 Servlet 类只被加载一次。

Tomcat 的解决方案是自定义一个类加载器 WebAppClassLoader, 并且给每个 Web 应用创建一个类加载器实例。我们知道,Context 容器组件对应一个 Web 应用,因此,每个 Context 容器负责创建和维护一个 WebAppClassLoader 加载器实例。这背后的原理是,不同的加载器实例加载的类被认为是不同的类,即使它们的类名相同。这就相当于在 Java 虚拟机内部创建了一个个相互隔离的 Java 类空间,每一个 Web 应用都有自己的类空间,Web 应用之间通过各自的类加载器互相隔离。

SharedClassLoader

针对第二个问题:

本质需求是两个 Web 应用之间怎么共享库类,并且不能重复加载相同的类。我们知道,在双亲委派机制里,各个子加载器都可以通过父加载器去加载类,那么把需要共享的类放到父加载器的加载路径下不就行了吗,应用程序也正是通过这种方式共享 JRE 的核心类。因此 Tomcat 的设计者又加了一个类加载器 SharedClassLoader,作为 WebAppClassLoader 的父加载器,专门来加载 Web 应用之间共享的类。如果 WebAppClassLoader 自己没有加载到某个类,就会委托父加载器 SharedClassLoader 去加载这个类,SharedClassLoader 会在指定目录下加载共享类,之后返回给 WebAppClassLoader,这样共享的问题就解决了。

CatalinaClassloader

如何隔离 Tomcat 本身的类和 Web 应用的类?

要共享可以通过父子关系,要隔离那就需要兄弟关系了。兄弟关系就是指两个类加载器是平行的,它们可能拥有同一个父加载器,但是两个兄弟类加载器加载的类是隔离的。基于此 Tomcat 又设计一个类加载器 CatalinaClassloader,专门来加载 Tomcat 自身的类。这样设计有个问题,那 Tomcat 和各 Web 应用之间需要共享一些类时该怎么办呢?

CommonClassLoader

老办法,还是再增加一个 CommonClassLoader,作为 CatalinaClassloader 和 SharedClassLoader 的父加载器。CommonClassLoader 能加载的类都可以被 CatalinaClassLoader 和 SharedClassLoader 使用,而 CatalinaClassLoader 和 SharedClassLoader 能加载的类则与对方相互隔离。WebAppClassLoader 可以使用 SharedClassLoader 加载到的类,但各个 WebAppClassLoader 实例之间相互隔离。

Tomcat 实现 Servlet 规范

Servlet 容器最重要的任务就是创建 Servlet 的实例并且调用 Servlet。

一个 Web 应用里往往有多个 Servlet,而在 Tomcat 中一个 Web 应用对应一个 Context 容器,也就是说一个 Context 容器需要管理多个 Servlet 实例。但 Context 容器并不直接持有 Servlet 实例,而是通过子容器 Wrapper 来管理 Servlet,你可以把 Wrapper 容器看作是 Servlet 的包装。

为什么需要 Wrapper 呢?Context 容器直接维护一个 Servlet 数组不就行了吗?这是因为 Servlet 不仅仅是一个类实例,它还有相关的配置信息,比如它的 URL 映射、它的初始化参数,因此设计出了一个包装器,把 Servlet 本身和它相关的数据包起来,没错,这就是面向对象的思想。

除此以外,Servlet 规范中还有两个重要特性:Listener 和 Filter,Tomcat 也需要创建它们的实例,并在合适的时机去调用它们的方法。

Servlet 管理

Tomcat 是用 Wrapper 容器来管理 Servlet 的,那 Wrapper 容器具体长什么样子呢?我们先来看看它里面有哪些关键的成员变量:

1
protected volatile Servlet instance = null;

它拥有一个 Servlet 实例,并且 Wrapper 通过 loadServlet 方法来实例化 Servlet。为了方便你阅读,我简化了代码:

1
2
3
4
5
6
7
8
9
10
11
public synchronized Servlet loadServlet() throws ServletException {
Servlet servlet;

//1. 创建一个 Servlet 实例
servlet = (Servlet) instanceManager.newInstance(servletClass);

//2. 调用了 Servlet 的 init 方法,这是 Servlet 规范要求的
initServlet(servlet);

return servlet;
}

其实 loadServlet 主要做了两件事:创建 Servlet 的实例,并且调用 Servlet 的 init 方法,因为这是 Servlet 规范要求的。

那接下来的问题是,什么时候会调到这个 loadServlet 方法呢?为了加快系统的启动速度,我们往往会采取资源延迟加载的策略,Tomcat 也不例外,默认情况下 Tomcat 在启动时不会加载你的 Servlet,除非你把 Servlet 的loadOnStartup参数设置为true

这里还需要你注意的是,虽然 Tomcat 在启动时不会创建 Servlet 实例,但是会创建 Wrapper 容器,就好比尽管枪里面还没有子弹,先把枪造出来。那子弹什么时候造呢?是真正需要开枪的时候,也就是说有请求来访问某个 Servlet 时,这个 Servlet 的实例才会被创建。

那 Servlet 是被谁调用的呢?我们回忆一下专栏前面提到过 Tomcat 的 Pipeline-Valve 机制,每个容器组件都有自己的 Pipeline,每个 Pipeline 中有一个 Valve 链,并且每个容器组件有一个 BasicValve(基础阀)。Wrapper 作为一个容器组件,它也有自己的 Pipeline 和 BasicValve,Wrapper 的 BasicValve 叫 StandardWrapperValve

你可以想到,当请求到来时,Context 容器的 BasicValve 会调用 Wrapper 容器中 Pipeline 中的第一个 Valve,然后会调用到 StandardWrapperValve。我们先来看看它的 invoke 方法是如何实现的,同样为了方便你阅读,我简化了代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
public final void invoke(Request request, Response response) {

//1. 实例化 Servlet
servlet = wrapper.allocate();

//2. 给当前请求创建一个 Filter 链
ApplicationFilterChain filterChain =
ApplicationFilterFactory.createFilterChain(request, wrapper, servlet);

//3. 调用这个 Filter 链,Filter 链中的最后一个 Filter 会调用 Servlet
filterChain.doFilter(request.getRequest(), response.getResponse());

}

StandardWrapperValve 的 invoke 方法比较复杂,去掉其他异常处理的一些细节,本质上就是三步:

  • 第一步,创建 Servlet 实例;
  • 第二步,给当前请求创建一个 Filter 链;
  • 第三步,调用这个 Filter 链。

你可能会问,为什么需要给每个请求创建一个 Filter 链?这是因为每个请求的请求路径都不一样,而 Filter 都有相应的路径映射,因此不是所有的 Filter 都需要来处理当前的请求,我们需要根据请求的路径来选择特定的一些 Filter 来处理。

第二个问题是,为什么没有看到调到 Servlet 的 service 方法?这是因为 Filter 链的 doFilter 方法会负责调用 Servlet,具体来说就是 Filter 链中的最后一个 Filter 会负责调用 Servlet。

接下来我们来看 Filter 的实现原理。

Filter 管理

我们知道,跟 Servlet 一样,Filter 也可以在web.xml文件里进行配置,不同的是,Filter 的作用域是整个 Web 应用,因此 Filter 的实例是在 Context 容器中进行管理的,Context 容器用 Map 集合来保存 Filter。

1
private Map<String, FilterDef> filterDefs = new HashMap<>();

那上面提到的 Filter 链又是什么呢?Filter 链的存活期很短,它是跟每个请求对应的。一个新的请求来了,就动态创建一个 FIlter 链,请求处理完了,Filter 链也就被回收了。理解它的原理也非常关键,我们还是来看看源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public final class ApplicationFilterChain implements FilterChain {

//Filter 链中有 Filter 数组,这个好理解
private ApplicationFilterConfig[] filters = new ApplicationFilterConfig[0];

//Filter 链中的当前的调用位置
private int pos = 0;

// 总共有多少了 Filter
private int n = 0;

// 每个 Filter 链对应一个 Servlet,也就是它要调用的 Servlet
private Servlet servlet = null;

public void doFilter(ServletRequest req, ServletResponse res) {
internalDoFilter(request,response);
}

private void internalDoFilter(ServletRequest req,
ServletResponse res){

// 每个 Filter 链在内部维护了一个 Filter 数组
if (pos < n) {
ApplicationFilterConfig filterConfig = filters[pos++];
Filter filter = filterConfig.getFilter();

filter.doFilter(request, response, this);
return;
}

servlet.service(request, response);

}

从 ApplicationFilterChain 的源码我们可以看到几个关键信息:

  • Filter 链中除了有 Filter 对象的数组,还有一个整数变量 pos,这个变量用来记录当前被调用的 Filter 在数组中的位置。
  • Filter 链中有个 Servlet 实例,这个好理解,因为上面提到了,每个 Filter 链最后都会调到一个 Servlet。
  • Filter 链本身也实现了 doFilter 方法,直接调用了一个内部方法 internalDoFilter。
  • internalDoFilter 方法的实现比较有意思,它做了一个判断,如果当前 Filter 的位置小于 Filter 数组的长度,也就是说 Filter 还没调完,就从 Filter 数组拿下一个 Filter,调用它的 doFilter 方法。否则,意味着所有 Filter 都调到了,就调用 Servlet 的 service 方法。

但问题是,方法体里没看到循环,谁在不停地调用 Filter 链的 doFIlter 方法呢?Filter 是怎么依次调到的呢?

答案是Filter 本身的 doFilter 方法会调用 Filter 链的 doFilter 方法,我们还是来看看代码就明白了:

1
2
3
4
5
6
7
8
9
public void doFilter(ServletRequest request, ServletResponse response,
FilterChain chain){

...

// 调用 Filter 的方法
chain.doFilter(request, response);

}

注意 Filter 的 doFilter 方法有个关键参数 FilterChain,就是 Filter 链。并且每个 Filter 在实现 doFilter 时,必须要调用 Filter 链的 doFilter 方法,而 Filter 链中保存当前 FIlter 的位置,会调用下一个 FIlter 的 doFilter 方法,这样链式调用就完成了。

Filter 链跟 Tomcat 的 Pipeline-Valve 本质都是责任链模式,但是在具体实现上稍有不同,你可以细细体会一下。

Listener 管理

我们接着聊 Servlet 规范里 Listener。跟 Filter 一样,Listener 也是一种扩展机制,你可以监听容器内部发生的事件,主要有两类事件:

  • 第一类是生命状态的变化,比如 Context 容器启动和停止、Session 的创建和销毁。
  • 第二类是属性的变化,比如 Context 容器某个属性值变了、Session 的某个属性值变了以及新的请求来了等。

我们可以在web.xml配置或者通过注解的方式来添加监听器,在监听器里实现我们的业务逻辑。对于 Tomcat 来说,它需要读取配置文件,拿到监听器类的名字,实例化这些类,并且在合适的时机调用这些监听器的方法。

Tomcat 是通过 Context 容器来管理这些监听器的。Context 容器将两类事件分开来管理,分别用不同的集合来存放不同类型事件的监听器:

1
2
3
4
5
// 监听属性值变化的监听器
private List<Object> applicationEventListenersList = new CopyOnWriteArrayList<>();

// 监听生命事件的监听器
private Object applicationLifecycleListenersObjects[] = new Object[0];

剩下的事情就是触发监听器了,比如在 Context 容器的启动方法里,就触发了所有的 ServletContextListener:

1
2
3
4
5
6
7
8
9
10
11
12
//1. 拿到所有的生命周期监听器
Object instances[] = getApplicationLifecycleListeners();

for (int i = 0; i < instances.length; i++) {
//2. 判断 Listener 的类型是不是 ServletContextListener
if (!(instances[i] instanceof ServletContextListener))
continue;

//3. 触发 Listener 的方法
ServletContextListener lr = (ServletContextListener) instances[i];
lr.contextInitialized(event);
}

需要注意的是,这里的 ServletContextListener 接口是一种留给用户的扩展机制,用户可以实现这个接口来定义自己的监听器,监听 Context 容器的启停事件。Spring 就是这么做的。ServletContextListener 跟 Tomcat 自己的生命周期事件 LifecycleListener 是不同的。LifecycleListener 定义在生命周期管理组件中,由基类 LifeCycleBase 统一管理。

Tomcat 支持异步 Servlet

异步示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@WebServlet(urlPatterns = {"/async"}, asyncSupported = true)
public class AsyncServlet extends HttpServlet {

//Web 应用线程池,用来处理异步 Servlet
ExecutorService executor = Executors.newSingleThreadExecutor();

public void service(HttpServletRequest req, HttpServletResponse resp) {
//1. 调用 startAsync 或者异步上下文
final AsyncContext ctx = req.startAsync();

// 用线程池来执行耗时操作
executor.execute(new Runnable() {

@Override
public void run() {

// 在这里做耗时的操作
try {
ctx.getResponse().getWriter().println("Handling Async Servlet");
} catch (IOException e) {}

//3. 异步 Servlet 处理完了调用异步上下文的 complete 方法
ctx.complete();
}

});
}
}

有三个要点:

  1. 通过注解的方式来注册 Servlet,除了 @WebServlet 注解,还需要加上 asyncSupported=true 的属性,表明当前的 Servlet 是一个异步 Servlet。
  2. Web 应用程序需要调用 Request 对象的 startAsync 方法来拿到一个异步上下文 AsyncContext。这个上下文保存了请求和响应对象。
  3. Web 应用需要开启一个新线程来处理耗时的操作,处理完成后需要调用 AsyncContext 的 complete 方法。目的是告诉 Tomcat,请求已经处理完成。

这里请你注意,虽然异步 Servlet 允许用更长的时间来处理请求,但是也有超时限制的,默认是 30 秒,如果 30 秒内请求还没处理完,Tomcat 会触发超时机制,向浏览器返回超时错误,如果这个时候你的 Web 应用再调用ctx.complete方法,会得到一个 IllegalStateException 异常。

异步 Servlet 原理

通过上面的例子,相信你对 Servlet 的异步实现有了基本的理解。要理解 Tomcat 在这个过程都做了什么事情,关键就是要弄清楚req.startAsync方法和ctx.complete方法都做了什么。

startAsync 方法

startAsync 方法其实就是创建了一个异步上下文 AsyncContext 对象,AsyncContext 对象的作用是保存请求的中间信息,比如 Request 和 Response 对象等上下文信息。你来思考一下为什么需要保存这些信息呢?

这是因为 Tomcat 的工作线程在Request.startAsync调用之后,就直接结束回到线程池中了,线程本身不会保存任何信息。也就是说一个请求到服务端,执行到一半,你的 Web 应用正在处理,这个时候 Tomcat 的工作线程没了,这就需要有个缓存能够保存原始的 Request 和 Response 对象,而这个缓存就是 AsyncContext。

有了 AsyncContext,你的 Web 应用通过它拿到 request 和 response 对象,拿到 Request 对象后就可以读取请求信息,请求处理完了还需要通过 Response 对象将 HTTP 响应发送给浏览器。

除了创建 AsyncContext 对象,startAsync 还需要完成一个关键任务,那就是告诉 Tomcat 当前的 Servlet 处理方法返回时,不要把响应发到浏览器,因为这个时候,响应还没生成呢;并且不能把 Request 对象和 Response 对象销毁,因为后面 Web 应用还要用呢。

在 Tomcat 中,负责 flush 响应数据的是 CoyoteAdaptor,它还会销毁 Request 对象和 Response 对象,因此需要通过某种机制通知 CoyoteAdaptor,具体来说是通过下面这行代码:

1
this.request.getCoyoteRequest().action(ActionCode.ASYNC_START, this);

你可以把它理解为一个 Callback,在这个 action 方法里设置了 Request 对象的状态,设置它为一个异步 Servlet 请求。

我们知道连接器是调用 CoyoteAdapter 的 service 方法来处理请求的,而 CoyoteAdapter 会调用容器的 service 方法,当容器的 service 方法返回时,CoyoteAdapter 判断当前的请求是不是异步 Servlet 请求,如果是,就不会销毁 Request 和 Response 对象,也不会把响应信息发到浏览器。你可以通过下面的代码理解一下,这是 CoyoteAdapter 的 service 方法,我对它进行了简化:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public void service(org.apache.coyote.Request req, org.apache.coyote.Response res) {

// 调用容器的 service 方法处理请求
connector.getService().getContainer().getPipeline().
getFirst().invoke(request, response);

// 如果是异步 Servlet 请求,仅仅设置一个标志,
// 否则说明是同步 Servlet 请求,就将响应数据刷到浏览器
if (request.isAsync()) {
async = true;
} else {
request.finishRequest();
response.finishResponse();
}

// 如果不是异步 Servlet 请求,就销毁 Request 对象和 Response 对象
if (!async) {
request.recycle();
response.recycle();
}
}

接下来,当 CoyoteAdaptor 的 service 方法返回到 ProtocolHandler 组件时,ProtocolHandler 判断返回值,如果当前请求是一个异步 Servlet 请求,它会把当前 Socket 的协议处理者 Processor 缓存起来,将 SocketWrapper 对象和相应的 Processor 存到一个 Map 数据结构里。

1
private final Map<S,Processor> connections = new ConcurrentHashMap<>();

之所以要缓存是因为这个请求接下来还要接着处理,还是由原来的 Processor 来处理,通过 SocketWrapper 就能从 Map 里找到相应的 Processor。

complete 方法

接着我们再来看关键的ctx.complete方法,当请求处理完成时,Web 应用调用这个方法。那么这个方法做了些什么事情呢?最重要的就是把响应数据发送到浏览器。

这件事情不能由 Web 应用线程来做,也就是说ctx.complete方法不能直接把响应数据发送到浏览器,因为这件事情应该由 Tomcat 线程来做,但具体怎么做呢?

我们知道,连接器中的 Endpoint 组件检测到有请求数据达到时,会创建一个 SocketProcessor 对象交给线程池去处理,因此 Endpoint 的通信处理和具体请求处理在两个线程里运行。

在异步 Servlet 的场景里,Web 应用通过调用ctx.complete方法时,也可以生成一个新的 SocketProcessor 任务类,交给线程池处理。对于异步 Servlet 请求来说,相应的 Socket 和协议处理组件 Processor 都被缓存起来了,并且这些对象都可以通过 Request 对象拿到。

讲到这里,你可能已经猜到ctx.complete是如何实现的了:

1
2
3
4
5
6
7
8
public void complete() {
// 检查状态合法性,我们先忽略这句
check();

// 调用 Request 对象的 action 方法,其实就是通知连接器,这个异步请求处理完了
request.getCoyoteRequest().action(ActionCode.ASYNC_COMPLETE, null);

}

我们可以看到 complete 方法调用了 Request 对象的 action 方法。而在 action 方法里,则是调用了 Processor 的 processSocketEvent 方法,并且传入了操作码 OPEN_READ。

1
2
3
4
5
6
7
case ASYNC_COMPLETE: {
clearDispatches();
if (asyncStateMachine.asyncComplete()) {
processSocketEvent(SocketEvent.OPEN_READ, true);
}
break;
}

我们接着看 processSocketEvent 方法,它调用 SocketWrapper 的 processSocket 方法:

1
2
3
4
5
6
protected void processSocketEvent(SocketEvent event, boolean dispatch) {
SocketWrapperBase<?> socketWrapper = getSocketWrapper();
if (socketWrapper != null) {
socketWrapper.processSocket(event, dispatch);
}
}

而 SocketWrapper 的 processSocket 方法会创建 SocketProcessor 任务类,并通过 Tomcat 线程池来处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public boolean processSocket(SocketWrapperBase<S> socketWrapper,
SocketEvent event, boolean dispatch) {

if (socketWrapper == null) {
return false;
}

SocketProcessorBase<S> sc = processorCache.pop();
if (sc == null) {
sc = createSocketProcessor(socketWrapper, event);
} else {
sc.reset(socketWrapper, event);
}
// 线程池运行
Executor executor = getExecutor();
if (dispatch && executor != null) {
executor.execute(sc);
} else {
sc.run();
}
}

请你注意 createSocketProcessor 函数的第二个参数是 SocketEvent,这里我们传入的是 OPEN_READ。通过这个参数,我们就能控制 SocketProcessor 的行为,因为我们不需要再把请求发送到容器进行处理,只需要向浏览器端发送数据,并且重新在这个 Socket 上监听新的请求就行了。

参考资料

Tomcat 优化

Tomcat 启动优化

如果 Tomcat 启动比较慢,可以考虑一些优化点

清理 Tomcat

  • 清理不必要的 Web 应用:首先我们要做的是删除掉 webapps 文件夹下不需要的工程,一般是 host-manager、example、doc 等这些默认的工程,可能还有以前添加的但现在用不着的工程,最好把这些全都删除掉。
  • 清理 XML 配置文件:Tomcat 在启动时会解析所有的 XML 配置文件,解析 XML 较为耗时,所以应该尽量保持配置文件的简洁。
  • 清理 JAR 文件:JVM 的类加载器在加载类时,需要查找每一个 JAR 文件,去找到所需要的类。如果删除了不需要的 JAR 文件,查找的速度就会快一些。这里请注意:Web 应用中的 lib 目录下不应该出现 Servlet API 或者 Tomcat 自身的 JAR,这些 JAR 由 Tomcat 负责提供。
  • 清理其他文件:及时清理日志,删除 logs 文件夹下不需要的日志文件。同样还有 work 文件夹下的 catalina 文件夹,它其实是 Tomcat 把 JSP 转换为 Class 文件的工作目录。有时候我们也许会遇到修改了代码,重启了 Tomcat,但是仍没效果,这时候便可以删除掉这个文件夹,Tomcat 下次启动的时候会重新生成。

禁止 Tomcat TLD 扫描

Tomcat 为了支持 JSP,在应用启动的时候会扫描 JAR 包里面的 TLD 文件,加载里面定义的标签库。所以在 Tomcat 的启动日志里,你可能会碰到这种提示:

At least one JAR was scanned for TLDs yet contained no TLDs. Enable debug logging for this logger for a complete list of JARs that were scanned but no TLDs were found in them. Skipping unneeded JARs during scanning can improve startup time and JSP compilation time.

Tomcat 的意思是,我扫描了你 Web 应用下的 JAR 包,发现 JAR 包里没有 TLD 文件。我建议配置一下 Tomcat 不要去扫描这些 JAR 包,这样可以提高 Tomcat 的启动速度,并节省 JSP 编译时间。

如何配置不去扫描这些 JAR 包呢,这里分两种情况:

  • 如果你的项目没有使用 JSP 作为 Web 页面模板,而是使用 Velocity 之类的模板引擎,你完全可以把 TLD 扫描禁止掉。方法是,找到 Tomcat 的conf/目录下的context.xml文件,在这个文件里 Context 标签下,加上JarScannerJarScanFilter子标签,像下面这样。

    1
    2
    3
    4
    5
    <Context>
    <JarScanner >
    <JarScanFilter defaultTldScan="true" defaultpluggabilityScan="true" />
    </JarScanner>
    </Context>
  • 如果你的项目使用了 JSP 作为 Web 页面模块,意味着 TLD 扫描无法避免,但是我们可以通过配置来告诉 Tomcat,只扫描那些包含 TLD 文件的 JAR 包。方法是,找到 Tomcat 的conf/目录下的catalina.properties文件,在这个文件里的 jarsToSkip 配置项中,加上你的 JAR 包。

    1
    tomcat.util.scan.StandardJarScanFilter.jarsToSkip=xxx.jar

关闭 WebSocket 支持

Tomcat 会扫描 WebSocket 注解的 API 实现,比如 @ServerEndpoint 注解的类。如果不需要使用 WebSockets 就可以关闭它。具体方法是,找到 Tomcat 的 conf/ 目录下的 context.xml 文件,给 Context 标签加一个 containerSciFilter 的属性:

1
2
3
<Context containerSciFilter="org.apache.tomcat.websocket.server.WsSci">
...
</Context>

更进一步,如果你不需要 WebSockets 这个功能,你可以把 Tomcat lib 目录下的 websocket-api.jartomcat-websocket.jar 这两个 JAR 文件删除掉,进一步提高性能。

关闭 JSP 支持

如果不需要使用 JSP,可以关闭 JSP 功能:

1
2
3
<Context containerSciFilter="org.apache.jasper.servlet.JasperInitializer">
...
</Context>

如果要同时关闭 WebSocket 和 Jsp,可以这样配置:

1
2
3
<Context containerSciFilter="org.apache.tomcat.websocket.server.WsSci | org.apache.jasper.servlet.JasperInitializer">
...
</Context>

禁止扫描 Servlet 注解

Servlet 3.0 引入了注解 Servlet,Tomcat 为了支持这个特性,会在 Web 应用启动时扫描你的类文件,因此如果你没有使用 Servlet 注解这个功能,可以告诉 Tomcat 不要去扫描 Servlet 注解。具体配置方法是,在你的 Web 应用的web.xml文件中,设置<web-app>元素的属性metadata-complete="true",像下面这样。

1
2
<web-app metadata-complete="true">
</web-app>

metadata-complete 的意思是,web.xml 里配置的 Servlet 是完整的,不需要再去库类中找 Servlet 的定义。

配置 Web-Fragment 扫描

Servlet 3.0 还引入了“Web 模块部署描述符片段”的 web-fragment.xml,这是一个部署描述文件,可以完成 web.xml 的配置功能。而这个 web-fragment.xml 文件必须存放在 JAR 文件的 META-INF 目录下,而 JAR 包通常放在 WEB-INF/lib 目录下,因此 Tomcat 需要对 JAR 文件进行扫描才能支持这个功能。

可以通过配置 web.xml 里面的 <absolute-ordering> 元素直接指定了哪些 JAR 包需要扫描 web fragment,如果 <absolute-ordering/> 元素是空的, 则表示不需要扫描,像下面这样。

1
2
3
4
5
<web-app metadata-complete="true">
...
<absolute-ordering />
...
</web-app>

随机数熵源优化

Tomcat 7 以上的版本依赖 Java 的 SecureRandom 类来生成随机数,比如 Session ID。而 JVM 默认使用阻塞式熵源(/dev/random), 在某些情况下就会导致 Tomcat 启动变慢。当阻塞时间较长时, 你会看到这样一条警告日志:

1
2
<DATE> org.apache.catalina.util.SessionIdGenerator createSecureRandom
INFO: Creation of SecureRandom instance for session ID generation using [SHA1PRNG] took [8152] milliseconds.

解决方案是通过设置,让 JVM 使用非阻塞式的熵源。

我们可以设置 JVM 的参数:

1
-Djava.security.egd=file:/dev/./urandom

或者是设置 java.security 文件,位于 $JAVA_HOME/jre/lib/security 目录之下: securerandom.source=file:/dev/./urandom

这里请你注意,/dev/./urandom 中间有个 ./ 的原因是 Oracle JRE 中的 Bug,Java 8 里面的 SecureRandom 类已经修正这个 Bug。 阻塞式的熵源(/dev/random)安全性较高, 非阻塞式的熵源(/dev/./urandom)安全性会低一些,因为如果你对随机数的要求比较高, 可以考虑使用硬件方式生成熵源。

并行启动多个 Web 应用

Tomcat 启动的时候,默认情况下 Web 应用都是一个一个启动的,等所有 Web 应用全部启动完成,Tomcat 才算启动完毕。如果在一个 Tomcat 下有多个 Web 应用,为了优化启动速度,你可以配置多个应用程序并行启动,可以通过修改 server.xml 中 Host 元素的 startStopThreads 属性来完成。startStopThreads 的值表示你想用多少个线程来启动你的 Web 应用,如果设成 0 表示你要并行启动 Web 应用,像下面这样的配置。

1
2
3
4
5
6
7
<Engine startStopThreads="0">
...
<Host startStopThreads="0">
...
</Host>
...
</Engine>

需要注意的是,Engine 元素里也配置了这个参数,这意味着如果你的 Tomcat 配置了多个 Host(虚拟主机),Tomcat 会以并行的方式启动多个 Host。

参考资料

Tomcat 和 Jetty

Web 容器 Tomcat 或 Jetty,作为重要的系统中间件,连接着浏览器和你的 Web 应用,并且支撑着 Web 程序的运行,可以说,弄懂了 Tomcat 和 Jetty 的原理,Java Web 开发对你来说就毫无秘密可言

Web 容器

早期的 Web 应用主要用于浏览新闻等静态页面,HTTP 服务器(比如 Apache、Nginx)向浏览器返回静态 HTML,浏览器负责解析 HTML,将结果呈现给用户。

随着互联网的发展,我们已经不满足于仅仅浏览静态页面,还希望通过一些交互操作,来获取动态结果,因此也就需要一些扩展机制能够让 HTTP 服务器调用服务端程序。

于是 Sun 公司推出了 Servlet 技术。你可以把 Servlet 简单理解为运行在服务端的 Java 小程序,但是 Servlet 没有 main 方法,不能独立运行,因此必须把它部署到 Servlet 容器中,由容器来实例化并调用 Servlet。

而 Tomcat 和 Jetty 就是一个 Servlet 容器。为了方便使用,它们也具有 HTTP 服务器的功能,因此Tomcat 或者 Jetty 就是一个“HTTP 服务器 + Servlet 容器”,我们也叫它们 Web 容器。

其他应用服务器比如 JBoss 和 WebLogic,它们不仅仅有 Servlet 容器的功能,也包含 EJB 容器,是完整的 Java EE 应用服务器。从这个角度看,Tomcat 和 Jetty 算是一个轻量级的应用服务器。

在微服务架构日渐流行的今天,开发人员更喜欢稳定的、轻量级的应用服务器,并且应用程序用内嵌的方式来运行 Servlet 容器也逐渐流行起来。之所以选择轻量级,是因为在微服务架构下,我们把一个大而全的单体应用,拆分成一个个功能单一的微服务,在这个过程中,服务的数量必然要增加,但为了减少资源的消耗,并且降低部署的成本,我们希望运行服务的 Web 容器也是轻量级的,Web 容器本身应该消耗较少的内存和 CPU 资源,并且由应用本身来启动一个嵌入式的 Web 容器,而不是通过 Web 容器来部署和启动应用,这样可以降低应用部署的复杂度。

Dubbo 快速入门

Apache Dubbo 是一款高性能、轻量级的开源 Java RPC 框架,它提供了三大核心能力:面向接口的远程方法调用,智能容错和负载均衡,以及服务自动注册和发现。

一、Dubbo 简介

Apache Dubbo 是一款高性能、轻量级的开源 Java RPC 框架。

Dubbo 提供了三大核心能力:

  • 面向接口的远程方法调用
  • 智能容错和负载均衡
  • 服务自动注册和发现

RPC 原理简介

什么是 RPC

RPC(Remote Procedure Call),即远程过程调用,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。比如两个不同的服务 A、B 部署在两台不同的机器上,那么服务 A 如果想要调用服务 B 中的某个方法该怎么办呢?使用 HTTP 请求 当然可以,但是可能会比较慢而且一些优化做的并不好。 RPC 的出现就是为了解决这个问题。

RPC 工作流程

img

  1. 服务消费方(client)调用以本地调用方式调用服务;
  2. client stub 接收到调用后负责将方法、参数等组装成能够进行网络传输的消息体;
  3. client stub 找到服务地址,并将消息发送到服务端;
  4. server stub 收到消息后进行解码;
  5. server stub 根据解码结果调用本地的服务;
  6. 本地服务执行并将结果返回给 server stub;
  7. server stub 将返回结果打包成消息并发送至消费方;
  8. client stub 接收到消息,并进行解码;
  9. 服务消费方得到最终结果。

为什么需要 Dubbo

如果你要开发分布式程序,你也可以直接基于 HTTP 接口进行通信,但是为什么要用 Dubbo 呢?

我觉得主要可以从 Dubbo 提供的下面四点特性来说为什么要用 Dubbo:

  1. 负载均衡——同一个服务部署在不同的机器时该调用那一台机器上的服务。
  2. 服务调用链路——随着系统的发展,服务越来越多,服务间依赖关系变得错踪复杂,甚至分不清哪个应用要在哪个应用之前启动,架构师都不能完整的描述应用的架构关系。Dubbo 可以为我们解决服务之间互相是如何调用的。
  3. 服务访问压力以及时长统计、资源调度和治理——基于访问压力实时管理集群容量,提高集群利用率。
  4. 服务治理——某个服务挂掉之后调用备用服务。

另外,Dubbo 除了能够应用在分布式系统中,也可以应用在现在比较火的微服务系统中。不过,由于 Spring Cloud 在微服务中应用更加广泛,所以,我觉得一般我们提 Dubbo 的话,大部分是分布式系统的情况。

二、QuickStart

(1)添加 maven 依赖

1
2
3
4
5
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>dubbo</artifactId>
<version>${dubbo.version}</version>
</dependency>

(2)定义 Provider

1
2
3
4
5
package com.alibaba.dubbo.demo;

public interface DemoService {
String sayHello(String name);
}

(3)实现 Provider

1
2
3
4
5
6
7
8
package com.alibaba.dubbo.demo.provider;
import com.alibaba.dubbo.demo.DemoService;

public class DemoServiceImpl implements DemoService {
public String sayHello(String name) {
return "Hello " + name;
}
}

(4)配置 Provider

1
2
3
4
5
6
7
8
9
10
11
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://code.alibabatech.com/schema/dubbo http://code.alibabatech.com/schema/dubbo/dubbo.xsd">
<dubbo:application name="demo-provider"/>
<dubbo:registry address="multicast://224.5.6.7:1234"/>
<dubbo:protocol name="dubbo" port="20880"/>
<dubbo:service interface="com.alibaba.dubbo.demo.DemoService" ref="demoService"/>
<bean id="demoService" class="com.alibaba.dubbo.demo.provider.DemoServiceImpl"/>
</beans>

(5)启动 Provider

1
2
3
4
5
6
7
8
9
10
11
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class Provider {
public static void main(String[] args) throws Exception {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
new String[] {"META-INF/spring/dubbo-demo-provider.xml"});
context.start();
// press any key to exit
System.in.read();
}
}

(6)配置 Consumer

1
2
3
4
5
6
7
8
9
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://code.alibabatech.com/schema/dubbo http://code.alibabatech.com/schema/dubbo/dubbo.xsd">
<dubbo:application name="demo-consumer"/>
<dubbo:registry address="multicast://224.5.6.7:1234"/>
<dubbo:reference id="demoService" interface="com.alibaba.dubbo.demo.DemoService"/>
</beans>

(7)启动 Consumer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import com.alibaba.dubbo.demo.DemoService;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class Consumer {
public static void main(String[] args) throws Exception {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
new String[]{"META-INF/spring/dubbo-demo-consumer.xml"});
context.start();
// obtain proxy object for remote invocation
DemoService demoService = (DemoService) context.getBean("demoService");
// execute remote invocation
String hello = demoService.sayHello("world");
// show the result
System.out.println(hello);
}
}

三、Dubbo 配置

Dubbo 所有配置最终都将转换为 URL 表示,并由服务提供方生成,经注册中心传递给消费方,各属性对应 URL 的参数,参见配置项一览表中的 “对应 URL 参数” 列。

只有 group,interface,version 是服务的匹配条件,三者决定是不是同一个服务,其它配置项均为调优和治理参数。

URL 格式:protocol://username:password@host:port/path?key=value&key=value

配置方式

Dubbo 支持多种配置方式:

  • xml 配置
  • properties 配置
  • API 配置
  • 注解配置

如果同时存在多种配置方式,遵循以下覆盖策略:

  • JVM 启动 -D 参数优先,这样可以使用户在部署和启动时进行参数重写,比如在启动时需改变协议的端口。
  • XML 次之,如果在 XML 中有配置,则 dubbo.properties 中的相应配置项无效。
  • Properties 最后,相当于缺省值,只有 XML 没有配置时,dubbo.properties 的相应配置项才会生效,通常用于共享公共配置,比如应用名。

xml 配置

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:dubbo="http://dubbo.apache.org/schema/dubbo"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.3.xsd http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd">
<!-- 提供方应用信息,用于计算依赖关系 -->
<dubbo:application name="hello-world-app" />
<!-- 使用 multicast 广播注册中心暴露服务地址 -->
<dubbo:registry address="multicast://224.5.6.7:1234" />
<!-- 用 dubbo 协议在 20880 端口暴露服务 -->
<dubbo:protocol name="dubbo" port="20880" />
<!-- 声明需要暴露的服务接口 -->
<dubbo:service interface="com.alibaba.dubbo.demo.DemoService" ref="demoServiceLocal" />
<!-- 和本地 bean 一样实现服务 -->
<bean id="demoService" class="com.alibaba.dubbo.demo.provider.DemoServiceImpl" />

<!-- 生成远程服务代理,可以和本地 bean 一样使用 demoService -->
<dubbo:reference id="demoServiceRemote" interface="com.alibaba.dubbo.demo.DemoService" />
</beans>

Dubbo 会把以上配置项解析成下面的 URL 格式:

1
dubbo://host-ip:20880/com.alibaba.dubbo.demo.DemoService

然后基于扩展点自适应机制,通过 URL 的 dubbo:// 协议头识别,就会调用 DubboProtocol 的 export() 方法,打开服务端口 20880,就可以把服务 demoService 暴露到 20880 端口了。

properties 配置

示例:

1
2
3
dubbo.application.name=foo
dubbo.application.owner=bar
dubbo.registry.address=10.20.153.10:9090

配置项

所有配置项分为三大类:

  • 服务发现:表示该配置项用于服务的注册与发现,目的是让消费方找到提供方。
  • 服务治理:表示该配置项用于治理服务间的关系,或为开发测试提供便利条件。
  • 性能调优:表示该配置项用于调优性能,不同的选项对性能会产生影响。

配置项清单:

标签 用途 解释
dubbo:service 服务配置 用于暴露一个服务,定义服务的元信息,一个服务可以用多个协议暴露,一个服务也可以注册到多个注册中心
dubbo:reference 引用配置 用于创建一个远程服务代理,一个引用可以指向多个注册中心
dubbo:protocol 协议配置 用于配置提供服务的协议信息,协议由提供方指定,消费方被动接受
dubbo:application 应用配置 用于配置当前应用信息,不管该应用是提供者还是消费者
dubbo:module 模块配置 用于配置当前模块信息,可选
dubbo:registry 注册中心配置 用于配置连接注册中心相关信息
dubbo:monitor 监控中心配置 用于配置连接监控中心相关信息,可选
dubbo:provider 提供方配置 当 ProtocolConfig 和 ServiceConfig 某属性没有配置时,采用此缺省值,可选
dubbo:consumer 消费方配置 当 ReferenceConfig 某属性没有配置时,采用此缺省值,可选
dubbo:method 方法配置 用于 ServiceConfig 和 ReferenceConfig 指定方法级的配置信息
dubbo:argument 参数配置 用于指定方法参数配置

详细配置说明请参考:官方配置

配置之间的关系

配置覆盖关系

以 timeout 为例,显示了配置的查找顺序,其它 retries, loadbalance, actives 等类似:

  • 方法级优先,接口级次之,全局配置再次之
  • 如果级别一样,则消费方优先,提供方次之

其中,服务提供方配置,通过 URL 经由注册中心传递给消费方。

### 动态配置中心

配置中心(v2.7.0)在 Dubbo 中承担两个职责:

  1. 外部化配置。启动配置的集中式存储 (简单理解为 dubbo.properties 的外部化存储)。
  2. 服务治理。服务治理规则的存储与通知。

启用动态配置:

1
<dubbo:config-center address="zookeeper://127.0.0.1:2181"/>

或者

1
dubbo.config-center.address=zookeeper://127.0.0.1:2181

或者

1
2
ConfigCenterConfig configCenter = new ConfigCenterConfig();
configCenter.setAddress("zookeeper://127.0.0.1:2181");

四、Dubbo 架构

Dubbo 核心组件

节点角色:

节点 角色说明
Provider 暴露服务的服务提供方
Consumer 调用远程服务的服务消费方
Registry 服务注册与发现的注册中心
Monitor 统计服务的调用次数和调用时间的监控中心
Container 服务运行容器

调用关系:

  1. 服务容器负责启动,加载,运行服务提供者。
  2. 服务提供者在启动时,向注册中心注册自己提供的服务。
  3. 服务消费者在启动时,向注册中心订阅自己所需的服务。
  4. 注册中心返回服务提供者地址列表给消费者,如果有变更,注册中心将基于长连接推送变更数据给消费者。
  5. 服务消费者,从提供者地址列表中,基于软负载均衡算法,选一台提供者进行调用,如果调用失败,再选另一台调用。
  6. 服务消费者和提供者,在内存中累计调用次数和调用时间,定时每分钟发送一次统计数据到监控中心。

重要知识点总结:

  • 注册中心负责服务地址的注册与查找,相当于目录服务,服务提供者和消费者只在启动时与注册中心交互,注册中心不转发请求,压力较小
  • 监控中心负责统计各服务调用次数,调用时间等,统计先在内存汇总后每分钟一次发送到监控中心服务器,并以报表展示
  • 注册中心,服务提供者,服务消费者三者之间均为长连接,监控中心除外
  • 注册中心通过长连接感知服务提供者的存在,服务提供者宕机,注册中心将立即推送事件通知消费者
  • 注册中心和监控中心全部宕机,不影响已运行的提供者和消费者,消费者在本地缓存了提供者列表
  • 注册中心和监控中心都是可选的,服务消费者可以直连服务提供者
  • 服务提供者无状态,任意一台宕掉后,不影响使用
  • 服务提供者全部宕掉后,服务消费者应用将无法使用,并无限次重连等待服务提供者恢复

问:注册中心挂了可以继续通信吗?

答:可以,因为刚开始初始化的时候,消费者会将提供者的地址等信息拉取到本地缓存,所以注册中心挂了可以继续通信。

Dubbo 架构层次

图例说明:

  • 图中左边淡蓝背景的为服务消费方使用的接口,右边淡绿色背景的为服务提供方使用的接口,位于中轴线上的为双方都用到的接口。
  • 图中从下至上分为十层,各层均为单向依赖,右边的黑色箭头代表层之间的依赖关系,每一层都可以剥离上层被复用,其中,Service 和 Config 层为 API,其它各层均为 SPI。
  • 图中绿色小块的为扩展接口,蓝色小块为实现类,图中只显示用于关联各层的实现类。
  • 图中蓝色虚线为初始化过程,即启动时组装链,红色实线为方法调用过程,即运行时调时链,紫色三角箭头为继承,可以把子类看作父类的同一个节点,线上的文字为调用的方法。

各层说明

  • config 配置层:对外配置接口,以 ServiceConfig, ReferenceConfig 为中心,可以直接初始化配置类,也可以通过 spring 解析配置生成配置类
  • proxy 服务代理层:服务接口透明代理,生成服务的客户端 Stub 和服务器端 Skeleton, 以 ServiceProxy 为中心,扩展接口为 ProxyFactory
  • registry 注册中心层:封装服务地址的注册与发现,以服务 URL 为中心,扩展接口为 RegistryFactory, Registry, RegistryService
  • cluster 路由层:封装多个提供者的路由及负载均衡,并桥接注册中心,以 Invoker 为中心,扩展接口为 Cluster, Directory, Router, LoadBalance
  • monitor 监控层:RPC 调用次数和调用时间监控,以 Statistics 为中心,扩展接口为 MonitorFactory, Monitor, MonitorService
  • protocol 远程调用层:封装 RPC 调用,以 Invocation, Result 为中心,扩展接口为 Protocol, Invoker, Exporter
  • exchange 信息交换层:封装请求响应模式,同步转异步,以 Request, Response 为中心,扩展接口为 Exchanger, ExchangeChannel, ExchangeClient, ExchangeServer
  • transport 网络传输层:抽象 mina 和 netty 为统一接口,以 Message 为中心,扩展接口为 Channel, Transporter, Client, Server, Codec
  • serialize 数据序列化层:可复用的一些工具,扩展接口为 Serialization, ObjectInput, ObjectOutput, ThreadPool
  • serialize 数据序列化层:可复用的一些工具,扩展接口为 Serialization, ObjectInput, ObjectOutput, ThreadPool

各层关系说明

  • 在 RPC 中,Protocol 是核心层,也就是只要有 Protocol + Invoker + Exporter 就可以完成非透明的 RPC 调用,然后在 Invoker 的主过程上 Filter 拦截点。
  • 图中的 Consumer 和 Provider 是抽象概念,只是想让看图者更直观的了解哪些类分属于客户端与服务器端,不用 Client 和 Server 的原因是 Dubbo 在很多场景下都使用 Provider, Consumer, Registry, Monitor 划分逻辑拓普节点,保持统一概念。
  • 而 Cluster 是外围概念,所以 Cluster 的目的是将多个 Invoker 伪装成一个 Invoker,这样其它人只要关注 Protocol 层 Invoker 即可,加上 Cluster 或者去掉 Cluster 对其它层都不会造成影响,因为只有一个提供者时,是不需要 Cluster 的。
  • Proxy 层封装了所有接口的透明化代理,而在其它层都以 Invoker 为中心,只有到了暴露给用户使用时,才用 Proxy 将 Invoker 转成接口,或将接口实现转成 Invoker,也就是去掉 Proxy 层 RPC 是可以 Run 的,只是不那么透明,不那么看起来像调本地服务一样调远程服务。
  • 而 Remoting 实现是 Dubbo 协议的实现,如果你选择 RMI 协议,整个 Remoting 都不会用上,Remoting 内部再划为 Transport 传输层和 Exchange 信息交换层,Transport 层只负责单向消息传输,是对 Mina, Netty, Grizzly 的抽象,它也可以扩展 UDP 传输,而 Exchange 层是在传输层之上封装了 Request-Response 语义。
  • Registry 和 Monitor 实际上不算一层,而是一个独立的节点,只是为了全局概览,用层的方式画在一起。

五、服务发现

服务提供者注册服务的过程:

Dubbo 配置项 dubbo://registry 声明了注册中心的地址,Dubbo 会把以上配置项解析成类似下面的 URL 格式:

1
registry://multicast://224.5.6.7:1234/com.alibaba.dubbo.registry.RegistryService?export=URL.encode("dubbo://host-ip:20880/com.alibaba.dubbo.demo.DemoService")

然后基于扩展点自适应机制,通过 URL 的“registry://”协议头识别,就会调用 RegistryProtocol 的 export() 方法,将 export 参数中的提供者 URL,注册到注册中心。

服务消费者发现服务的过程:

Dubbo 配置项 dubbo://registry 声明了注册中心的地址,跟服务注册的原理类似,Dubbo 也会把以上配置项解析成下面的 URL 格式:

1
registry://multicast://224.5.6.7:1234/com.alibaba.dubbo.registry.RegistryService?refer=URL.encode("consummer://host-ip/com.alibaba.dubbo.demo.DemoService")

然后基于扩展点自适应机制,通过 URL 的“registry://”协议头识别,就会调用 RegistryProtocol 的 refer() 方法,基于 refer 参数中的条件,查询服务 demoService 的地址。

启动时检查

Dubbo 缺省会在启动时检查依赖的服务是否可用,不可用时会抛出异常,阻止 Spring 初始化完成,以便上线时,能及早发现问题,默认 check="true"

可以通过 xml、properties、-D 参数三种方式设置。启动时检查

六、Dubbo 协议

Dubbo 支持多种通信协议,不同的协议针对不同的序列化方式。

dubbo 协议

dubbo 协议是 Dubbo 的默认通信协议,采用单一长连接和 NIO 异步通信,基于 hessian 作为序列化协议。

dubbo 协议适合于小数据量大并发的服务调用,以及服务消费者机器数远大于服务提供者机器数的情况。反之,Dubbo 缺省协议不适合传送大数据量的服务,比如传文件,传视频等,除非请求量很低。

为了要支持高并发场景,一般是服务提供者就几台机器,但是服务消费者有上百台,可能每天调用量达到上亿次!此时用长连接是最合适的,就是跟每个服务消费者维持一个长连接就可以,可能总共就 100 个连接。然后后面直接基于长连接 NIO 异步通信,可以支撑高并发请求。

rmi 协议

rmi - 采用 JDK 标准的 java.rmi.* 实现,采用阻塞式短连接和 JDK 标准序列化方式。

注意:如果正在使用 RMI 提供服务给外部访问,同时应用里依赖了老的 common-collections 包的情况下,存在反序列化安全风险。

hessian 协议

hessian 协议用于集成 Hessian 的服务,Hessian 底层采用 Http 通讯,采用 Servlet 暴露服务,Dubbo 缺省内嵌 Jetty 作为服务器实现。

Dubbo 的 Hessian 协议可以和原生 Hessian 服务互操作,即:

  • 提供者用 Dubbo 的 Hessian 协议暴露服务,消费者直接用标准 Hessian 接口调用
  • 或者提供方用标准 Hessian 暴露服务,消费方用 Dubbo 的 Hessian 协议调用。

thrift 协议

当前 dubbo 支持的 thrift 协议是对 thrift 原生协议的扩展,在原生协议的基础上添加了一些额外的头信息,比如 service name,magic number 等。

使用 dubbo thrift 协议同样需要使用 thrift 的 idl compiler 编译生成相应的 java 代码,后续版本中会在这方面做一些增强。

http 协议

http 协议基于 HTTP 表单的远程调用协议,采用 Spring 的 HttpInvoker 实现。

使用 JSON 序列化方式。

webservice 协议

基于 WebService 的远程调用协议,基于 Apache CXFfrontend-simpletransports-http 实现。

使用 SOAP 序列化方式。

可以和原生 WebService 服务互操作,即:

  • 提供者用 Dubbo 的 WebService 协议暴露服务,消费者直接用标准 WebService 接口调用,
  • 或者提供方用标准 WebService 暴露服务,消费方用 Dubbo 的 WebService 协议调用。

rest 协议

基于标准的 Java REST API——JAX-RS 2.0(Java API for RESTful Web Services 的简写)实现的 REST 调用支持

memcached 协议

基于 memcached 实现的 RPC 协议。

redis 协议

基于 redis 实现的 RPC 协议。

在现实世界中,序列化有多种方式。

JDK 自身提供的序列化方式,效率不高,但是 Java 程序使用最多。

如果想要较好的可读性,可以使用 JSON (常见库有:jacksongsonfastjson)或 SOAP (即 xml 形式)

如果想要更好的性能,可以使用 thriftprotobufhessian

想深入了解可以参考:序列化

七、集群容错

在集群调用失败时,Dubbo 提供了多种容错方案,缺省为 failover 重试。

  • Failover - 失败自动切换,当出现失败,重试其它服务器。通常用于读操作,但重试会带来更长延迟。可通过 retries=”2” 来设置重试次数(不含第一次)。
  • Failfast - 快速失败,只发起一次调用,失败立即报错。通常用于非幂等性的写操作,比如新增记录。
  • Failsafe - 失败安全,出现异常时,直接忽略。通常用于写入审计日志等操作。
  • Failback - 失败自动恢复,后台记录失败请求,定时重发。通常用于消息通知操作。
  • Forking - 并行调用多个服务器,只要一个成功即返回。通常用于实时性要求较高的读操作,但需要浪费更多服务资源。可通过 forks=”2” 来设置最大并行数。
  • Broadcast - 广播调用所有提供者,逐个调用,任意一台报错则报错。通常用于通知所有提供者更新缓存或日志等本地资源信息。

集群容错配置示例:

1
2
<dubbo:service cluster="failsafe" />
<dubbo:reference cluster="failsafe" />

八、负载均衡

Dubbo 提供了多种负载均衡(LoadBalance)策略,缺省为 Random 随机调用。

Dubbo 的负载均衡配置可以细粒度到服务、方法级别,且 dubbo:servicedubbo:reference 均可配置。

1
2
3
4
5
6
7
8
9
10
11
12
<!-- 服务端服务级别 -->
<dubbo:service interface="..." loadbalance="roundrobin" />
<!-- 客户端服务级别 -->
<dubbo:reference interface="..." loadbalance="roundrobin" />
<!-- 服务端方法级别 -->
<dubbo:service interface="...">
<dubbo:method name="..." loadbalance="roundrobin"/>
</dubbo:service>
<!-- 客户端方法级别 -->
<dubbo:reference interface="...">
<dubbo:method name="..." loadbalance="roundrobin"/>
</dubbo:reference>

Random

  • 随机,按权重设置随机概率。
  • 在一个截面上碰撞的概率高,但调用量越大分布越均匀,而且按概率使用权重后也比较均匀,有利于动态调整提供者权重。

RoundRobin

  • 轮询,按公约后的权重设置轮询比率。
  • 存在慢的提供者累积请求的问题,比如:第二台机器很慢,但没挂,当请求调到第二台时就卡在那,久而久之,所有请求都卡在调到第二台上。

LeastActive

  • 最少活跃调用数,相同活跃数的随机,活跃数指调用前后计数差。
  • 使慢的提供者收到更少请求,因为越慢的提供者的调用前后计数差会越大。

ConsistentHash

  • 一致性 Hash,相同参数的请求总是发到同一提供者。
  • 当某一台提供者挂时,原本发往该提供者的请求,基于虚拟节点,平摊到其它提供者,不会引起剧烈变动。
  • 算法参见:http://en.wikipedia.org/wiki/Consistent_hashing
  • 缺省只对第一个参数 Hash,如果要修改,请配置 <dubbo:parameter key="hash.arguments" value="0,1" />
  • 缺省用 160 份虚拟节点,如果要修改,请配置 <dubbo:parameter key="hash.nodes" value="320" />

九、Dubbo 服务治理

服务治理简介

  • 当服务越来越多时,服务 URL 配置管理变得非常困难,F5 硬件负载均衡器的单点压力也越来越大。
  • 当进一步发展,服务间依赖关系变得错踪复杂,甚至分不清哪个应用要在哪个应用之前启动,架构师都不能完整的描述应用的架构关系。
  • 接着,服务的调用量越来越大,服务的容量问题就暴露出来,这个服务需要多少机器支撑?什么时候该加机器?

以上问题可以归纳为服务治理问题,这也是 Dubbo 的核心功能。

调用链路

一个微服务架构,往往由大量分布式服务组成。那么这些服务之间互相是如何调用的?调用链路是啥?说实话,几乎到后面没人搞的清楚了,因为服务实在太多了,可能几百个甚至几千个服务。

那就需要基于 dubbo 做的分布式系统中,对各个服务之间的调用自动记录下来,然后自动将各个服务之间的依赖关系和调用链路生成出来,做成一张图,显示出来,大家才可以看到对吧。

服务访问压力以及时长统计

需要自动统计各个接口和服务之间的调用次数以及访问延时,而且要分成两个级别。

  • 一个级别是接口粒度,就是每个服务的每个接口每天被调用多少次,TP50/TP90/TP99,三个档次的请求延时分别是多少;
  • 第二个级别是从源头入口开始,一个完整的请求链路经过几十个服务之后,完成一次请求,每天全链路走多少次,全链路请求延时的 TP50/TP90/TP99,分别是多少。

其他

  • 服务分层(避免循环依赖)
  • 调用链路失败监控和报警
  • 服务鉴权
  • 每个服务的可用性的监控(接口调用成功率?几个 9?99.99%,99.9%,99%)

所谓失败重试,就是 consumer 调用 provider 要是失败了,比如抛异常了,此时应该是可以重试的,或者调用超时了也可以重试。配置如下:

1
<dubbo:reference id="xxxx" interface="xx" check="true" async="false" retries="3" timeout="2000"/>

举个栗子。

某个服务的接口,要耗费 5s,你这边不能干等着,你这边配置了 timeout 之后,我等待 2s,还没返回,我直接就撤了,不能干等你。

可以结合你们公司具体的场景来说说你是怎么设置这些参数的:

  • timeout:一般设置为 200ms,我们认为不能超过 200ms 还没返回。
  • retries:设置 retries,一般是在读请求的时候,比如你要查询个数据,你可以设置个 retries,如果第一次没读到,报错,重试指定的次数,尝试再次读取。

路由规则

路由规则决定一次 dubbo 服务调用的目标服务器,分为条件路由规则和脚本路由规则,并且支持可扩展。

向注册中心写入路由规则的操作通常由监控中心或治理中心的页面完成。

1
2
3
RegistryFactory registryFactory = ExtensionLoader.getExtensionLoader(RegistryFactory.class).getAdaptiveExtension();
Registry registry = registryFactory.getRegistry(URL.valueOf("zookeeper://10.20.153.10:2181"));
registry.register(URL.valueOf("condition://0.0.0.0/com.foo.BarService?category=routers&dynamic=false&rule=" + URL.encode("host = 10.20.153.10 => host = 10.20.153.11") + "));
  • condition:// - 表示路由规则的类型,支持条件路由规则和脚本路由规则,可扩展,必填。
  • 0.0.0.0 - 表示对所有 IP 地址生效,如果只想对某个 IP 的生效,请填入具体 IP,必填。
  • com.foo.BarService - 表示只对指定服务生效,必填。
  • category=routers - 表示该数据为动态配置类型,必填。
  • dynamic=false - 表示该数据为持久数据,当注册方退出时,数据依然保存在注册中心,必填。
  • enabled=true - 覆盖规则是否生效,可不填,缺省生效。
  • force=false - 当路由结果为空时,是否强制执行,如果不强制执行,路由结果为空的路由规则将自动失效,可不填,缺省为 flase。
  • runtime=false - 是否在每次调用时执行路由规则,否则只在提供者地址列表变更时预先执行并缓存结果,调用时直接从缓存中获取路由结果。如果用了参数路由,必须设为 true,需要注意设置会影响调用的性能,可不填,缺省为 flase。
  • priority=1 - 路由规则的优先级,用于排序,优先级越大越靠前执行,可不填,缺省为 0。
  • rule=URL.encode(“host = 10.20.153.10 => host = 10.20.153.11”) - 表示路由规则的内容,必填。

服务降级

可以通过服务降级功能临时屏蔽某个出错的非关键服务,并定义降级后的返回策略。

向注册中心写入动态配置覆盖规则:

1
2
3
RegistryFactory registryFactory = ExtensionLoader.getExtensionLoader(RegistryFactory.class).getAdaptiveExtension();
Registry registry = registryFactory.getRegistry(URL.valueOf("zookeeper://10.20.153.10:2181"));
registry.register(URL.valueOf("override://0.0.0.0/com.foo.BarService?category=configurators&dynamic=false&application=foo&mock=force:return+null"));

其中:

mock=force:return+null 表示消费方对该服务的方法调用都直接返回 null 值,不发起远程调用。用来屏蔽不重要服务不可用时对调用方的影响。
还可以改为 mock=fail:return+null 表示消费方对该服务的方法调用在失败后,再返回 null 值,不抛异常。用来容忍不重要服务不稳定时对调用方的影响。

比如说服务 A 调用服务 B,结果服务 B 挂掉了,服务 A 重试几次调用服务 B,还是不行,那么直接降级,走一个备用的逻辑,给用户返回响应。

举个例子,我们有接口 HelloServiceHelloServiceImpl 有该接口的具体实现。

1
2
3
4
5
6
7
8
9
public interface HelloService {
void sayHello();
}

public class HelloServiceImpl implements HelloService {
public void sayHello() {
System.out.println("hello world......");
}
}

Dubbo 配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
<!-- provider 配置 -->
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://code.alibabatech.com/schema/dubbo http://code.alibabatech.com/schema/dubbo/dubbo.xsd">

<dubbo:application name="dubbo-provider" />
<dubbo:registry address="zookeeper://127.0.0.1:2181" />
<dubbo:protocol name="dubbo" port="20880" />
<dubbo:service interface="com.zhss.service.HelloService" ref="helloServiceImpl" timeout="10000" />
<bean id="helloServiceImpl" class="com.zhss.service.HelloServiceImpl" />

</beans>

<!-- consumer 配置 -->
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://code.alibabatech.com/schema/dubbo http://code.alibabatech.com/schema/dubbo/dubbo.xsd">

<dubbo:application name="dubbo-consumer" />

<dubbo:registry address="zookeeper://127.0.0.1:2181" />

<dubbo:reference id="fooService" interface="com.test.service.FooService" timeout="10000" check="false" mock="return null">
</dubbo:reference>

</beans>

我们调用接口失败的时候,可以通过 mock 统一返回 null。

mock 的值也可以修改为 true,然后再跟接口同一个路径下实现一个 Mock 类,命名规则是 “接口名称+Mock” 后缀。然后在 Mock 类里实现自己的降级逻辑。

1
2
3
4
5
public class HelloServiceMock implements HelloService {
public void sayHello() {
// 降级逻辑
}
}

访问控制

直连

在开发及测试环境下,经常需要绕过注册中心,只测试指定服务提供者,这时候可能需要点对点直连,点对点直联方式,将以服务接口为单位,忽略注册中心的提供者列表,A 接口配置点对点,不影响 B 接口从注册中心获取列表。

配置方式:

(1)通过 XML 配置

如果是线上需求需要点对点,可在 dubbo:reference 中配置 url 指向提供者,将绕过注册中心,多个地址用分号隔开,配置如下:

1
<dubbo:reference id="xxxService" interface="com.alibaba.xxx.XxxService" url="dubbo://localhost:20890" />

(2)通过 -D 参数指定

在 JVM 启动参数中加入-D 参数映射服务地址:

1
java -Dcom.alibaba.xxx.XxxService=dubbo://localhost:20890

(3)通过文件映射
如果服务比较多,也可以用文件映射,用 -Ddubbo.resolve.file 指定映射文件路径,此配置优先级高于 dubbo:reference 中的配置:

1
java -Ddubbo.resolve.file=xxx.properties

然后在映射文件 xxx.properties 中加入配置,其中 key 为服务名,value 为服务提供者 URL:

1
com.alibaba.xxx.XxxService=dubbo://localhost:20890

只订阅

为方便开发测试,经常会在线下共用一个所有服务可用的注册中心,这时,如果一个正在开发中的服务提供者注册,可能会影响消费者不能正常运行。

可以让服务提供者开发方,只订阅服务(开发的服务可能依赖其它服务),而不注册正在开发的服务,通过直连测试正在开发的服务。

禁用注册配置:

1
<dubbo:registry address="10.20.153.10:9090" register="false" />

或者

1
<dubbo:registry address="10.20.153.10:9090?register=false" />

只注册

如果有两个镜像环境,两个注册中心,有一个服务只在其中一个注册中心有部署,另一个注册中心还没来得及部署,而两个注册中心的其它应用都需要依赖此服务。这个时候,可以让服务提供者方只注册服务到另一注册中心,而不从另一注册中心订阅服务。

禁用订阅配置

1
2
<dubbo:registry id="hzRegistry" address="10.20.153.10:9090" />
<dubbo:registry id="qdRegistry" address="10.20.141.150:9090" subscribe="false" />

或者

1
2
<dubbo:registry id="hzRegistry" address="10.20.153.10:9090" />
<dubbo:registry id="qdRegistry" address="10.20.141.150:9090?subscribe=false" />

静态服务

有时候希望人工管理服务提供者的上线和下线,此时需将注册中心标识为非动态管理模式。

1
<dubbo:registry address="10.20.141.150:9090" dynamic="false" />

或者

1
<dubbo:registry address="10.20.141.150:9090?dynamic=false" />

服务提供者初次注册时为禁用状态,需人工启用。断线时,将不会被自动删除,需人工禁用。

动态配置

向注册中心写入动态配置覆盖规则。该功能通常由监控中心或治理中心的页面完成。

1
2
3
RegistryFactory registryFactory = ExtensionLoader.getExtensionLoader(RegistryFactory.class).getAdaptiveExtension();
Registry registry = registryFactory.getRegistry(URL.valueOf("zookeeper://10.20.153.10:2181"));
registry.register(URL.valueOf("override://0.0.0.0/com.foo.BarService?category=configurators&dynamic=false&application=foo&timeout=1000"));

其中:

  • override:// - 表示数据采用覆盖方式,支持 override 和 absent,可扩展,必填。
  • 0.0.0.0 - 表示对所有 IP 地址生效,如果只想覆盖某个 IP 的数据,请填入具体 IP,必填。
  • com.foo.BarService - 表示只对指定服务生效,必填。
  • category=configurators - 表示该数据为动态配置类型,必填。
  • dynamic=false - 表示该数据为持久数据,当注册方退出时,数据依然保存在注册中心,必填。
  • enabled=true - 覆盖规则是否生效,可不填,缺省生效。
  • application=foo - 表示只对指定应用生效,可不填,表示对所有应用生效。
  • timeout=1000 - 表示将满足以上条件的 timeout 参数的值覆盖为 1000。如果想覆盖其它参数,直接加在 override 的 URL 参数上。

示例:

  • 禁用提供者:(通常用于临时踢除某台提供者机器,相似的,禁止消费者访问请使用路由规则)
1
override://10.20.153.10/com.foo.BarService?category=configurators&dynamic=false&disbaled=true
  • 调整权重:(通常用于容量评估,缺省权重为 100)
1
override://10.20.153.10/com.foo.BarService?category=configurators&dynamic=false&weight=200
  • 调整负载均衡策略:(缺省负载均衡策略为 random)
1
override://10.20.153.10/com.foo.BarService?category=configurators&dynamic=false&loadbalance=leastactive
  • 服务降级:(通常用于临时屏蔽某个出错的非关键服务)
1
override://0.0.0.0/com.foo.BarService?category=configurators&dynamic=false&application=foo&mock=force:return+null

十、多版本

当一个接口实现,出现不兼容升级时,可以用版本号过渡,版本号不同的服务相互间不引用。

可以按照以下的步骤进行版本迁移:

  1. 在低压力时间段,先升级一半提供者为新版本
  2. 再将所有消费者升级为新版本
  3. 然后将剩下的一半提供者升级为新版本

老版本服务提供者配置:

1
<dubbo:service interface="com.foo.BarService" version="1.0.0" />

新版本服务提供者配置:

1
<dubbo:service interface="com.foo.BarService" version="2.0.0" />

老版本服务消费者配置:

1
<dubbo:reference id="barService" interface="com.foo.BarService" version="1.0.0" />

新版本服务消费者配置:

1
<dubbo:reference id="barService" interface="com.foo.BarService" version="2.0.0" />

如果不需要区分版本,可以按照以下的方式配置 [1]

1
<dubbo:reference id="barService" interface="com.foo.BarService" version="*" />

十一、Dubbo SPI

SPI 全称为 Service Provider Interface,是一种服务发现机制。SPI 的本质是将接口实现类的全限定名配置在文件中,并由服务加载器读取配置文件,加载实现类。这样可以在运行时,动态为接口替换实现类。正因此特性,我们可以很容易的通过 SPI 机制为我们的程序提供拓展功能。SPI 机制在第三方框架中也有所应用,比如 Dubbo 就是通过 SPI 机制加载所有的组件。不过,Dubbo 并未使用 Java 原生的 SPI 机制,而是对其进行了增强,使其能够更好的满足需求。在 Dubbo 中,SPI 是一个非常重要的模块。基于 SPI,我们可以很容易的对 Dubbo 进行拓展。

Dubbo SPI 的相关逻辑被封装在了 ExtensionLoader 类中,通过 ExtensionLoader,我们可以加载指定的实现类。Dubbo SPI 所需的配置文件需放置在 META-INF/dubbo 路径下。

参考资料

消息队列面试夺命连环问

为什么使用消息队列?

解耦

看这么个场景。A 系统发送数据到 BCD 三个系统,通过接口调用发送。如果 E 系统也要这个数据呢?那如果 C 系统现在不需要了呢?A 系统负责人几乎崩溃……

img

在这个场景中,A 系统跟其它各种乱七八糟的系统严重耦合,A 系统产生一条比较关键的数据,很多系统都需要 A 系统将这个数据发送过来。A 系统要时时刻刻考虑 BCDE 四个系统如果挂了该咋办?要不要重发,要不要把消息存起来?头发都白了啊!

如果使用 MQ,A 系统产生一条数据,发送到 MQ 里面去,哪个系统需要数据自己去 MQ 里面消费。如果新系统需要数据,直接从 MQ 里消费即可;如果某个系统不需要这条数据了,就取消对 MQ 消息的消费即可。这样下来,A 系统压根儿不需要去考虑要给谁发送数据,不需要维护这个代码,也不需要考虑人家是否调用成功、失败超时等情况。

img

总结:通过一个 MQ,Pub/Sub 发布订阅消息这么一个模型,A 系统就跟其它系统彻底解耦了。

面试技巧:你需要去考虑一下你负责的系统中是否有类似的场景,就是一个系统或者一个模块,调用了多个系统或者模块,互相之间的调用很复杂,维护起来很麻烦。但是其实这个调用是不需要直接同步调用接口的,如果用 MQ 给它异步化解耦,也是可以的,你就需要去考虑在你的项目里,是不是可以运用这个 MQ 去进行系统的解耦。在简历中体现出来这块东西,用 MQ 作解耦。

异步

再来看一个场景,A 系统接收一个请求,需要在自己本地写库,还需要在 BCD 三个系统写库,自己本地写库要 3ms,BCD 三个系统分别写库要 300ms、450ms、200ms。最终请求总延时是 3 + 300 + 450 + 200 = 953ms,接近 1s,用户感觉搞个什么东西,慢死了慢死了。用户通过浏览器发起请求,等待个 1s,这几乎是不可接受的。

img

一般互联网类的企业,对于用户直接的操作,一般要求是每个请求都必须在 200 ms 以内完成,对用户几乎是无感知的。

如果使用 MQ,那么 A 系统连续发送 3 条消息到 MQ 队列中,假如耗时 5ms,A 系统从接受一个请求到返回响应给用户,总时长是 3 + 5 = 8ms,对于用户而言,其实感觉上就是点个按钮,8ms 以后就直接返回了,爽!网站做得真好,真快!

img

削峰

每天 0:00 到 12:00,A 系统风平浪静,每秒并发请求数量就 50 个。结果每次一到 12:00 ~ 13:00 ,每秒并发请求数量突然会暴增到 5k+ 条。但是系统是直接基于 MySQL 的,大量的请求涌入 MySQL,每秒钟对 MySQL 执行约 5k 条 SQL。

一般的 MySQL,扛到每秒 2k 个请求就差不多了,如果每秒请求到 5k 的话,可能就直接把 MySQL 给打死了,导致系统崩溃,用户也就没法再使用系统了。

但是高峰期一过,到了下午的时候,就成了低峰期,可能也就 1w 的用户同时在网站上操作,每秒中的请求数量可能也就 50 个请求,对整个系统几乎没有任何的压力。

img

如果使用 MQ,每秒 5k 个请求写入 MQ,A 系统每秒钟最多处理 2k 个请求,因为 MySQL 每秒钟最多处理 2k 个。A 系统从 MQ 中慢慢拉取请求,每秒钟就拉取 2k 个请求,不要超过自己每秒能处理的最大请求数量就 ok,这样下来,哪怕是高峰期的时候,A 系统也绝对不会挂掉。而 MQ 每秒钟 5k 个请求进来,就 2k 个请求出去,结果就导致在中午高峰期(1 个小时),可能有几十万甚至几百万的请求积压在 MQ 中。

img

这个短暂的高峰期积压是 ok 的,因为高峰期过了之后,每秒钟就 50 个请求进 MQ,但是 A 系统依然会按照每秒 2k 个请求的速度在处理。所以说,只要高峰期一过,A 系统就会快速将积压的消息给解决掉。

消息队列有什么优缺点

优点上面已经说了,就是在特殊场景下有其对应的好处解耦异步削峰

缺点有以下几个:

  • 系统可用性降低
    系统引入的外部依赖越多,越容易挂掉。本来你就是 A 系统调用 BCD 三个系统的接口就好了,人 ABCD 四个系统好好的,没啥问题,你偏加个 MQ 进来,万一 MQ 挂了咋整,MQ 一挂,整套系统崩溃的,你不就完了?如何保证消息队列的高可用,可以点击这里查看
  • 系统复杂度提高
    硬生生加个 MQ 进来,你怎么保证消息没有重复消费?怎么处理消息丢失的情况?怎么保证消息传递的顺序性?头大头大,问题一大堆,痛苦不已。
  • 一致性问题
    A 系统处理完了直接返回成功了,人都以为你这个请求就成功了;但是问题是,要是 BCD 三个系统那里,BD 两个系统写库成功了,结果 C 系统写库失败了,咋整?你这数据就不一致了。

所以消息队列实际是一种非常复杂的架构,你引入它有很多好处,但是也得针对它带来的坏处做各种额外的技术方案和架构来规避掉,做好之后,你会发现,妈呀,系统复杂度提升了一个数量级,也许是复杂了 10 倍。但是关键时刻,用,还是得用的。

Kafka、ActiveMQ、RabbitMQ、RocketMQ 有什么优缺点?

特性 ActiveMQ RabbitMQ RocketMQ Kafka
单机吞吐量 万级,比 RocketMQ、Kafka 低一个数量级 同 ActiveMQ 10 万级,支撑高吞吐 10 万级,高吞吐,一般配合大数据类的系统来进行实时数据计算、日志采集等场景
topic 数量对吞吐量的影响 topic 可以达到几百/几千的级别,吞吐量会有较小幅度的下降,这是 RocketMQ 的一大优势,在同等机器下,可以支撑大量的 topic topic 从几十到几百个时候,吞吐量会大幅度下降,在同等机器下,Kafka 尽量保证 topic 数量不要过多,如果要支撑大规模的 topic,需要增加更多的机器资源
时效性 ms 级 微秒级,这是 RabbitMQ 的一大特点,延迟最低 ms 级 延迟在 ms 级以内
可用性 高,基于主从架构实现高可用 同 ActiveMQ 非常高,分布式架构 非常高,分布式,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用
消息可靠性 有较低的概率丢失数据 基本不丢 经过参数优化配置,可以做到 0 丢失 同 RocketMQ
功能支持 MQ 领域的功能极其完备 基于 erlang 开发,并发能力很强,性能极好,延时很低 MQ 功能较为完善,还是分布式的,扩展性好 功能较为简单,主要支持简单的 MQ 功能,在大数据领域的实时计算以及日志采集被大规模使用

综上,各种对比之后,有如下建议:

  • 一般的业务系统要引入 MQ,最早大家都用 ActiveMQ,但是现在确实大家用的不多了,没经过大规模吞吐量场景的验证,社区也不是很活跃,所以大家还是算了吧,我个人不推荐用这个了;
  • 后来大家开始用 RabbitMQ,但是确实 erlang 语言阻止了大量的 Java 工程师去深入研究和掌控它,对公司而言,几乎处于不可控的状态,但是确实人家是开源的,比较稳定的支持,活跃度也高;
  • 不过现在确实越来越多的公司会去用 RocketMQ,确实很不错,毕竟是阿里出品,但社区可能有突然黄掉的风险(目前 RocketMQ 已捐给 Apache,但 GitHub 上的活跃度其实不算高)对自己公司技术实力有绝对自信的,推荐用 RocketMQ,否则回去老老实实用 RabbitMQ 吧,人家有活跃的开源社区,绝对不会黄。
  • 所以中小型公司,技术实力较为一般,技术挑战不是特别高,用 RabbitMQ 是不错的选择;大型公司,基础架构研发实力较强,用 RocketMQ 是很好的选择。
  • 如果是大数据领域的实时计算、日志采集等场景,用 Kafka 是业内标准的,绝对没问题,社区活跃度很高,绝对不会黄,何况几乎是全世界这个领域的事实性规范。

如何保证消息队列的高可用?

RabbitMQ 的高可用性

RabbitMQ 是比较有代表性的,因为是基于主从(非分布式)做高可用性的,我们就以 RabbitMQ 为例子讲解第一种 MQ 的高可用性怎么实现。

RabbitMQ 有三种模式:单机模式、普通集群模式、镜像集群模式。

单机模式

单机模式,就是 Demo 级别的,一般就是你本地启动了玩玩儿的 😄,没人生产用单机模式。

普通集群模式(无高可用性)

普通集群模式,意思就是在多台机器上启动多个 RabbitMQ 实例,每个机器启动一个。你创建的 queue,只会放在一个 RabbitMQ 实例上,但是每个实例都同步 queue 的元数据(元数据可以认为是 queue 的一些配置信息,通过元数据,可以找到 queue 所在实例)。你消费的时候,实际上如果连接到了另外一个实例,那么那个实例会从 queue 所在实例上拉取数据过来。

img

这种方式确实很麻烦,也不怎么好,没做到所谓的分布式,就是个普通集群。因为这导致你要么消费者每次随机连接一个实例然后拉取数据,要么固定连接那个 queue 所在实例消费数据,前者有数据拉取的开销,后者导致单实例性能瓶颈

而且如果那个放 queue 的实例宕机了,会导致接下来其他实例就无法从那个实例拉取,如果你开启了消息持久化,让 RabbitMQ 落地存储消息的话,消息不一定会丢,得等这个实例恢复了,然后才可以继续从这个 queue 拉取数据。

所以这个事儿就比较尴尬了,这就没有什么所谓的高可用性这方案主要是提高吞吐量的,就是说让集群中多个节点来服务某个 queue 的读写操作。

镜像集群模式(高可用性)

这种模式,才是所谓的 RabbitMQ 的高可用模式。跟普通集群模式不一样的是,在镜像集群模式下,你创建的 queue,无论元数据还是 queue 里的消息都会存在于多个实例上,就是说,每个 RabbitMQ 节点都有这个 queue 的一个完整镜像,包含 queue 的全部数据的意思。然后每次你写消息到 queue 的时候,都会自动把消息同步到多个实例的 queue 上。

img

那么如何开启这个镜像集群模式呢?其实很简单,RabbitMQ 有很好的管理控制台,就是在后台新增一个策略,这个策略是镜像集群模式的策略,指定的时候是可以要求数据同步到所有节点的,也可以要求同步到指定数量的节点,再次创建 queue 的时候,应用这个策略,就会自动将数据同步到其他的节点上去了。

这样的话,好处在于,你任何一个机器宕机了,没事儿,其它机器(节点)还包含了这个 queue 的完整数据,别的 consumer 都可以到其它节点上去消费数据。坏处在于,第一,这个性能开销也太大了吧,消息需要同步到所有机器上,导致网络带宽压力和消耗很重!第二,这么玩儿,不是分布式的,就没有扩展性可言了,如果某个 queue 负载很重,你加机器,新增的机器也包含了这个 queue 的所有数据,并没有办法线性扩展你的 queue。你想,如果这个 queue 的数据量很大,大到这个机器上的容量无法容纳了,此时该怎么办呢?

Kafka 的高可用性

Kafka 一个最基本的架构认识:由多个 broker 组成,每个 broker 是一个节点;你创建一个 topic,这个 topic 可以划分为多个 partition,每个 partition 可以存在于不同的 broker 上,每个 partition 就放一部分数据。

这就是天然的分布式消息队列,就是说一个 topic 的数据,是分散放在多个机器上的,每个机器就放一部分数据

实际上 RabbmitMQ 之类的,并不是分布式消息队列,它就是传统的消息队列,只不过提供了一些集群、HA(High Availability, 高可用性) 的机制而已,因为无论怎么玩儿,RabbitMQ 一个 queue 的数据都是放在一个节点里的,镜像集群下,也是每个节点都放这个 queue 的完整数据。

Kafka 0.8 以前,是没有 HA 机制的,就是任何一个 broker 宕机了,那个 broker 上的 partition 就废了,没法写也没法读,没有什么高可用性可言。

比如说,我们假设创建了一个 topic,指定其 partition 数量是 3 个,分别在三台机器上。但是,如果第二台机器宕机了,会导致这个 topic 的 1/3 的数据就丢了,因此这个是做不到高可用的。

img

Kafka 0.8 以后,提供了 HA 机制,就是 replica(复制品) 副本机制。每个 partition 的数据都会同步到其它机器上,形成自己的多个 replica 副本。所有 replica 会选举一个 leader 出来,那么生产和消费都跟这个 leader 打交道,然后其他 replica 就是 follower。写的时候,leader 会负责把数据同步到所有 follower 上去,读的时候就直接读 leader 上的数据即可。只能读写 leader?很简单,要是你可以随意读写每个 follower,那么就要 care 数据一致性的问题,系统复杂度太高,很容易出问题。Kafka 会均匀地将一个 partition 的所有 replica 分布在不同的机器上,这样才可以提高容错性。

img

这么搞,就有所谓的高可用性了,因为如果某个 broker 宕机了,没事儿,那个 broker 上面的 partition 在其他机器上都有副本的。如果这个宕机的 broker 上面有某个 partition 的 leader,那么此时会从 follower 中重新选举一个新的 leader 出来,大家继续读写那个新的 leader 即可。这就有所谓的高可用性了。

写数据的时候,生产者就写 leader,然后 leader 将数据落地写本地磁盘,接着其他 follower 自己主动从 leader 来 pull 数据。一旦所有 follower 同步好数据了,就会发送 ack 给 leader,leader 收到所有 follower 的 ack 之后,就会返回写成功的消息给生产者。(当然,这只是其中一种模式,还可以适当调整这个行为)

消费的时候,只会从 leader 去读,但是只有当一个消息已经被所有 follower 都同步成功返回 ack 的时候,这个消息才会被消费者读到。

看到这里,相信你大致明白了 Kafka 是如何保证高可用机制的了,对吧?不至于一无所知,现场还能给面试官画画图。要是遇上面试官确实是 Kafka 高手,深挖了问,那你只能说不好意思,太深入的你没研究过。

如何保证消息不被重复消费?(如何保证消息消费的幂等性)

首先,比如 RabbitMQ、RocketMQ、Kafka,都有可能会出现消息重复消费的问题,正常。因为这问题通常不是 MQ 自己保证的,是由我们开发来保证的。挑一个 Kafka 来举个例子,说说怎么重复消费吧。

Kafka 实际上有个 offset 的概念,就是每个消息写进去,都有一个 offset,代表消息的序号,然后 consumer 消费了数据之后,每隔一段时间(定时定期),会把自己消费过的消息的 offset 提交一下,表示“我已经消费过了,下次我要是重启啥的,你就让我继续从上次消费到的 offset 来继续消费吧”。

但是凡事总有意外,比如我们之前生产经常遇到的,就是你有时候重启系统,看你怎么重启了,如果碰到点着急的,直接 kill 进程了,再重启。这会导致 consumer 有些消息处理了,但是没来得及提交 offset,尴尬了。重启之后,少数消息会再次消费一次。

举个栗子。

有这么个场景。数据 1/2/3 依次进入 kafka,kafka 会给这三条数据每条分配一个 offset,代表这条数据的序号,我们就假设分配的 offset 依次是 152/153/154。消费者从 kafka 去消费的时候,也是按照这个顺序去消费。假如当消费者消费了 offset=153 的这条数据,刚准备去提交 offset 到 zookeeper,此时消费者进程被重启了。那么此时消费过的数据 1/2 的 offset 并没有提交,kafka 也就不知道你已经消费了 offset=153 这条数据。那么重启之后,消费者会找 kafka 说,嘿,哥儿们,你给我接着把上次我消费到的那个地方后面的数据继续给我传递过来。由于之前的 offset 没有提交成功,那么数据 1/2 会再次传过来,如果此时消费者没有去重的话,那么就会导致重复消费。

img

如果消费者干的事儿是拿一条数据就往数据库里写一条,会导致说,你可能就把数据 1/2 在数据库里插入了 2 次,那么数据就错啦。

其实重复消费不可怕,可怕的是你没考虑到重复消费之后,怎么保证幂等性

举个例子吧。假设你有个系统,消费一条消息就往数据库里插入一条数据,要是你一个消息重复两次,你不就插入了两条,这数据不就错了?但是你要是消费到第二次的时候,自己判断一下是否已经消费过了,若是就直接扔了,这样不就保留了一条数据,从而保证了数据的正确性。

一条数据重复出现两次,数据库里就只有一条数据,这就保证了系统的幂等性。

幂等性,通俗点说,就一个数据,或者一个请求,给你重复来多次,你得确保对应的数据是不会改变的,不能出错

所以第二个问题来了,怎么保证消息队列消费的幂等性?

其实还是得结合业务来思考,我这里给几个思路:

  • 比如你拿个数据要写库,你先根据主键查一下,如果这数据都有了,你就别插入了,update 一下好吧。
  • 比如你是写 Redis,那没问题了,反正每次都是 set,天然幂等性。
  • 比如你不是上面两个场景,那做的稍微复杂一点,你需要让生产者发送每条数据的时候,里面加一个全局唯一的 id,类似订单 id 之类的东西,然后你这里消费到了之后,先根据这个 id 去比如 Redis 里查一下,之前消费过吗?如果没有消费过,你就处理,然后这个 id 写 Redis。如果消费过了,那你就别处理了,保证别重复处理相同的消息即可。
  • 比如基于数据库的唯一键来保证重复数据不会重复插入多条。因为有唯一键约束了,重复数据插入只会报错,不会导致数据库中出现脏数据。

img

当然,如何保证 MQ 的消费是幂等性的,需要结合具体的业务来看。

如何保证消息的可靠性传输?(如何处理消息丢失的问题)

数据的丢失问题,可能出现在生产者、MQ、消费者中,咱们从 RabbitMQ 和 Kafka 分别来分析一下吧。

RabbitMQ

img

生产者弄丢了数据

生产者将数据发送到 RabbitMQ 的时候,可能数据就在半路给搞丢了,因为网络问题啥的,都有可能。

此时可以选择用 RabbitMQ 提供的事务功能,就是生产者发送数据之前开启 RabbitMQ 事务channel.txSelect,然后发送消息,如果消息没有成功被 RabbitMQ 接收到,那么生产者会收到异常报错,此时就可以回滚事务channel.txRollback,然后重试发送消息;如果收到了消息,那么可以提交事务channel.txCommit

1
2
3
4
5
6
7
8
9
10
11
12
// 开启事务
channel.txSelect
try {
// 这里发送消息
} catch (Exception e) {
channel.txRollback

// 这里再次重发这条消息
}

// 提交事务
channel.txCommit

但是问题是,RabbitMQ 事务机制(同步)一搞,基本上吞吐量会下来,因为太耗性能

所以一般来说,如果你要确保说写 RabbitMQ 的消息别丢,可以开启 confirm 模式,在生产者那里设置开启 confirm模式之后,你每次写的消息都会分配一个唯一的 id,然后如果写入了 RabbitMQ 中,RabbitMQ 会给你回传一个 ack 消息,告诉你说这个消息 ok 了。如果 RabbitMQ 没能处理这个消息,会回调你的一个 nack 接口,告诉你这个消息接收失败,你可以重试。而且你可以结合这个机制自己在内存里维护每个消息 id 的状态,如果超过一定时间还没接收到这个消息的回调,那么你可以重发。

事务机制和 confirm 机制最大的不同在于,事务机制是同步的,你提交一个事务之后会阻塞在那儿,但是 confirm 机制是异步的,你发送个消息之后就可以发送下一个消息,然后那个消息 RabbitMQ 接收了之后会异步回调你的一个接口通知你这个消息接收到了。

所以一般在生产者这块避免数据丢失,都是用 confirm 机制的。

RabbitMQ 弄丢了数据

就是 RabbitMQ 自己弄丢了数据,这个你必须开启 RabbitMQ 的持久化,就是消息写入之后会持久化到磁盘,哪怕是 RabbitMQ 自己挂了,恢复之后会自动读取之前存储的数据,一般数据不会丢。除非极其罕见的是,RabbitMQ 还没持久化,自己就挂了,可能导致少量数据丢失,但是这个概率较小。

设置持久化有两个步骤

  • 创建 queue 的时候将其设置为持久化
    这样就可以保证 RabbitMQ 持久化 queue 的元数据,但是它是不会持久化 queue 里的数据的。
  • 第二个是发送消息的时候将消息的 deliveryMode 设置为 2
    就是将消息设置为持久化的,此时 RabbitMQ 就会将消息持久化到磁盘上去。

必须要同时设置这两个持久化才行,RabbitMQ 哪怕是挂了,再次重启,也会从磁盘上重启恢复 queue,恢复这个 queue 里的数据。

注意,哪怕是你给 RabbitMQ 开启了持久化机制,也有一种可能,就是这个消息写到了 RabbitMQ 中,但是还没来得及持久化到磁盘上,结果不巧,此时 RabbitMQ 挂了,就会导致内存里的一点点数据丢失。

所以,持久化可以跟生产者那边的 confirm 机制配合起来,只有消息被持久化到磁盘之后,才会通知生产者 ack 了,所以哪怕是在持久化到磁盘之前,RabbitMQ 挂了,数据丢了,生产者收不到 ack,你也是可以自己重发的。

消费端弄丢了数据

RabbitMQ 如果丢失了数据,主要是因为你消费的时候,刚消费到,还没处理,结果进程挂了,比如重启了,那么就尴尬了,RabbitMQ 认为你都消费了,这数据就丢了。

这个时候得用 RabbitMQ 提供的 ack 机制,简单来说,就是你必须关闭 RabbitMQ 的自动 ack,可以通过一个 api 来调用就行,然后每次你自己代码里确保处理完的时候,再在程序里 ack 一把。这样的话,如果你还没处理完,不就没有 ack 了?那 RabbitMQ 就认为你还没处理完,这个时候 RabbitMQ 会把这个消费分配给别的 consumer 去处理,消息是不会丢的。

img

Kafka

消费端弄丢了数据

唯一可能导致消费者弄丢数据的情况,就是说,你消费到了这个消息,然后消费者那边自动提交了 offset,让 Kafka 以为你已经消费好了这个消息,但其实你才刚准备处理这个消息,你还没处理,你自己就挂了,此时这条消息就丢咯。

这不是跟 RabbitMQ 差不多吗,大家都知道 Kafka 会自动提交 offset,那么只要关闭自动提交 offset,在处理完之后自己手动提交 offset,就可以保证数据不会丢。但是此时确实还是可能会有重复消费,比如你刚处理完,还没提交 offset,结果自己挂了,此时肯定会重复消费一次,自己保证幂等性就好了。

生产环境碰到的一个问题,就是说我们的 Kafka 消费者消费到了数据之后是写到一个内存的 queue 里先缓冲一下,结果有的时候,你刚把消息写入内存 queue,然后消费者会自动提交 offset。然后此时我们重启了系统,就会导致内存 queue 里还没来得及处理的数据就丢失了。

Kafka 弄丢了数据

这块比较常见的一个场景,就是 Kafka 某个 broker 宕机,然后重新选举 partition 的 leader。大家想想,要是此时其他的 follower 刚好还有些数据没有同步,结果此时 leader 挂了,然后选举某个 follower 成 leader 之后,不就少了一些数据?这就丢了一些数据啊。

生产环境也遇到过,我们也是,之前 Kafka 的 leader 机器宕机了,将 follower 切换为 leader 之后,就会发现说这个数据就丢了。

所以此时一般是要求起码设置如下 4 个参数:

  • 给 topic 设置 replication.factor 参数:这个值必须大于 1,要求每个 partition 必须有至少 2 个副本。
  • 在 Kafka 服务端设置 min.insync.replicas 参数:这个值必须大于 1,这个是要求一个 leader 至少感知到有至少一个 follower 还跟自己保持联系,没掉队,这样才能确保 leader 挂了还有一个 follower 吧。
  • 在 producer 端设置 acks=all:这个是要求每条数据,必须是写入所有 replica 之后,才能认为是写成功了
  • 在 producer 端设置 retries=MAX(很大很大很大的一个值,无限次重试的意思):这个是要求一旦写入失败,就无限重试,卡在这里了。

我们生产环境就是按照上述要求配置的,这样配置之后,至少在 Kafka broker 端就可以保证在 leader 所在 broker 发生故障,进行 leader 切换时,数据不会丢失。

生产者会不会弄丢数据?

如果按照上述的思路设置了 acks=all,一定不会丢,要求是,你的 leader 接收到消息,所有的 follower 都同步到了消息之后,才认为本次写成功了。如果没满足这个条件,生产者会自动不断的重试,重试无限次。

如何保证消息的顺序性?

我举个例子,我们以前做过一个 mysql binlog 同步的系统,压力还是非常大的,日同步数据要达到上亿,就是说数据从一个 mysql 库原封不动地同步到另一个 mysql 库里面去(mysql -> mysql)。常见的一点在于说比如大数据 team,就需要同步一个 mysql 库过来,对公司的业务系统的数据做各种复杂的操作。

你在 mysql 里增删改一条数据,对应出来了增删改 3 条 binlog 日志,接着这三条 binlog 发送到 MQ 里面,再消费出来依次执行,起码得保证人家是按照顺序来的吧?不然本来是:增加、修改、删除;你楞是换了顺序给执行成删除、修改、增加,不全错了么。

本来这个数据同步过来,应该最后这个数据被删除了;结果你搞错了这个顺序,最后这个数据保留下来了,数据同步就出错了。

先看看顺序会错乱的俩场景:

  • RabbitMQ:一个 queue,多个 consumer。比如,生产者向 RabbitMQ 里发送了三条数据,顺序依次是 data1/data2/data3,压入的是 RabbitMQ 的一个内存队列。有三个消费者分别从 MQ 中消费这三条数据中的一条,结果消费者 2 先执行完操作,把 data2 存入数据库,然后是 data1/data3。这不明显乱了。

img

  • Kafka:比如说我们建了一个 topic,有三个 partition。生产者在写的时候,其实可以指定一个 key,比如说我们指定了某个订单 id 作为 key,那么这个订单相关的数据,一定会被分发到同一个 partition 中去,而且这个 partition 中的数据一定是有顺序的。
    消费者从 partition 中取出来数据的时候,也一定是有顺序的。到这里,顺序还是 ok 的,没有错乱。接着,我们在消费者里可能会搞多个线程来并发处理消息。因为如果消费者是单线程消费处理,而处理比较耗时的话,比如处理一条消息耗时几十 ms,那么 1 秒钟只能处理几十条消息,这吞吐量太低了。而多个线程并发跑的话,顺序可能就乱掉了。

img

解决方案

RabbitMQ

img

Kafka

  • 一个 topic,一个 partition,一个 consumer,内部单线程消费,单线程吞吐量太低,一般不会用这个。
  • 写 N 个内存 queue,具有相同 key 的数据都到同一个内存 queue;然后对于 N 个线程,每个线程分别消费一个内存 queue 即可,这样就能保证顺序性。

img

如何解决消息队列的延时以及过期失效问题?消息队列满了以后该怎么处理?有几百万消息持续积压几小时,说说怎么解决?

你看这问法,其实本质针对的场景,都是说,可能你的消费端出了问题,不消费了;或者消费的速度极其慢。接着就坑爹了,可能你的消息队列集群的磁盘都快写满了,都没人消费,这个时候怎么办?或者是这整个就积压了几个小时,你这个时候怎么办?或者是你积压的时间太长了,导致比如 RabbitMQ 设置了消息过期时间后就没了怎么办?

所以就这事儿,其实线上挺常见的,一般不出,一出就是大 case。一般常见于,举个例子,消费端每次消费之后要写 mysql,结果 mysql 挂了,消费端 hang 那儿了,不动了;或者是消费端出了个什么岔子,导致消费速度极其慢。

面试题剖析

关于这个事儿,我们一个一个来梳理吧,先假设一个场景,我们现在消费端出故障了,然后大量消息在 mq 里积压,现在出事故了,慌了。

大量消息在 mq 里积压了几个小时了还没解决

几千万条数据在 MQ 里积压了七八个小时,从下午 4 点多,积压到了晚上 11 点多。这个是我们真实遇到过的一个场景,确实是线上故障了,这个时候要不然就是修复 consumer 的问题,让它恢复消费速度,然后傻傻的等待几个小时消费完毕。这个肯定不能在面试的时候说吧。

一个消费者一秒是 1000 条,一秒 3 个消费者是 3000 条,一分钟就是 18 万条。所以如果你积压了几百万到上千万的数据,即使消费者恢复了,也需要大概 1 小时的时间才能恢复过来。

一般这个时候,只能临时紧急扩容了,具体操作步骤和思路如下:

  • 先修复 consumer 的问题,确保其恢复消费速度,然后将现有 consumer 都停掉。
  • 新建一个 topic,partition 是原来的 10 倍,临时建立好原先 10 倍的 queue 数量。
  • 然后写一个临时的分发数据的 consumer 程序,这个程序部署上去消费积压的数据,消费之后不做耗时的处理,直接均匀轮询写入临时建立好的 10 倍数量的 queue。
  • 接着临时征用 10 倍的机器来部署 consumer,每一批 consumer 消费一个临时 queue 的数据。这种做法相当于是临时将 queue 资源和 consumer 资源扩大 10 倍,以正常的 10 倍速度来消费数据。
  • 等快速消费完积压数据之后,得恢复原先部署的架构重新用原先的 consumer 机器来消费消息。

mq 中的消息过期失效了

假设你用的是 RabbitMQ,RabbtiMQ 是可以设置过期时间的,也就是 TTL。如果消息在 queue 中积压超过一定的时间就会被 RabbitMQ 给清理掉,这个数据就没了。那这就是第二个坑了。这就不是说数据会大量积压在 mq 里,而是大量的数据会直接搞丢

这个情况下,就不是说要增加 consumer 消费积压的消息,因为实际上没啥积压,而是丢了大量的消息。我们可以采取一个方案,就是批量重导,这个我们之前线上也有类似的场景干过。就是大量积压的时候,我们当时就直接丢弃数据了,然后等过了高峰期以后,比如大家一起喝咖啡熬夜到晚上 12 点以后,用户都睡觉了。这个时候我们就开始写程序,将丢失的那批数据,写个临时程序,一点一点的查出来,然后重新灌入 mq 里面去,把白天丢的数据给他补回来。也只能是这样了。

假设 1 万个订单积压在 mq 里面,没有处理,其中 1000 个订单都丢了,你只能手动写程序把那 1000 个订单给查出来,手动发到 mq 里去再补一次。

mq 都快写满了

如果消息积压在 mq 里,你很长时间都没有处理掉,此时导致 mq 都快写满了,咋办?这个还有别的办法吗?没有,谁让你第一个方案执行的太慢了,你临时写程序,接入数据来消费,消费一个丢弃一个,都不要了,快速消费掉所有的消息。然后走第二个方案,到了晚上再补数据吧。

如果让你写一个消息队列,该如何进行架构设计啊?说一下你的思路。

面试官心理分析

其实聊到这个问题,一般面试官要考察两块:

  • 你有没有对某一个消息队列做过较为深入的原理的了解,或者从整体了解把握住一个消息队列的架构原理。
  • 看看你的设计能力,给你一个常见的系统,就是消息队列系统,看看你能不能从全局把握一下整体架构设计,给出一些关键点出来。

说实话,问类似问题的时候,大部分人基本都会蒙,因为平时从来没有思考过类似的问题,大多数人就是平时埋头用,从来不去思考背后的一些东西。类似的问题,比如,如果让你来设计一个 Spring 框架你会怎么做?如果让你来设计一个 Dubbo 框架你会怎么做?如果让你来设计一个 MyBatis 框架你会怎么做?

面试题剖析

其实回答这类问题,说白了,不求你看过那技术的源码,起码你要大概知道那个技术的基本原理、核心组成部分、基本架构构成,然后参照一些开源的技术把一个系统设计出来的思路说一下就好。

比如说这个消息队列系统,我们从以下几个角度来考虑一下:

  • 首先这个 mq 得支持可伸缩性吧,就是需要的时候快速扩容,就可以增加吞吐量和容量,那怎么搞?设计个分布式的系统呗,参照一下 kafka 的设计理念,broker -> topic -> partition,每个 partition 放一个机器,就存一部分数据。如果现在资源不够了,简单啊,给 topic 增加 partition,然后做数据迁移,增加机器,不就可以存放更多数据,提供更高的吞吐量了?
  • 其次你得考虑一下这个 mq 的数据要不要落地磁盘吧?那肯定要了,落磁盘才能保证别进程挂了数据就丢了。那落磁盘的时候怎么落啊?顺序写,这样就没有磁盘随机读写的寻址开销,磁盘顺序读写的性能是很高的,这就是 kafka 的思路。
  • 其次你考虑一下你的 mq 的可用性啊?这个事儿,具体参考之前可用性那个环节讲解的 kafka 的高可用保障机制。多副本 -> leader & follower -> broker 挂了重新选举 leader 即可对外服务。
  • 能不能支持数据 0 丢失啊?可以的,参考我们之前说的那个 kafka 数据零丢失方案。

mq 肯定是很复杂的,面试官问你这个问题,其实是个开放题,他就是看看你有没有从架构角度整体构思和设计的思维以及能力。确实这个问题可以刷掉一大批人,因为大部分人平时不思考这些东西。

RocketMQ 快速入门

Apache RocketMQ 是一个分布式 MQ 和流处理平台,具有低延迟、高性能和可靠性、万亿级容量和灵活的可扩展性。

RocketMQ 由阿里巴巴孵化,被捐赠给 Apache,成为 Apache 的顶级项目。

RocketMQ 概念

img

消息模型(Message Model)

RocketMQ 主要由 Producer、Broker、Consumer 三部分组成,其中 Producer 负责生产消息,Consumer 负责消费消息,Broker 负责存储消息。Broker 在实际部署过程中对应一台服务器,每个 Broker 可以存储多个 Topic 的消息,每个 Topic 的消息也可以分片存储于不同的 Broker。Message Queue 用于存储消息的物理地址,每个 Topic 中的消息地址存储于多个 Message Queue 中。ConsumerGroup 由多个 Consumer 实例构成。

消息(Message)

消息系统所传输信息的物理载体,生产和消费数据的最小单位,每条消息必须属于一个主题。RocketMQ 中每个消息拥有唯一的 Message ID,且可以携带具有业务标识的 Key。系统提供了通过 Message ID 和 Key 查询消息的功能。

消息还可以具有可选 Tag 和额外的键值对。例如,您可以为消息设置业务密钥,并在代理服务器上查找消息以诊断开发期间的问题。

标签(Tag)

为消息设置的标志,用于同一主题下区分不同类型的消息。来自同一业务单元的消息,可以根据不同业务目的在同一主题下设置不同标签。标签能够有效地保持代码的清晰度和连贯性,并优化 RocketMQ 提供的查询系统。消费者可以根据 Tag 实现对不同子主题的不同消费逻辑,实现更好的扩展性。

Tag 相当于子主题,为用户提供了额外的灵活性。对于 Tag,来自同一业务模块的具有不同目的的消息可以具有相同的主题和不同的 Tag。

主题(Topic)

表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是 RocketMQ 进行消息订阅的基本单位。

代理服务器(Broker Server)

消息中转角色,负责存储消息、转发消息。代理服务器在 RocketMQ 系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。代理服务器也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。

名称服务(Name Server)

名称服务充当路由消息的提供者。生产者或消费者能够通过名称服务查找各主题相应的 Broker IP 列表。多个 Namesrv 实例组成集群,但相互独立,没有信息交换。

消息生产者(Producer)

负责生产消息,一般由业务系统负责生产消息。一个消息生产者会把业务应用系统里产生的消息发送到 broker 服务器。RocketMQ 提供多种发送方式,同步发送、异步发送、顺序发送、单向发送。同步和异步方式均需要 Broker 返回确认信息,单向发送不需要。

生产者组(Producer Group)

同一类 Producer 的集合,这类 Producer 发送同一类消息且发送逻辑一致。如果发送的是事务消息且原始生产者在发送之后崩溃,则 Broker 服务器会联系同一生产者组的其他生产者实例以提交或回溯消费。

警告:考虑到提供的 Producer 在发送消息方面足够强大,每个 Producer 组只允许一个实例,以避免不必要的生成器实例初始化

消息消费者(Consumer)

负责消费消息,一般是后台系统负责异步消费。一个消息消费者会从 Broker 服务器拉取消息、并将其提供给应用程序。从用户应用的角度而言提供了两种消费形式:拉取式消费、推动式消费。

消费者组(Consumer Group)

同一类 Consumer 的集合,这类 Consumer 通常消费同一类消息且消费逻辑一致。消费者组使得在消息消费方面,实现负载均衡和容错的目标变得非常容易。要注意的是,消费者组的消费者实例必须订阅完全相同的 Topic。RocketMQ 支持两种消息模式:集群消费(Clustering)和广播消费(Broadcasting)。

拉取式消费(Pull Consumer)

Consumer 消费的一种类型,应用通常主动调用 Consumer 的拉消息方法从 Broker 服务器拉消息、主动权由应用控制。一旦获取了批量消息,应用就会启动消费过程。

推动式消费(Push Consumer)

Consumer 消费的一种类型,该模式下 Broker 收到数据后会主动推送给消费端,该消费模式一般实时性较高。

集群消费(Clustering)

集群消费模式下,相同 Consumer Group 的每个 Consumer 实例平均分摊消息。

广播消费(Broadcasting)

广播消费模式下,相同 Consumer Group 的每个 Consumer 实例都接收全量的消息。

普通顺序消息(Normal Ordered Message)

普通顺序消费模式下,消费者通过同一个消息队列( Topic 分区,称作 Message Queue) 收到的消息是有顺序的,不同消息队列收到的消息则可能是无顺序的。

严格顺序消息(Strictly Ordered Message)

严格顺序消息模式下,消费者收到的所有消息均是有顺序的。

RocketMQ 特性

订阅与发布

消息的发布是指某个生产者向某个 topic 发送消息;消息的订阅是指某个消费者关注了某个 topic 中带有某些 tag 的消息,进而从该 topic 消费数据。

消息顺序

消息有序指的是一类消息消费时,能按照发送的顺序来消费。例如:一个订单产生了三条消息分别是订单创建、订单付款、订单完成。消费时要按照这个顺序消费才能有意义,但是同时订单之间是可以并行消费的。RocketMQ 可以严格的保证消息有序。

顺序消息分为全局顺序消息与分区顺序消息,全局顺序是指某个 Topic 下的所有消息都要保证顺序;部分顺序消息只要保证每一组消息被顺序消费即可。

  • 全局顺序 对于指定的一个 Topic,所有消息按照严格的先入先出(FIFO)的顺序进行发布和消费。 适用场景:性能要求不高,所有的消息严格按照 FIFO 原则进行消息发布和消费的场景
  • 分区顺序 对于指定的一个 Topic,所有消息根据 sharding key 进行区块分区。 同一个分区内的消息按照严格的 FIFO 顺序进行发布和消费。 Sharding key 是顺序消息中用来区分不同分区的关键字段,和普通消息的 Key 是完全不同的概念。 适用场景:性能要求高,以 sharding key 作为分区字段,在同一个区块中严格的按照 FIFO 原则进行消息发布和消费的场景。

消息过滤

RocketMQ 的消费者可以根据 Tag 进行消息过滤,也支持自定义属性过滤。消息过滤目前是在 Broker 端实现的,优点是减少了对于 Consumer 无用消息的网络传输,缺点是增加了 Broker 的负担、而且实现相对复杂。

消息可靠性

RocketMQ 支持消息的高可靠,影响消息可靠性的几种情况:

  1. Broker 非正常关闭
  2. Broker 异常 Crash
  3. OS Crash
  4. 机器掉电,但是能立即恢复供电情况
  5. 机器无法开机(可能是 cpu、主板、内存等关键设备损坏)
  6. 磁盘设备损坏

1)、2)、3)、4) 四种情况都属于硬件资源可立即恢复情况,RocketMQ 在这四种情况下能保证消息不丢,或者丢失少量数据(依赖刷盘方式是同步还是异步)。

5)、6)属于单点故障,且无法恢复,一旦发生,在此单点上的消息全部丢失。RocketMQ 在这两种情况下,通过异步复制,可保证 99%的消息不丢,但是仍然会有极少量的消息可能丢失。通过同步双写技术可以完全避免单点,同步双写势必会影响性能,适合对消息可靠性要求极高的场合,例如与 Money 相关的应用。注:RocketMQ 从 3.0 版本开始支持同步双写。

至少一次

至少一次(At least Once)指每个消息必须投递一次。Consumer 先 Pull 消息到本地,消费完成后,才向服务器返回 ack,如果没有消费一定不会 ack 消息,所以 RocketMQ 可以很好的支持此特性。

回溯消费

回溯消费是指 Consumer 已经消费成功的消息,由于业务上需求需要重新消费,要支持此功能,Broker 在向 Consumer 投递成功消息后,消息仍然需要保留。并且重新消费一般是按照时间维度,例如由于 Consumer 系统故障,恢复后需要重新消费 1 小时前的数据,那么 Broker 要提供一种机制,可以按照时间维度来回退消费进度。RocketMQ 支持按照时间回溯消费,时间维度精确到毫秒。

事务消息

RocketMQ 事务消息(Transactional Message)是指应用本地事务和发送消息操作可以被定义到全局事务中,要么同时成功,要么同时失败。RocketMQ 的事务消息提供类似 X/Open XA 的分布事务功能,通过事务消息能达到分布式事务的最终一致。

定时消息

定时消息(延迟队列)是指消息发送到 broker 后,不会立即被消费,等待特定时间投递给真正的 topic。 broker 有配置项 messageDelayLevel,默认值为“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”,18 个 level。可以配置自定义 messageDelayLevel。注意,messageDelayLevel 是 broker 的属性,不属于某个 topic。发消息时,设置 delayLevel 等级即可:msg.setDelayLevel(level)。level 有以下三种情况:

  • level == 0,消息为非延迟消息
  • 1<=level<=maxLevel,消息延迟特定时间,例如 level==1,延迟 1s
  • level > maxLevel,则 level== maxLevel,例如 level==20,延迟 2h

定时消息会暂存在名为 SCHEDULE_TOPIC_XXXX 的 topic 中,并根据 delayTimeLevel 存入特定的 queue,queueId = delayTimeLevel – 1,即一个 queue 只存相同延迟的消息,保证具有相同发送延迟的消息能够顺序消费。broker 会调度地消费 SCHEDULE_TOPIC_XXXX,将消息写入真实的 topic。

需要注意的是,定时消息会在第一次写入和调度写入真实 topic 时都会计数,因此发送数量、tps 都会变高。

消息重试

Consumer 消费消息失败后,要提供一种重试机制,令消息再消费一次。Consumer 消费消息失败通常可以认为有以下几种情况:

  • 由于消息本身的原因,例如反序列化失败,消息数据本身无法处理(例如话费充值,当前消息的手机号被注销,无法充值)等。这种错误通常需要跳过这条消息,再消费其它消息,而这条失败的消息即使立刻重试消费,99%也不成功,所以最好提供一种定时重试机制,即过 10 秒后再重试。
  • 由于依赖的下游应用服务不可用,例如 db 连接不可用,外系统网络不可达等。遇到这种错误,即使跳过当前失败的消息,消费其他消息同样也会报错。这种情况建议应用 sleep 30s,再消费下一条消息,这样可以减轻 Broker 重试消息的压力。

RocketMQ 会为每个消费组都设置一个 Topic 名称为“%RETRY%+consumerGroup”的重试队列(这里需要注意的是,这个 Topic 的重试队列是针对消费组,而不是针对每个 Topic 设置的),用于暂时保存因为各种异常而导致 Consumer 端无法消费的消息。考虑到异常恢复起来需要一些时间,会为重试队列设置多个重试级别,每个重试级别都有与之对应的重新投递延时,重试次数越多投递延时就越大。RocketMQ 对于重试消息的处理是先保存至 Topic 名称为“SCHEDULE_TOPIC_XXXX”的延迟队列中,后台定时任务按照对应的时间进行 Delay 后重新保存至“%RETRY%+consumerGroup”的重试队列中。

消息重投

生产者在发送消息时,同步消息失败会重投,异步消息有重试,oneway 没有任何保证。消息重投保证消息尽可能发送成功、不丢失,但可能会造成消息重复,消息重复在 RocketMQ 中是无法避免的问题。消息重复在一般情况下不会发生,当出现消息量大、网络抖动,消息重复就会是大概率事件。另外,生产者主动重发、consumer 负载变化也会导致重复消息。如下方法可以设置消息重试策略:

  • retryTimesWhenSendFailed:同步发送失败重投次数,默认为 2,因此生产者会最多尝试发送 retryTimesWhenSendFailed + 1 次。不会选择上次失败的 broker,尝试向其他 broker 发送,最大程度保证消息不丢。超过重投次数,抛出异常,由客户端保证消息不丢。当出现 RemotingException、MQClientException 和部分 MQBrokerException 时会重投。
  • retryTimesWhenSendAsyncFailed:异步发送失败重试次数,异步重试不会选择其他 broker,仅在同一个 broker 上做重试,不保证消息不丢。
  • retryAnotherBrokerWhenNotStoreOK:消息刷盘(主或备)超时或 slave 不可用(返回状态非 SEND_OK),是否尝试发送到其他 broker,默认 false。十分重要消息可以开启。

量控制

生产者流控,因为 broker 处理能力达到瓶颈;消费者流控,因为消费能力达到瓶颈。

生产者流控:

  • commitLog 文件被锁时间超过 osPageCacheBusyTimeOutMills 时,参数默认为 1000ms,返回流控。
  • 如果开启 transientStorePoolEnable == true,且 broker 为异步刷盘的主机,且 transientStorePool 中资源不足,拒绝当前 send 请求,返回流控。
  • broker 每隔 10ms 检查 send 请求队列头部请求的等待时间,如果超过 waitTimeMillsInSendQueue,默认 200ms,拒绝当前 send 请求,返回流控。
  • broker 通过拒绝 send 请求方式实现流量控制。

注意,生产者流控,不会尝试消息重投。

消费者流控:

  • 消费者本地缓存消息数超过 pullThresholdForQueue 时,默认 1000。
  • 消费者本地缓存消息大小超过 pullThresholdSizeForQueue 时,默认 100MB。
  • 消费者本地缓存消息跨度超过 consumeConcurrentlyMaxSpan 时,默认 2000。

消费者流控的结果是降低拉取频率。

死信队列

死信队列用于处理无法被正常消费的消息。当一条消息初次消费失败,消息队列会自动进行消息重试;达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,消息队列 不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。

RocketMQ 将这种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),将存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)。在 RocketMQ 中,可以通过使用 console 控制台对死信队列中的消息进行重发来使得消费者实例再次进行消费。

RocketMQ 组件

img

RocketMQ 由四部分组成:NameServer、Broker、Producer、Consumer。其中任意一个组成都可以水平扩展为集群模式,以避免单点故障问题。

NameServer(命名服务器)

NameServer 是一个 Topic 路由注册中心,其角色类似 Kafka 中的 zookeeper,支持 Broker 的动态注册与发现。每个 NameServer 记录完整的路由信息,提供相应的读写服务,支持快速存储扩展。

NameServer 主要包括两个功能:

  • Broker 管理,NameServer 接受 Broker 集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制,检查 Broker 是否还存活;
  • 路由信息管理,每个 NameServer 将保存关于 Broker 集群的整个路由信息和用于客户端查询的队列信息。然后 Producer 和 Conumser 通过 NameServer 就可以知道整个 Broker 集群的路由信息,从而进行消息的投递和消费。

NameServer 通常也是集群的方式部署,各实例间相互不进行信息通讯。Broker 是向每一台 NameServer 注册自己的路由信息,所以每一个 NameServer 实例上面都保存一份完整的路由信息。当某个 NameServer 因某种原因下线了,Broker 仍然可以向其它 NameServer 同步其路由信息,Producer、Consumer 仍然可以动态感知 Broker 的路由的信息。

NameServer 是一个功能齐全的服务器,主要包括两个功能:

  1. Broker 管理 - NameServer 接受来自 Broker 集群的注册,并提供心跳机制来检查 Broker 节点是否存活。
  2. 路由管理 - 每个 NameServer 将保存有关 Broker 集群的完整路由信息和客户端查询的查询队列。

RocketMQ 客户端(Producer/Consumer)将从 NameServer 查询队列路由信息。

将 NameServer 地址列表提供给客户端有四种方法:

  1. 编程方式 - 类似:producer.setNamesrvAddr("ip:port")
  2. Java 选项 - 使用 rocketmq.namesrv.addr 参数
  3. 环境变量 - 设置环境变量 NAMESRV_ADDR
  4. HTTP 端点

更详细信息可以参考官方文档:here

Broker(代理)

Broker 主要负责消息的存储、投递和查询以及服务高可用保证。

Broker 同时支持推拉模型,包含容错机制(2 副本或 3 副本),并提供强大的峰值填充和按原始时间顺序累积数千亿消息的能力。此外,Broker 提供了灾难恢复、丰富的指标统计和警报机制,这些都是传统 MQ 所缺乏的。

为了实现这些功能,Broker 包含了以下几个重要子模块:

  • Remoting Module:整个 Broker 的实体,负责处理来自 clients 端的请求。
  • Client Manager:负责管理客户端(Producer/Consumer)和维护 Consumer 的 Topic 订阅信息。
  • Store Service:提供方便简单的 API 接口处理消息存储到物理硬盘和查询功能。
  • HA Service:高可用服务,提供 Master Broker 和 Slave Broker 之间的数据同步功能。
  • Index Service:根据特定的 Message key 对投递到 Broker 的消息进行索引服务,以提供消息的快速查询。

img

Producer(生产者)

Producers 支持分布式集群方式部署。Producer 通过 MQ 的负载均衡模块选择相应的 Broker 集群队列进行消息投递,投递的过程支持快速失败并且低延迟。

Consumer(消费者)

Consumer 支持分布式集群方式部署。支持以 push 推,pull 拉两种模式对消息进行消费。同时也支持集群方式和广播方式的消费,它提供实时消息订阅机制,可以满足大多数用户的需求。

RocketMQ 安装

环境要求

  • 推荐 64 位操作系统:Linux/Unix/Mac
  • 64bit JDK 1.8+
  • Maven 3.2.x
  • Git

下载解压

进入官方下载地址:https://rocketmq.apache.org/dowloading/releases/,选择合适版本

建议选择 binary 版本。

解压到本地:

1
2
> unzip rocketmq-all-4.2.0-source-release.zip
> cd rocketmq-all-4.2.0/

启动 Name Server

1
2
3
> nohup sh bin/mqnamesrv &
> tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...

启动 Broker

1
2
3
> nohup sh bin/mqbroker -n localhost:9876 -c conf/broker.conf &
> tail -f ~/logs/rocketmqlogs/broker.log
The broker[%s, 172.30.30.233:10911] boot success...

收发消息

执行收发消息操作之前,不许告诉客户端命名服务器的位置。在 RocketMQ 中有多种方法来实现这个目的。这里,我们使用最简单的方法——设置环境变量 NAMESRV_ADDR

1
2
3
4
5
6
> export NAMESRV_ADDR=localhost:9876
> sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
SendResult [sendStatus=SEND_OK, msgId= ...

> sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
ConsumeMessageThread_%d Receive New Messages: [MessageExt...

关闭服务器

1
2
3
4
5
6
7
> sh bin/mqshutdown broker
The mqbroker(36695) is running...
Send shutdown request to mqbroker(36695) OK

> sh bin/mqshutdown namesrv
The mqnamesrv(36664) is running...
Send shutdown request to mqnamesrv(36664) OK

RocketMQ 入门级示例

首先在项目中引入 maven 依赖:

1
2
3
4
5
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.2.0</version>
</dependency>

Producer

Producer 在 RocketMQ 中负责发送消息。

RocketMQ 有三种消息发送方式:

  • 可靠的同步发送
  • 可靠的异步发送
  • 单项发送

可靠的同步发送

可靠的同步传输用于广泛的场景,如重要的通知消息,短信通知,短信营销系统等。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class SyncProducer {
public static void main(String[] args) throws Exception {
//Instantiate with a producer group name.
DefaultMQProducer producer = new
DefaultMQProducer("please_rename_unique_group_name");
//Launch the instance.
producer.start();
for (int i = 0; i < 100; i++) {
//Create a message instance, specifying topic, tag and message body.
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " +
i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
//Call send message to deliver message to one of brokers.
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
//Shut down once the producer instance is not longer in use.
producer.shutdown();
}
}

可靠的异步发送

异步传输通常用于响应时间敏感的业务场景。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class AsyncProducer {
public static void main(String[] args) throws Exception {
//Instantiate with a producer group name.
DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
//Launch the instance.
producer.start();
producer.setRetryTimesWhenSendAsyncFailed(0);
for (int i = 0; i < 100; i++) {
final int index = i;
//Create a message instance, specifying topic, tag and message body.
Message msg = new Message("TopicTest",
"TagA",
"OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.printf("%-10d OK %s %n", index,
sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
System.out.printf("%-10d Exception %s %n", index, e);
e.printStackTrace();
}
});
}
//Shut down once the producer instance is not longer in use.
producer.shutdown();
}
}

单向传输

单向传输用于需要中等可靠性的情况,例如日志收集。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class OnewayProducer {
public static void main(String[] args) throws Exception{
//Instantiate with a producer group name.
DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
//Launch the instance.
producer.start();
for (int i = 0; i < 100; i++) {
//Create a message instance, specifying topic, tag and message body.
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " +
i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
//Call send message to deliver message to one of brokers.
producer.sendOneway(msg);

}
//Shut down once the producer instance is not longer in use.
producer.shutdown();
}
}

Consumer

Consumer 在 RocketMQ 中负责接收消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class OrderedConsumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name");
consumer.setNamesrvAddr(RocketConfig.HOST);

consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

consumer.subscribe("TopicTest", "TagA || TagC || TagD");

consumer.registerMessageListener(new MessageListenerOrderly() {

AtomicLong consumeTimes = new AtomicLong(0);

@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeOrderlyContext context) {
context.setAutoCommit(false);
System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n");
this.consumeTimes.incrementAndGet();
if ((this.consumeTimes.get() % 2) == 0) {
return ConsumeOrderlyStatus.SUCCESS;
} else if ((this.consumeTimes.get() % 3) == 0) {
return ConsumeOrderlyStatus.ROLLBACK;
} else if ((this.consumeTimes.get() % 4) == 0) {
return ConsumeOrderlyStatus.COMMIT;
} else if ((this.consumeTimes.get() % 5) == 0) {
context.setSuspendCurrentQueueTimeMillis(3000);
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
return ConsumeOrderlyStatus.SUCCESS;

}
});

consumer.start();

System.out.printf("Consumer Started.%n");
}
}

RocketMQ 官方示例

参考资料

ActiveMQ 快速入门

JMS 基本概念

JMSJava 消息服务(Java Message Service)API,是一个 Java 平台中关于面向消息中间件的 API。它用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java 消息服务是一个与具体平台无关的 API,绝大多数 MOM 提供商都对 JMS 提供支持。

消息模型

JMS 有两种消息模型:

  • Point-to-Point(P2P)
  • Publish/Subscribe(Pub/Sub)

P2P 的特点

img

在点对点的消息系统中,消息分发给一个单独的使用者。点对点消息往往与队列 javax.jms.Queue 相关联。

每个消息只有一个消费者(Consumer)(即一旦被消费,消息就不再在消息队列中)。

发送者和接收者之间在时间上没有依赖性,也就是说当发送者发送了消息之后,不管接收者有没有正在运行,它不会影响到消息被发送到队列。

接收者在成功接收消息之后需向队列应答成功。

如果你希望发送的每个消息都应该被成功处理的话,那么你需要 P2P 模式。

Pub/Sub 的特点

img

发布/订阅消息系统支持一个事件驱动模型,消息生产者和消费者都参与消息的传递。生产者发布事件,而使用者订阅感兴趣的事件,并使用事件。该类型消息一般与特定的主题 javax.jms.Topic 关联。

每个消息可以有多个消费者。

发布者和订阅者之间有时间上的依赖性。针对某个主题(Topic)的订阅者,它必须创建一个订阅者之后,才能消费发布者的消息,而且为了消费消息,订阅者必须保持运行的状态。

为了缓和这样严格的时间相关性,JMS 允许订阅者创建一个可持久化的订阅。这样,即使订阅者没有被激活(运行),它也能接收到发布者的消息。

如果你希望发送的消息可以不被做任何处理、或者被一个消息者处理、或者可以被多个消费者处理的话,那么可以采用 Pub/Sub 模型。

JMS 编程模型

img

ConnectionFactory

创建 Connection 对象的工厂,针对两种不同的 jms 消息模型,分别有 QueueConnectionFactoryTopicConnectionFactory 两种。可以通过 JNDI 来查找 ConnectionFactory 对象。

Connection

Connection 表示在客户端和 JMS 系统之间建立的链接(对 TCP/IP socket 的包装)。Connection 可以产生一个或多个Session。跟 ConnectionFactory 一样,Connection 也有两种类型:QueueConnectionTopicConnection

Destination

Destination 是一个包装了消息目标标识符的被管对象。消息目标是指消息发布和接收的地点,或者是队列 Queue ,或者是主题 Topic 。JMS 管理员创建这些对象,然后用户通过 JNDI 发现它们。和连接工厂一样,管理员可以创建两种类型的目标,点对点模型的 Queue,以及发布者/订阅者模型的 Topic

Session

Session 表示一个单线程的上下文,用于发送和接收消息。由于会话是单线程的,所以消息是连续的,就是说消息是按照发送的顺序一个一个接收的。会话的好处是它支持事务。如果用户选择了事务支持,会话上下文将保存一组消息,直到事务被提交才发送这些消息。在提交事务之前,用户可以使用回滚操作取消这些消息。一个会话允许用户创建消息,生产者来发送消息,消费者来接收消息。同样,Session 也分 QueueSessionTopicSession

MessageConsumer

MessageConsumerSession 创建,并用于将消息发送到 Destination。消费者可以同步地(阻塞模式),或(非阻塞)接收 QueueTopic 类型的消息。同样,消息生产者分两种类型:QueueSenderTopicPublisher

MessageProducer

MessageProducerSession 创建,用于接收被发送到 Destination 的消息。MessageProducer 有两种类型:QueueReceiverTopicSubscriber。可分别通过 sessioncreateReceiver(Queue)createSubscriber(Topic) 来创建。当然,也可以 sessioncreatDurableSubscriber 方法来创建持久化的订阅者。

Message

是在消费者和生产者之间传送的对象,也就是说从一个应用程序传送到另一个应用程序。一个消息有三个主要部分:

  • 消息头(必须):包含用于识别和为消息寻找路由的操作设置。
  • 一组消息属性(可选):包含额外的属性,支持其他提供者和用户的兼容。可以创建定制的字段和过滤器(消息选择器)。
  • 一个消息体(可选):允许用户创建五种类型的消息(文本消息,映射消息,字节消息,流消息和对象消息)。

消息接口非常灵活,并提供了许多方式来定制消息的内容。

Common Point-to-Point Publish-Subscribe
ConnectionFactory QueueConnectionFactory TopicConnectionFactory
Connection QueueConnection TopicConnection
Destination Queue Topic
Session QueueSession TopicSession
MessageProducer QueueSender TopicPublisher
MessageSender QueueReceiver, QueueBrowser TopicSubscriber

安装

安装条件

JDK1.7 及以上版本

本地配置了 JAVA_HOME 环境变量。

下载

支持 Windows/Unix/Linux/Cygwin

ActiveMQ 官方下载地址

Windows 下运行

(1)解压压缩包到本地;

(2)打开控制台,进入安装目录的 bin 目录下;

1
cd [activemq_install_dir]

(3)执行 activemq start 来启动 ActiveMQ

1
bin\activemq start

测试安装是否成功

(1)ActiveMQ 默认监听端口为 61616

1
netstat -an|find “61616”

(2)访问 http://127.0.0.1:8161/admin/

(3)输入用户名、密码

1
2
Login: admin
Passwort: admin

(4)点击导航栏的 Queues 菜单

(5)添加一个队列(queue)

项目中的应用

引入依赖

1
2
3
4
5
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.14.1</version>
</dependency>

Sender.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
public class Sender {
private static final int SEND_NUMBER = 4;

public static void main(String[] args) {
// ConnectionFactory :连接工厂,JMS 用它创建连接
ConnectionFactory connectionFactory;
// Connection :JMS 客户端到JMS Provider 的连接
Connection connection = null;
// Session: 一个发送或接收消息的线程
Session session;
// Destination :消息的目的地;消息发送给谁.
Destination destination;
// MessageProducer:消息发送者
MessageProducer producer;
// TextMessage message;
// 构造ConnectionFactory实例对象,此处采用ActiveMq的实现jar
connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD,
"tcp://localhost:61616");
try {
// 构造从工厂得到连接对象
connection = connectionFactory.createConnection();
// 启动
connection.start();
// 获取操作连接
session = connection.createSession(Boolean.TRUE,
Session.AUTO_ACKNOWLEDGE);
// 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置
destination = session.createQueue("FirstQueue");
// 得到消息生成者【发送者】
producer = session.createProducer(destination);
// 设置不持久化,此处学习,实际根据项目决定
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
// 构造消息,此处写死,项目就是参数,或者方法获取
sendMessage(session, producer);
session.commit();
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (null != connection)
connection.close();
} catch (Throwable ignore) {
}
}
}

public static void sendMessage(Session session, MessageProducer producer)
throws Exception {
for (int i = 1; i <= SEND_NUMBER; i++) {
TextMessage message = session
.createTextMessage("ActiveMq 发送的消息" + i);
// 发送消息到目的地方
System.out.println("发送消息:" + "ActiveMq 发送的消息" + i);
producer.send(message);
}
}
}

Receiver.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public class Receiver {
public static void main(String[] args) {
// ConnectionFactory :连接工厂,JMS 用它创建连接
ConnectionFactory connectionFactory;
// Connection :JMS 客户端到JMS Provider 的连接
Connection connection = null;
// Session: 一个发送或接收消息的线程
Session session;
// Destination :消息的目的地;消息发送给谁.
Destination destination;
// 消费者,消息接收者
MessageConsumer consumer;
connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD,
"tcp://localhost:61616");
try {
// 构造从工厂得到连接对象
connection = connectionFactory.createConnection();
// 启动
connection.start();
// 获取操作连接
session = connection.createSession(Boolean.FALSE,
Session.AUTO_ACKNOWLEDGE);
// 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置
destination = session.createQueue("FirstQueue");
consumer = session.createConsumer(destination);
while (true) {
//设置接收者接收消息的时间,为了便于测试,这里谁定为100s
TextMessage message = (TextMessage) consumer.receive(100000);
if (null != message) {
System.out.println("收到消息" + message.getText());
} else {
break;
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (null != connection)
connection.close();
} catch (Throwable ignore) {
}
}
}
}

运行

先运行 Receiver.java 进行消息监听,再运行 Send.java 发送消息。

输出

Send 的输出内容

1
2
3
4
发送消息:Activemq 发送消息0
发送消息:Activemq 发送消息1
发送消息:Activemq 发送消息2
发送消息:Activemq 发送消息3

Receiver 的输出内容

1
2
3
4
收到消息ActiveMQ 发送消息0
收到消息ActiveMQ 发送消息1
收到消息ActiveMQ 发送消息2
收到消息ActiveMQ 发送消息3

资源

Flink 简介

关键概念:源源不断的流式数据处理、事件时间、有状态流处理和状态快照

流处理

任何类型的数据都可以形成一种事件流。信用卡交易、传感器测量、机器日志、网站或移动应用程序上的用户交互记录,所有这些数据都形成一种流。

数据可以被作为 无界 或者 有界 流来处理。

  1. 无界流 有定义流的开始,但没有定义流的结束。它们会无休止地产生数据。无界流的数据必须持续处理,即数据被摄取后需要立刻处理。我们不能等到所有数据都到达再处理,因为输入是无限的,在任何时候输入都不会完成。处理无界数据通常要求以特定顺序摄取事件,例如事件发生的顺序,以便能够推断结果的完整性。
  2. 有界流 有定义流的开始,也有定义流的结束。有界流可以在摄取所有数据后再进行计算。有界流所有数据可以被排序,所以并不需要有序摄取。有界流处理通常被称为批处理。

Bounded and unbounded streams

Apache Flink 擅长处理无界和有界数据集 精确的时间控制和状态化使得 Flink 的运行时(runtime)能够运行任何处理无界流的应用。有界流则由一些专为固定大小数据集特殊设计的算法和数据结构进行内部处理,产生了出色的性能。

批处理是有界数据流处理的范例。在这种模式下,你可以选择在计算结果输出之前输入整个数据集,这也就意味着你可以对整个数据集的数据进行排序、统计或汇总计算后再输出结果。

流处理正相反,其涉及无界数据流。至少理论上来说,它的数据输入永远不会结束,因此程序必须持续不断地对到达的数据进行处理。

在 Flink 中,应用程序由用户自定义算子转换而来的流式 dataflows 所组成。这些流式 dataflows 形成了有向图,以一个或多个(source)开始,并以一个或多个(sink)结束。

A DataStream program, and its dataflow.

通常,程序代码中的 transformation 和 dataflow 中的算子(operator)之间是一一对应的。但有时也会出现一个 transformation 包含多个算子的情况,如上图所示。

Flink 应用程序可以消费来自消息队列或分布式日志这类流式数据源(例如 Apache Kafka 或 Kinesis)的实时数据,也可以从各种的数据源中消费有界的历史数据。同样,Flink 应用程序生成的结果流也可以发送到各种数据汇中。

Flink application with sources and sinks

并行 Dataflows

Flink 程序本质上是分布式并行程序。在程序执行期间,一个流有一个或多个流分区(Stream Partition),每个算子有一个或多个算子子任务(Operator Subtask)。每个子任务彼此独立,并在不同的线程中运行,或在不同的计算机或容器中运行。

算子子任务数就是其对应算子的并行度。在同一程序中,不同算子也可能具有不同的并行度。

A parallel dataflow

Flink 算子之间可以通过一对一(_直传_)模式或重新分发模式传输数据:

  • 一对一模式(例如上图中的 Sourcemap() 算子之间)可以保留元素的分区和顺序信息。这意味着 map() 算子的 subtask[1] 输入的数据以及其顺序与 Source 算子的 subtask[1] 输出的数据和顺序完全相同,即同一分区的数据只会进入到下游算子的同一分区。
  • 重新分发模式(例如上图中的 map()keyBy/window 之间,以及 keyBy/windowSink 之间)则会更改数据所在的流分区。当你在程序中选择使用不同的 _transformation_,每个算子子任务也会根据不同的 transformation 将数据发送到不同的目标子任务。例如以下这几种 transformation 和其对应分发数据的模式:_keyBy()_(通过散列键重新分区)、_broadcast()_(广播)或 _rebalance()(随机重新分发)。在重新分发数据的过程中,元素只有在每对输出和输入子任务之间才能保留其之间的顺序信息(例如,_keyBy/window 的 subtask[2] 接收到的 map() 的 subtask[1] 中的元素都是有序的)。因此,上图所示的 keyBy/windowSink 算子之间数据的重新分发时,不同键(key)的聚合结果到达 Sink 的顺序是不确定的。

自定义时间流处理

对于大多数流数据处理应用程序而言,能够使用处理实时数据的代码重新处理历史数据并产生确定并一致的结果非常有价值。

在处理流式数据时,我们通常更需要关注事件本身发生的顺序而不是事件被传输以及处理的顺序,因为这能够帮助我们推理出一组事件(事件集合)是何时发生以及结束的。例如电子商务交易或金融交易中涉及到的事件集合。

为了满足上述这类的实时流处理场景,我们通常会使用记录在数据流中的事件时间的时间戳,而不是处理数据的机器时钟的时间戳。

有状态流处理

Flink 中的算子可以是有状态的。这意味着如何处理一个事件可能取决于该事件之前所有事件数据的累积结果。Flink 中的状态不仅可以用于简单的场景(例如统计仪表板上每分钟显示的数据),也可以用于复杂的场景(例如训练作弊检测模型)。

Flink 应用程序可以在分布式集群上并行运行,其中每个算子的各个并行实例会在单独的线程中独立运行,并且通常情况下是会在不同的机器上运行。

有状态算子的并行实例组在存储其对应状态时通常是按照键(key)进行分片存储的。每个并行实例算子负责处理一组特定键的事件数据,并且这组键对应的状态会保存在本地。

如下图的 Flink 作业,其前三个算子的并行度为 2,最后一个 sink 算子的并行度为 1,其中第三个算子是有状态的,并且你可以看到第二个算子和第三个算子之间是全互联的(fully-connected),它们之间通过网络进行数据分发。通常情况下,实现这种类型的 Flink 程序是为了通过某些键对数据流进行分区,以便将需要一起处理的事件进行汇合,然后做统一计算处理。

State is sharded

Flink 应用程序的状态访问都在本地进行,因为这有助于其提高吞吐量和降低延迟。通常情况下 Flink 应用程序都是将状态存储在 JVM 堆上,但如果状态太大,我们也可以选择将其以结构化数据格式存储在高速磁盘中。

State is local

通过状态快照实现的容错

通过状态快照和流重放两种方式的组合,Flink 能够提供可容错的,精确一次计算的语义。这些状态快照在执行时会获取并存储分布式 pipeline 中整体的状态,它会将数据源中消费数据的偏移量记录下来,并将整个 job graph 中算子获取到该数据(记录的偏移量对应的数据)时的状态记录并存储下来。当发生故障时,Flink 作业会恢复上次存储的状态,重置数据源从状态中记录的上次消费的偏移量开始重新进行消费处理。而且状态快照在执行时会异步获取状态并存储,并不会阻塞正在进行的数据处理逻辑。

状态

只有在每一个单独的事件上进行转换操作的应用才不需要状态,换言之,每一个具有一定复杂度的流处理应用都是有状态的。任何运行基本业务逻辑的流处理应用都需要在一定时间内存储所接收的事件或中间结果,以供后续的某个时间点(例如收到下一个事件或者经过一段特定时间)进行访问并进行后续处理。

img

应用状态是 Flink 中的一等公民,Flink 提供了许多状态管理相关的特性支持,其中包括:

  • 多种状态基础类型:Flink 为多种不同的数据结构提供了相对应的状态基础类型,例如原子值(value),列表(list)以及映射(map)。开发者可以基于处理函数对状态的访问方式,选择最高效、最适合的状态基础类型。
  • 插件化的 State Backend:State Backend 负责管理应用程序状态,并在需要的时候进行 checkpoint。Flink 支持多种 state backend,可以将状态存在内存或者 RocksDB。RocksDB 是一种高效的嵌入式、持久化键值存储引擎。Flink 也支持插件式的自定义 state backend 进行状态存储。
  • 精确一次语义:Flink 的 checkpoint 和故障恢复算法保证了故障发生后应用状态的一致性。因此,Flink 能够在应用程序发生故障时,对应用程序透明,不造成正确性的影响。
  • 超大数据量状态:Flink 能够利用其异步以及增量式的 checkpoint 算法,存储数 TB 级别的应用状态。
  • 可弹性伸缩的应用:Flink 能够通过在更多或更少的工作节点上对状态进行重新分布,支持有状态应用的分布式的横向伸缩。

时间

时间语义

Flink 支持以下三种时间语义:

  • **事件时间(event time)**: 事件产生的时间,记录的是设备生产(或者存储)事件的时间
  • **摄取时间(ingestion time)**: Flink 读取事件时记录的时间
  • **处理时间(processing time)**: Flink pipeline 中具体算子处理事件的时间

为了获得可重现的结果,例如在计算过去的特定一天里第一个小时股票的最高价格时,我们应该使用事件时间。这样的话,无论什么时间去计算都不会影响输出结果。然而如果使用处理时间的话,实时应用程序的结果是由程序运行的时间所决定。多次运行基于处理时间的实时程序,可能得到的结果都不相同,也可能会导致再次分析历史数据或者测试新代码变得异常困难。

Event Time

如果想要使用事件时间,需要额外给 Flink 提供一个时间戳提取器和 Watermark 生成器,Flink 将使用它们来跟踪事件时间的进度。

Watermark

让我们通过一个简单的示例来演示为什么需要 watermarks 及其工作方式。

在此示例中,我们将看到带有混乱时间戳的事件流,如下所示。显示的数字表达的是这些事件实际发生时间的时间戳。到达的第一个事件发生在时间 4,随后发生的事件发生在更早的时间 2,依此类推:

··· 23 19 22 24 21 14 17 13 12 15 9 11 7 2 4 →

假设我们要对数据流排序,我们想要达到的目的是:应用程序应该在数据流里的事件到达时就有一个算子(我们暂且称之为排序)开始处理事件,这个算子所输出的流是按照时间戳排序好的。

让我们重新审视这些数据:

(1) 我们的排序器看到的第一个事件的时间戳是 4,但是我们不能立即将其作为已排序的流释放。因为我们并不能确定它是有序的,并且较早的事件有可能并未到达。事实上,如果站在上帝视角,我们知道,必须要等到时间戳为 2 的元素到来时,排序器才可以有事件输出。

需要一些缓冲,需要一些时间,但这都是值得的

(2) 接下来的这一步,如果我们选择的是固执的等待,我们永远不会有结果。首先,我们看到了时间戳为 4 的事件,然后看到了时间戳为 2 的事件。可是,时间戳小于 2 的事件接下来会不会到来呢?可能会,也可能不会。再次站在上帝视角,我们知道,我们永远不会看到时间戳 1。

最终,我们必须勇于承担责任,并发出指令,把带有时间戳 2 的事件作为已排序的事件流的开始

(3) 然后,我们需要一种策略,该策略定义:对于任何给定时间戳的事件,Flink 何时停止等待较早事件的到来。

这正是 watermarks 的作用 — 它们定义何时停止等待较早的事件。

Flink 中事件时间的处理取决于 _watermark 生成器_,后者将带有时间戳的特殊元素插入流中形成 _watermarks_。事件时间 t 的 watermark 代表 t 之前(很可能)都已经到达。

当 watermark 以 2 或更大的时间戳到达时,事件流的排序器应停止等待,并输出 2 作为已经排序好的流。

(4) 我们可能会思考,如何决定 watermarks 的不同生成策略

每个事件都会延迟一段时间后到达,然而这些延迟有所不同,有些事件可能比其他事件延迟得更多。一种简单的方法是假定这些延迟受某个最大延迟的限制。Flink 将此策略称为 最大无序边界 (bounded-out-of-orderness) watermark。当然,我们可以想像出更好的生成 watermark 的方法,但是对于大多数应用而言,固定延迟策略已经足够了。

延迟 VS 正确性

watermarks 给了开发者流处理的一种选择,它们使开发人员在开发应用程序时可以控制延迟和完整性之间的权衡。与批处理不同,批处理中的奢侈之处在于可以在产生任何结果之前完全了解输入,而使用流式传输,我们不被允许等待所有的时间都产生了,才输出排序好的数据,这与流相违背。

我们可以把 watermarks 的边界时间配置的相对较短,从而冒着在输入了解不完全的情况下产生结果的风险-即可能会很快产生错误结果。或者,你可以等待更长的时间,并利用对输入流的更全面的了解来产生结果。

当然也可以实施混合解决方案,先快速产生初步结果,然后在处理其他(最新)数据时向这些结果提供更新。对于有一些对延迟的容忍程度很低,但是又对结果有很严格的要求的场景下,或许是一个福音。

延迟

延迟是相对于 watermarks 定义的。Watermark(t) 表示事件流的时间已经到达了 t; watermark 之后的时间戳 ≤ t 的任何事件都被称之为延迟事件。

使用 Watermarks

如果想要使用基于带有事件时间戳的事件流,Flink 需要知道与每个事件相关的时间戳,而且流必须包含 watermark。

动手练习中使用的出租车数据源已经为我们处理了这些详细信息。但是,在您自己的应用程序中,您将必须自己进行处理,这通常是通过实现一个类来实现的,该类从事件中提取时间戳,并根据需要生成 watermarks。最简单的方法是使用 WatermarkStrategy

1
2
3
4
5
6
7
8
DataStream<Event> stream = ...

WatermarkStrategy<Event> strategy = WatermarkStrategy
.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(20))
.withTimestampAssigner((event, timestamp) -> event.timestamp);

DataStream<Event> withTimestampsAndWatermarks =
stream.assignTimestampsAndWatermarks(strategy);

窗口

我们在操作无界数据流时,经常需要应对以下问题,我们经常把无界数据流分解成有界数据流聚合分析:

  • 每分钟的浏览量
  • 每位用户每周的会话数
  • 每个传感器每分钟的最高温度

用 Flink 计算窗口分析取决于两个主要的抽象操作:_Window Assigners_,将事件分配给窗口(根据需要创建新的窗口对象),以及 _Window Functions_,处理窗口内的数据。

Flink 的窗口 API 还具有 TriggersEvictors 的概念,Triggers 确定何时调用窗口函数,而 Evictors 则可以删除在窗口中收集的元素。

举一个简单的例子,我们一般这样使用键控事件流(基于 key 分组的输入事件流):

1
2
3
4
stream.
.keyBy(<key selector>)
.window(<window assigner>)
.reduce|aggregate|process(<window function>)

您不是必须使用键控事件流(keyed stream),但是值得注意的是,如果不使用键控事件流,我们的程序就不能 并行 处理。

1
2
3
stream.
.windowAll(<window assigner>)
.reduce|aggregate|process(<window function>)

窗口分配器

Flink 有一些内置的窗口分配器,如下所示:

Window assigners

通过一些示例来展示关于这些窗口如何使用,或者如何区分它们:

  • 滚动时间窗口
    • 每分钟页面浏览量
    • TumblingEventTimeWindows.of(Time.minutes(1))
  • 滑动时间窗口
    • 每 10 秒钟计算前 1 分钟的页面浏览量
    • SlidingEventTimeWindows.of(Time.minutes(1), Time.seconds(10))
  • 会话窗口
    • 每个会话的网页浏览量,其中会话之间的间隔至少为 30 分钟
    • EventTimeSessionWindows.withGap(Time.minutes(30))

以下都是一些可以使用的间隔时间 Time.milliseconds(n), Time.seconds(n), Time.minutes(n), Time.hours(n), 和 Time.days(n)

基于时间的窗口分配器(包括会话时间)既可以处理 事件时间,也可以处理 处理时间。这两种基于时间的处理没有哪一个更好,我们必须折衷。使用 处理时间,我们必须接受以下限制:

  • 无法正确处理历史数据,
  • 无法正确处理超过最大无序边界的数据,
  • 结果将是不确定的,

但是有自己的优势,较低的延迟。

使用基于计数的窗口时,请记住,只有窗口内的事件数量到达窗口要求的数值时,这些窗口才会触发计算。尽管可以使用自定义触发器自己实现该行为,但无法应对超时和处理部分窗口。

我们可能在有些场景下,想使用全局 window assigner 将每个事件(相同的 key)都分配给某一个指定的全局窗口。 很多情况下,一个比较好的建议是使用 ProcessFunction,具体介绍在这里

窗口应用函数

我们有三种最基本的操作窗口内的事件的选项:

  1. 像批量处理,ProcessWindowFunction 会缓存 Iterable 和窗口内容,供接下来全量计算;
  2. 或者像流处理,每一次有事件被分配到窗口时,都会调用 ReduceFunction 或者 AggregateFunction 来增量计算;
  3. 或者结合两者,通过 ReduceFunction 或者 AggregateFunction 预聚合的增量计算结果在触发窗口时, 提供给 ProcessWindowFunction 做全量计算。

接下来展示一段 1 和 3 的示例,每一个实现都是计算传感器的最大值。在每一个一分钟大小的事件时间窗口内, 生成一个包含 (key,end-of-window-timestamp, max_value) 的一组结果。

ProcessWindowFunction 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
DataStream<SensorReading> input = ...

input
.keyBy(x -> x.key)
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.process(new MyWastefulMax());

public static class MyWastefulMax extends ProcessWindowFunction<
SensorReading, // 输入类型
Tuple3<String, Long, Integer>, // 输出类型
String, // 键类型
TimeWindow> { // 窗口类型

@Override
public void process(
String key,
Context context,
Iterable<SensorReading> events,
Collector<Tuple3<String, Long, Integer>> out) {

int max = 0;
for (SensorReading event : events) {
max = Math.max(event.value, max);
}
out.collect(Tuple3.of(key, context.window().getEnd(), max));
}
}

在当前实现中有一些值得关注的地方:

  • Flink 会缓存所有分配给窗口的事件流,直到触发窗口为止。这个操作可能是相当昂贵的。
  • Flink 会传递给 ProcessWindowFunction 一个 Context 对象,这个对象内包含了一些窗口信息。Context 接口 展示大致如下:
1
2
3
4
5
6
7
8
9
public abstract class Context implements java.io.Serializable {
public abstract W window();

public abstract long currentProcessingTime();
public abstract long currentWatermark();

public abstract KeyedStateStore windowState();
public abstract KeyedStateStore globalState();
}

windowStateglobalState 可以用来存储当前的窗口的 key、窗口或者当前 key 的每一个窗口信息。这在一些场景下会很有用,试想,我们在处理当前窗口的时候,可能会用到上一个窗口的信息。

增量聚合示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
DataStream<SensorReading> input = ...

input
.keyBy(x -> x.key)
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.reduce(new MyReducingMax(), new MyWindowFunction());

private static class MyReducingMax implements ReduceFunction<SensorReading> {
public SensorReading reduce(SensorReading r1, SensorReading r2) {
return r1.value() > r2.value() ? r1 : r2;
}
}

private static class MyWindowFunction extends ProcessWindowFunction<
SensorReading, Tuple3<String, Long, SensorReading>, String, TimeWindow> {

@Override
public void process(
String key,
Context context,
Iterable<SensorReading> maxReading,
Collector<Tuple3<String, Long, SensorReading>> out) {

SensorReading max = maxReading.iterator().next();
out.collect(Tuple3.of(key, context.window().getEnd(), max));
}
}

请注意 Iterable<SensorReading> 将只包含一个读数 – MyReducingMax 计算出的预先汇总的最大值。

晚到的事件

默认场景下,超过最大无序边界的事件会被删除,但是 Flink 给了我们两个选择去控制这些事件。

您可以使用一种称为旁路输出 的机制来安排将要删除的事件收集到侧输出流中,这里是一个示例:

1
2
3
4
5
6
7
8
9
OutputTag<Event> lateTag = new OutputTag<Event>("late"){};

SingleOutputStreamOperator<Event> result = stream.
.keyBy(...)
.window(...)
.sideOutputLateData(lateTag)
.process(...);

DataStream<Event> lateStream = result.getSideOutput(lateTag);

我们还可以指定 允许的延迟(allowed lateness) 的间隔,在这个间隔时间内,延迟的事件将会继续分配给窗口(同时状态会被保留),默认状态下,每个延迟事件都会导致窗口函数被再次调用(有时也称之为 late firing )。

默认情况下,允许的延迟为 0。换句话说,watermark 之后的元素将被丢弃(或发送到侧输出流)。

举例说明:

1
2
3
4
5
stream.
.keyBy(...)
.window(...)
.allowedLateness(Time.seconds(10))
.process(...);

当允许的延迟大于零时,只有那些超过最大无序边界以至于会被丢弃的事件才会被发送到侧输出流(如果已配置)。

深入了解窗口操作

Flink 的窗口 API 某些方面有一些奇怪的行为,可能和我们预期的行为不一致。 根据 Flink 用户邮件列表 和其他地方一些频繁被问起的问题, 以下是一些有关 Windows 的底层事实,这些信息可能会让您感到惊讶。

滑动窗口是通过复制来实现的

滑动窗口分配器可以创建许多窗口对象,并将每个事件复制到每个相关的窗口中。例如,如果您每隔 15 分钟就有 24 小时的滑动窗口,则每个事件将被复制到 4 * 24 = 96 个窗口中。

时间窗口会和时间对齐

仅仅因为我们使用的是一个小时的处理时间窗口并在 12:05 开始运行您的应用程序,并不意味着第一个窗口将在 1:05 关闭。第一个窗口将长 55 分钟,并在 1:00 关闭。

请注意,滑动窗口和滚动窗口分配器所采用的 offset 参数可用于改变窗口的对齐方式。有关详细的信息,请参见 滚动窗口滑动窗口

window 后面可以接 window

比如说:

1
2
3
4
5
6
stream
.keyBy(t -> t.key)
.window(<window assigner>)
.reduce(<reduce function>)
.windowAll(<same window assigner>)
.reduce(<same reduce function>)

可能我们会猜测以 Flink 的能力,想要做到这样看起来是可行的(前提是你使用的是 ReduceFunction 或 AggregateFunction ),但不是。

之所以可行,是因为时间窗口产生的事件是根据窗口结束时的时间分配时间戳的。例如,一个小时小时的窗口所产生的所有事件都将带有标记一个小时结束的时间戳。后面的窗口内的数据消费和前面的流产生的数据是一致的。

空的时间窗口不会输出结果

事件会触发窗口的创建。换句话说,如果在特定的窗口内没有事件,就不会有窗口,就不会有输出结果。

延迟时间会导致延迟聚合

会话窗口的实现是基于窗口的一个抽象能力,窗口可以 _聚合_。会话窗口中的每个数据在初始被消费时,都会被分配一个新的窗口,但是如果窗口之间的间隔足够小,多个窗口就会被聚合。延迟事件可以弥合两个先前分开的会话间隔,从而产生一个虽然有延迟但是更加准确地结果。

参考资料

Flink ETL

Apache Flink 的一种常见应用场景是 ETL(抽取、转换、加载)管道任务。从一个或多个数据源获取数据,进行一些转换操作和信息补充,将结果存储起来。在这个教程中,我们将介绍如何使用 Flink 的 DataStream API 实现这类应用。

这里注意,Flink 的 Table 和 SQL API 完全可以满足很多 ETL 使用场景。但无论你最终是否直接使用 DataStream API,对这里介绍的基本知识有扎实的理解都是有价值的。

无状态的转换

本节涵盖了 map()flatmap(),这两种算子可以用来实现无状态转换的基本操作。

map()

在第一个练习中,你将过滤出租车行程数据中的事件。在同一代码仓库中,有一个 GeoUtils 类,提供了一个静态方法 GeoUtils.mapToGridCell(float lon, float lat),它可以将位置坐标(经度,维度)映射到 100x100 米的对应不同区域的网格单元。

现在让我们为每个出租车行程时间的数据对象增加 startCellendCell 字段。你可以创建一个继承 TaxiRideEnrichedRide 类,添加这些字段:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public static class EnrichedRide extends TaxiRide {
public int startCell;
public int endCell;

public EnrichedRide() {}

public EnrichedRide(TaxiRide ride) {
this.rideId = ride.rideId;
this.isStart = ride.isStart;
...
this.startCell = GeoUtils.mapToGridCell(ride.startLon, ride.startLat);
this.endCell = GeoUtils.mapToGridCell(ride.endLon, ride.endLat);
}

public String toString() {
return super.toString() + "," +
Integer.toString(this.startCell) + "," +
Integer.toString(this.endCell);
}
}

然后你可以创建一个应用来转换这个流

1
2
3
4
5
6
7
DataStream<TaxiRide> rides = env.addSource(new TaxiRideSource(...));

DataStream<EnrichedRide> enrichedNYCRides = rides
.filter(new RideCleansingSolution.NYCFilter())
.map(new Enrichment());

enrichedNYCRides.print();

使用这个 MapFunction:

1
2
3
4
5
6
7
public static class Enrichment implements MapFunction<TaxiRide, EnrichedRide> {

@Override
public EnrichedRide map(TaxiRide taxiRide) throws Exception {
return new EnrichedRide(taxiRide);
}
}

flatmap()

MapFunction 只适用于一对一的转换:对每个进入算子的流元素,map() 将仅输出一个转换后的元素。对于除此以外的场景,你将要使用 flatmap()

1
2
3
4
5
6
DataStream<TaxiRide> rides = env.addSource(new TaxiRideSource(...));

DataStream<EnrichedRide> enrichedNYCRides = rides
.flatMap(new NYCEnrichment());

enrichedNYCRides.print();

其中用到的 FlatMapFunction :

1
2
3
4
5
6
7
8
9
10
public static class NYCEnrichment implements FlatMapFunction<TaxiRide, EnrichedRide> {

@Override
public void flatMap(TaxiRide taxiRide, Collector<EnrichedRide> out) throws Exception {
FilterFunction<TaxiRide> valid = new RideCleansing.NYCFilter();
if (valid.filter(taxiRide)) {
out.collect(new EnrichedRide(taxiRide));
}
}
}

使用接口中提供的 Collectorflatmap() 可以输出你想要的任意数量的元素,也可以一个都不发。

Keyed Streams

keyBy()

将一个流根据其中的一些属性来进行分区是十分有用的,这样我们可以使所有具有相同属性的事件分到相同的组里。例如,如果你想找到从每个网格单元出发的最远的出租车行程。按 SQL 查询的方式来考虑,这意味着要对 startCell 进行 GROUP BY 再排序,在 Flink 中这部分可以用 keyBy(KeySelector) 实现。

1
2
3
rides
.flatMap(new NYCEnrichment())
.keyBy(enrichedRide -> enrichedRide.startCell)

每个 keyBy 会通过 shuffle 来为数据流进行重新分区。总体来说这个开销是很大的,它涉及网络通信、序列化和反序列化。

keyBy and network shuffle

通过计算得到键

KeySelector 不仅限于从事件中抽取键。你也可以按想要的方式计算得到键值,只要最终结果是确定的,并且实现了 hashCode()equals()。这些限制条件不包括产生随机数或者返回 Arrays 或 Enums 的 KeySelector,但你可以用元组和 POJO 来组成键,只要他们的元素遵循上述条件。

键必须按确定的方式产生,因为它们会在需要的时候被重新计算,而不是一直被带在流记录中。

例如,比起创建一个新的带有 startCell 字段的 EnrichedRide 类,用这个字段作为 key:

1
keyBy(enrichedRide -> enrichedRide.startCell)

我们更倾向于这样做:

1
keyBy(ride -> GeoUtils.mapToGridCell(ride.startLon, ride.startLat))

Keyed Stream 的聚合

以下代码为每个行程结束事件创建了一个新的包含 startCell 和时长(分钟)的元组流:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import org.joda.time.Interval;

DataStream<Tuple2<Integer, Minutes>> minutesByStartCell = enrichedNYCRides
.flatMap(new FlatMapFunction<EnrichedRide, Tuple2<Integer, Minutes>>() {

@Override
public void flatMap(EnrichedRide ride,
Collector<Tuple2<Integer, Minutes>> out) throws Exception {
if (!ride.isStart) {
Interval rideInterval = new Interval(ride.startTime, ride.endTime);
Minutes duration = rideInterval.toDuration().toStandardMinutes();
out.collect(new Tuple2<>(ride.startCell, duration));
}
}
});

现在就可以产生一个流,对每个 startCell 仅包含那些最长行程的数据。

有很多种方法表示使用哪个字段作为键。前面使用 EnrichedRide POJO 的例子,用字段名来指定键。而这个使用 Tuple2 对象的例子中,用字段在元组中的序号(从 0 开始)来指定键。

1
2
3
4
minutesByStartCell
.keyBy(value -> value.f0) // .keyBy(value -> value.startCell)
.maxBy(1) // duration
.print();

现在每次行程时长达到新的最大值,都会输出一条新记录,例如下面这个对应 50797 网格单元的数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
...
4> (64549,5M)
4> (46298,18M)
1> (51549,14M)
1> (53043,13M)
1> (56031,22M)
1> (50797,6M)
...
1> (50797,8M)
...
1> (50797,11M)
...
1> (50797,12M)

(隐式的)状态

这是培训中第一个涉及到有状态流的例子。尽管状态的处理是透明的,Flink 必须跟踪每个不同的键的最大时长。

只要应用中有状态,你就应该考虑状态的大小。如果键值的数量是无限的,那 Flink 的状态需要的空间也同样是无限的。

在流处理场景中,考虑有限窗口的聚合往往比整个流聚合更有意义。

reduce() 和其他聚合算子

上面用到的 maxBy() 只是 Flink 中 KeyedStream 上众多聚合函数中的一个。还有一个更通用的 reduce() 函数可以用来实现你的自定义聚合。

有状态的转换

在 Flink 不参与管理状态的情况下,你的应用也可以使用状态,但 Flink 为其管理状态提供了一些引人注目的特性:

  • 本地性: Flink 状态是存储在使用它的机器本地的,并且可以以内存访问速度来获取
  • 持久性: Flink 状态是容错的,例如,它可以自动按一定的时间间隔产生 checkpoint,并且在任务失败后进行恢复
  • 纵向可扩展性: Flink 状态可以存储在集成的 RocksDB 实例中,这种方式下可以通过增加本地磁盘来扩展空间
  • 横向可扩展性: Flink 状态可以随着集群的扩缩容重新分布
  • 可查询性: Flink 状态可以通过使用 状态查询 API 从外部进行查询。

在本节中你将学习如何使用 Flink 的 API 来管理 keyed state。

Rich Functions

至此,你已经看到了 Flink 的几种函数接口,包括 FilterFunctionMapFunction,和 FlatMapFunction。这些都是单一抽象方法模式。

对其中的每一个接口,Flink 同样提供了一个所谓 “rich” 的变体,如 RichFlatMapFunction,其中增加了以下方法,包括:

  • open(Configuration c)
  • close()
  • getRuntimeContext()

open() 仅在算子初始化时调用一次。可以用来加载一些静态数据,或者建立外部服务的链接等。

getRuntimeContext() 为整套潜在有趣的东西提供了一个访问途径,最明显的,它是你创建和访问 Flink 状态的途径。

一个使用 Keyed State 的例子

在这个例子里,想象你有一个要去重的事件数据流,对每个键只保留第一个事件。下面是完成这个功能的应用,使用一个名为 DeduplicatorRichFlatMapFunction

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private static class Event {
public final String key;
public final long timestamp;
...
}

public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.addSource(new EventSource())
.keyBy(e -> e.key)
.flatMap(new Deduplicator())
.print();

env.execute();
}

为了实现这个功能,Deduplicator 需要记录每个键是否已经有了相应的记录。它将通过使用 Flink 的 keyed state 接口来做这件事。

当你使用像这样的 keyed stream 的时候,Flink 会为每个状态中管理的条目维护一个键值存储。

Flink 支持几种不同方式的 keyed state,这个例子使用的是最简单的一个,叫做 ValueState。意思是对于 每个键 ,Flink 将存储一个单一的对象 —— 在这个例子中,存储的是一个 Boolean 类型的对象。

我们的 Deduplicator 类有两个方法:open()flatMap()open() 方法通过定义 ValueStateDescriptor<Boolean> 建立了管理状态的使用。构造器的参数定义了这个状态的名字(“keyHasBeenSeen”),并且为如何序列化这些对象提供了信息(在这个例子中的 Types.BOOLEAN)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public static class Deduplicator extends RichFlatMapFunction<Event, Event> {
ValueState<Boolean> keyHasBeenSeen;

@Override
public void open(Configuration conf) {
ValueStateDescriptor<Boolean> desc = new ValueStateDescriptor<>("keyHasBeenSeen", Types.BOOLEAN);
keyHasBeenSeen = getRuntimeContext().getState(desc);
}

@Override
public void flatMap(Event event, Collector<Event> out) throws Exception {
if (keyHasBeenSeen.value() == null) {
out.collect(event);
keyHasBeenSeen.update(true);
}
}
}

当 flatMap 方法调用 keyHasBeenSeen.value() 时,Flink 会在 当前键的上下文 中检索状态值,只有当状态为 null 时,才会输出当前事件。这种情况下,它同时也将更新 keyHasBeenSeentrue

这种访问和更新按键分区的状态的机制也许看上去很神奇,因为在 Deduplicator 的实现中,键不是明确可见的。当 Flink 运行时调用 RichFlatMapFunctionopen 方法时, 是没有事件的,所以这个时候上下文中不含有任何键。但当它调用 flatMap 方法,被处理的事件的键在运行时中就是可用的了,并且被用来确定操作哪个 Flink 状态后端的入口。

部署在分布式集群时,将会有很多 Deduplicator 的实例,每一个实例将负责整个键空间的互斥子集中的一个。所以,当你看到一个单独的 ValueState,比如

1
ValueState<Boolean> keyHasBeenSeen;

要理解这个代表的不仅仅是一个单独的布尔类型变量,而是一个分布式的共享键值存储。

清理状态

上面例子有一个潜在的问题:当键空间是无界的时候将发生什么?Flink 会对每个使用过的键都存储一个 Boolean 类型的实例。如果是键是有限的集合还好,但在键无限增长的应用中,清除再也不会使用的状态是很必要的。这通过在状态对象上调用 clear() 来实现,如下:

1
keyHasBeenSeen.clear()

对一个给定的键值,你也许想在它一段时间不使用后来做这件事。当学习 ProcessFunction 的相关章节时,你将看到在事件驱动的应用中怎么用定时器来做这个。

也可以选择使用 状态的过期时间(TTL),为状态描述符配置你想要旧状态自动被清除的时间。

Non-keyed State

在没有键的上下文中我们也可以使用 Flink 管理的状态。这也被称作 算子的状态。它包含的接口是很不一样的,由于对用户定义的函数来说使用 non-keyed state 是不太常见的,所以这里就不多介绍了。这个特性最常用于 source 和 sink 的实现。

Connected Streams

相比于下面这种预先定义的转换:

simple transformation

有时你想要更灵活地调整转换的某些功能,比如数据流的阈值、规则或者其他参数。Flink 支持这种需求的模式称为 connected streams ,一个单独的算子有两个输入流。

connected streams

connected stream 也可以被用来实现流的关联。

示例

在这个例子中,一个控制流是用来指定哪些词需要从 streamOfWords 里过滤掉的。 一个称为 ControlFunctionRichCoFlatMapFunction 作用于连接的流来实现这个功能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> control = env
.fromElements("DROP", "IGNORE")
.keyBy(x -> x);

DataStream<String> streamOfWords = env
.fromElements("Apache", "DROP", "Flink", "IGNORE")
.keyBy(x -> x);

control
.connect(streamOfWords)
.flatMap(new ControlFunction())
.print();

env.execute();
}

这里注意两个流只有键一致的时候才能连接。 keyBy 的作用是将流数据分区,当 keyed stream 被连接时,他们必须按相同的方式分区。这样保证了两个流中所有键相同的事件发到同一个实例上。这样也使按键关联两个流成为可能。

在这个例子中,两个流都是 DataStream<String> 类型的,并且都将字符串作为键。正如你将在下面看到的,RichCoFlatMapFunction 在状态中存了一个布尔类型的变量,这个变量被两个流共享。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public static class ControlFunction extends RichCoFlatMapFunction<String, String, String> {
private ValueState<Boolean> blocked;

@Override
public void open(Configuration config) {
blocked = getRuntimeContext()
.getState(new ValueStateDescriptor<>("blocked", Boolean.class));
}

@Override
public void flatMap1(String control_value, Collector<String> out) throws Exception {
blocked.update(Boolean.TRUE);
}

@Override
public void flatMap2(String data_value, Collector<String> out) throws Exception {
if (blocked.value() == null) {
out.collect(data_value);
}
}
}

RichCoFlatMapFunction 是一种可以被用于一对连接流的 FlatMapFunction,并且它可以调用 rich function 的接口。这意味着它可以是有状态的。

布尔变量 blocked 被用于记录在数据流 control 中出现过的键(在这个例子中是单词),并且这些单词从 streamOfWords 过滤掉。这是 keyed state,并且它是被两个流共享的,这也是为什么两个流必须有相同的键值空间。

在 Flink 运行时中,flatMap1flatMap2 在连接流有新元素到来时被调用 —— 在我们的例子中,control 流中的元素会进入 flatMap1streamOfWords 中的元素会进入 flatMap2。这是由两个流连接的顺序决定的,本例中为 control.connect(streamOfWords)

认识到你没法控制 flatMap1flatMap2 的调用顺序是很重要的。这两个输入流是相互竞争的关系,Flink 运行时将根据从一个流或另一个流中消费的事件做它要做的。对于需要保证时间和/或顺序的场景,你会发现在 Flink 的管理状态中缓存事件一直到它们能够被处理是必须的。(注意:如果你真的感到绝望,可以使用自定义的算子实现 InputSelectable 接口,在两输入算子消费它的输入流时增加一些顺序上的限制。)

参考资料

Flink 事件驱动

处理函数(Process Functions)

简介

ProcessFunction 将事件处理与 Timer,State 结合在一起,使其成为流处理应用的强大构建模块。 这是使用 Flink 创建事件驱动应用程序的基础。它和 RichFlatMapFunction 十分相似, 但是增加了 Timer。

示例

如果你已经体验了 流式分析训练动手实践, 你应该记得,它是采用 TumblingEventTimeWindow 来计算每个小时内每个司机的小费总和, 像下面的示例这样:

1
2
3
4
5
// 计算每个司机每小时的小费总和
DataStream<Tuple3<Long, Long, Float>> hourlyTips = fares
.keyBy((TaxiFare fare) -> fare.driverId)
.window(TumblingEventTimeWindows.of(Time.hours(1)))
.process(new AddTips());

使用 KeyedProcessFunction 去实现相同的操作更加直接且更有学习意义。 让我们开始用以下代码替换上面的代码:

1
2
3
4
// 计算每个司机每小时的小费总和
DataStream<Tuple3<Long, Long, Float>> hourlyTips = fares
.keyBy((TaxiFare fare) -> fare.driverId)
.process(new PseudoWindow(Time.hours(1)));

在这个代码片段中,一个名为 PseudoWindowKeyedProcessFunction 被应用于 KeyedStream, 其结果是一个 DataStream<Tuple3<Long, Long, Float>> (与使用 Flink 内置时间窗口的实现生成的流相同)。

PseudoWindow 的总体轮廓示意如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// 在时长跨度为一小时的窗口中计算每个司机的小费总和。
// 司机ID作为 key。
public static class PseudoWindow extends
KeyedProcessFunction<Long, TaxiFare, Tuple3<Long, Long, Float>> {

private final long durationMsec;

public PseudoWindow(Time duration) {
this.durationMsec = duration.toMilliseconds();
}

@Override
// 在初始化期间调用一次。
public void open(Configuration conf) {
. . .
}

@Override
// 每个票价事件(TaxiFare-Event)输入(到达)时调用,以处理输入的票价事件。
public void processElement(
TaxiFare fare,
Context ctx,
Collector<Tuple3<Long, Long, Float>> out) throws Exception {

. . .
}

@Override
// 当当前水印(watermark)表明窗口现在需要完成的时候调用。
public void onTimer(long timestamp,
OnTimerContext context,
Collector<Tuple3<Long, Long, Float>> out) throws Exception {

. . .
}
}

注意事项:

  • 有几种类型的 ProcessFunctions – 不仅包括 KeyedProcessFunction,还包括 CoProcessFunctionsBroadcastProcessFunctions 等.
  • KeyedProcessFunction 是一种 RichFunction。作为 RichFunction,它可以访问使用 Managed Keyed State 所需的 opengetRuntimeContext 方法。
  • 有两个回调方法须要实现: processElementonTimer。每个输入事件都会调用 processElement 方法; 当计时器触发时调用 onTimer。它们可以是基于事件时间(event time)的 timer,也可以是基于处理时间(processing time)的 timer。 除此之外,processElementonTimer 都提供了一个上下文对象,该对象可用于与 TimerService 交互。 这两个回调还传递了一个可用于发出结果的 Collector

open() 方法

1
2
3
4
5
6
7
8
9
10
11
// 每个窗口都持有托管的 Keyed state 的入口,并且根据窗口的结束时间执行 keyed 策略。
// 每个司机都有一个单独的MapState对象。
private transient MapState<Long, Float> sumOfTips;

@Override
public void open(Configuration conf) {

MapStateDescriptor<Long, Float> sumDesc =
new MapStateDescriptor<>("sumOfTips", Long.class, Float.class);
sumOfTips = getRuntimeContext().getMapState(sumDesc);
}

由于票价事件(fare-event)可能会乱序到达,有时需要在计算输出前一个小时结果前,处理下一个小时的事件。 这样能够保证“乱序造成的延迟数据”得到正确处理(放到前一个小时中)。 实际上,如果 Watermark 延迟比窗口长度长得多,则可能有多个窗口同时打开,而不仅仅是两个。 此实现通过使用 MapState 来支持处理这一点,该 MapState 将每个窗口的结束时间戳映射到该窗口的小费总和。

processElement() 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public void processElement(
TaxiFare fare,
Context ctx,
Collector<Tuple3<Long, Long, Float>> out) throws Exception {

long eventTime = fare.getEventTime();
TimerService timerService = ctx.timerService();

if (eventTime <= timerService.currentWatermark()) {
// 事件延迟;其对应的窗口已经触发。
} else {
// 将 eventTime 向上取值并将结果赋值到包含当前事件的窗口的末尾时间点。
long endOfWindow = (eventTime - (eventTime % durationMsec) + durationMsec - 1);

// 在窗口完成时将启用回调
timerService.registerEventTimeTimer(endOfWindow);

// 将此票价的小费添加到该窗口的总计中。
Float sum = sumOfTips.get(endOfWindow);
if (sum == null) {
sum = 0.0F;
}
sum += fare.tip;
sumOfTips.put(endOfWindow, sum);
}
}

需要考虑的事项:

  • 延迟的事件怎么处理?watermark 后面的事件(即延迟的)正在被删除。 如果你想做一些比这更高级的操作,可以考虑使用旁路输出(Side outputs),这将在下一节中解释。
  • 本例使用一个 MapState,其中 keys 是时间戳(timestamp),并为同一时间戳设置一个 Timer。 这是一种常见的模式;它使得在 Timer 触发时查找相关信息变得简单高效。

onTimer() 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
public void onTimer(
long timestamp,
OnTimerContext context,
Collector<Tuple3<Long, Long, Float>> out) throws Exception {

long driverId = context.getCurrentKey();
// 查找刚结束的一小时结果。
Float sumOfTips = this.sumOfTips.get(timestamp);

Tuple3<Long, Long, Float> result = Tuple3.of(driverId, timestamp, sumOfTips);
out.collect(result);
this.sumOfTips.remove(timestamp);
}

注意:

  • 传递给 onTimerOnTimerContext context 可用于确定当前 key。
  • 我们的 pseudo-windows 在当前 Watermark 到达每小时结束时触发,此时调用 onTimer。 这个 onTimer 方法从 sumOfTips 中删除相关的条目,这样做的效果是不可能容纳延迟的事件。 这相当于在使用 Flink 的时间窗口时将 allowedLateness 设置为零。

性能考虑

Flink 提供了为 RocksDB 优化的 MapStateListState 类型。 相对于 ValueState,更建议使用 MapStateListState,因为使用 RocksDBStateBackend 的情况下, MapStateListStateValueState 性能更好。 RocksDBStateBackend 可以附加到 ListState,而无需进行(反)序列化, 对于 MapState,每个 key/value 都是一个单独的 RocksDB 对象,因此可以有效地访问和更新 MapState

旁路输出(Side Outputs)

简介

有几个很好的理由希望从 Flink 算子获得多个输出流,如下报告条目:

  • 异常情况(exceptions)
  • 格式错误的事件(malformed events)
  • 延迟的事件(late events)
  • operator 告警(operational alerts),如与外部服务的连接超时

旁路输出(Side outputs)是一种方便的方法。除了错误报告之外,旁路输出也是实现流的 n 路分割的好方法。

示例

现在你可以对上一节中忽略的延迟事件执行某些操作。

Side output channel 与 OutputTag<T> 相关联。这些标记拥有自己的名称,并与对应 DataStream 类型一致。

1
private static final OutputTag<TaxiFare> lateFares = new OutputTag<TaxiFare>("lateFares") {};

上面显示的是一个静态 OutputTag<TaxiFare> ,当在 PseudoWindowprocessElement 方法中发出延迟事件时,可以引用它:

1
2
3
4
5
6
if (eventTime <= timerService.currentWatermark()) {
// 事件延迟,其对应的窗口已经触发。
ctx.output(lateFares, fare);
} else {
. . .
}

以及当在作业的 main 中从该旁路输出访问流时:

1
2
3
4
5
6
// 计算每个司机每小时的小费总和
SingleOutputStreamOperator hourlyTips = fares
.keyBy((TaxiFare fare) -> fare.driverId)
.process(new PseudoWindow(Time.hours(1)));

hourlyTips.getSideOutput(lateFares).print();

或者,可以使用两个同名的 OutputTag 来引用同一个旁路输出,但如果这样做,它们必须具有相同的类型。

结语

在本例中,你已经了解了如何使用 ProcessFunction 重新实现一个简单的时间窗口。 当然,如果 Flink 内置的窗口 API 能够满足你的开发需求,那么一定要优先使用它。 但如果你发现自己在考虑用 Flink 的窗口做些错综复杂的事情,不要害怕自己动手。

此外,ProcessFunctions 对于计算分析之外的许多其他用例也很有用。 下面的实践练习提供了一个完全不同的例子。

ProcessFunctions 的另一个常见用例是清理过时 State。如果你回想一下 Rides and Fares Exercise , 其中使用 RichCoFlatMapFunction 来计算简单 Join,那么示例方案假设 TaxiRides 和 TaxiFares 两个事件是严格匹配为一个有效 数据对(必须同时出现)并且每一组这样的有效数据对都和一个唯一的 rideId 严格对应。如果数据对中的某个 TaxiRides 事件(TaxiFares 事件) 丢失,则同一 rideId 对应的另一个出现的 TaxiFares 事件(TaxiRides 事件)对应的 State 则永远不会被清理掉。 所以这里可以使用 KeyedCoProcessFunction 的实现代替它(RichCoFlatMapFunction),并且可以使用计时器来检测和清除任何过时 的 State。

参考资料