Dunwu Blog

大道至简,知易行难

Flink 事件驱动

处理函数(Process Functions)

简介

ProcessFunction 将事件处理与 Timer,State 结合在一起,使其成为流处理应用的强大构建模块。 这是使用 Flink 创建事件驱动应用程序的基础。它和 RichFlatMapFunction 十分相似, 但是增加了 Timer。

示例

如果你已经体验了 流式分析训练动手实践, 你应该记得,它是采用 TumblingEventTimeWindow 来计算每个小时内每个司机的小费总和, 像下面的示例这样:

1
2
3
4
5
// 计算每个司机每小时的小费总和
DataStream<Tuple3<Long, Long, Float>> hourlyTips = fares
.keyBy((TaxiFare fare) -> fare.driverId)
.window(TumblingEventTimeWindows.of(Time.hours(1)))
.process(new AddTips());

使用 KeyedProcessFunction 去实现相同的操作更加直接且更有学习意义。 让我们开始用以下代码替换上面的代码:

1
2
3
4
// 计算每个司机每小时的小费总和
DataStream<Tuple3<Long, Long, Float>> hourlyTips = fares
.keyBy((TaxiFare fare) -> fare.driverId)
.process(new PseudoWindow(Time.hours(1)));

在这个代码片段中,一个名为 PseudoWindowKeyedProcessFunction 被应用于 KeyedStream, 其结果是一个 DataStream<Tuple3<Long, Long, Float>> (与使用 Flink 内置时间窗口的实现生成的流相同)。

PseudoWindow 的总体轮廓示意如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// 在时长跨度为一小时的窗口中计算每个司机的小费总和。
// 司机ID作为 key。
public static class PseudoWindow extends
KeyedProcessFunction<Long, TaxiFare, Tuple3<Long, Long, Float>> {

private final long durationMsec;

public PseudoWindow(Time duration) {
this.durationMsec = duration.toMilliseconds();
}

@Override
// 在初始化期间调用一次。
public void open(Configuration conf) {
. . .
}

@Override
// 每个票价事件(TaxiFare-Event)输入(到达)时调用,以处理输入的票价事件。
public void processElement(
TaxiFare fare,
Context ctx,
Collector<Tuple3<Long, Long, Float>> out) throws Exception {

. . .
}

@Override
// 当当前水印(watermark)表明窗口现在需要完成的时候调用。
public void onTimer(long timestamp,
OnTimerContext context,
Collector<Tuple3<Long, Long, Float>> out) throws Exception {

. . .
}
}

注意事项:

  • 有几种类型的 ProcessFunctions – 不仅包括 KeyedProcessFunction,还包括 CoProcessFunctionsBroadcastProcessFunctions 等.
  • KeyedProcessFunction 是一种 RichFunction。作为 RichFunction,它可以访问使用 Managed Keyed State 所需的 opengetRuntimeContext 方法。
  • 有两个回调方法须要实现: processElementonTimer。每个输入事件都会调用 processElement 方法; 当计时器触发时调用 onTimer。它们可以是基于事件时间(event time)的 timer,也可以是基于处理时间(processing time)的 timer。 除此之外,processElementonTimer 都提供了一个上下文对象,该对象可用于与 TimerService 交互。 这两个回调还传递了一个可用于发出结果的 Collector

open() 方法

1
2
3
4
5
6
7
8
9
10
11
// 每个窗口都持有托管的 Keyed state 的入口,并且根据窗口的结束时间执行 keyed 策略。
// 每个司机都有一个单独的MapState对象。
private transient MapState<Long, Float> sumOfTips;

@Override
public void open(Configuration conf) {

MapStateDescriptor<Long, Float> sumDesc =
new MapStateDescriptor<>("sumOfTips", Long.class, Float.class);
sumOfTips = getRuntimeContext().getMapState(sumDesc);
}

由于票价事件(fare-event)可能会乱序到达,有时需要在计算输出前一个小时结果前,处理下一个小时的事件。 这样能够保证“乱序造成的延迟数据”得到正确处理(放到前一个小时中)。 实际上,如果 Watermark 延迟比窗口长度长得多,则可能有多个窗口同时打开,而不仅仅是两个。 此实现通过使用 MapState 来支持处理这一点,该 MapState 将每个窗口的结束时间戳映射到该窗口的小费总和。

processElement() 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public void processElement(
TaxiFare fare,
Context ctx,
Collector<Tuple3<Long, Long, Float>> out) throws Exception {

long eventTime = fare.getEventTime();
TimerService timerService = ctx.timerService();

if (eventTime <= timerService.currentWatermark()) {
// 事件延迟;其对应的窗口已经触发。
} else {
// 将 eventTime 向上取值并将结果赋值到包含当前事件的窗口的末尾时间点。
long endOfWindow = (eventTime - (eventTime % durationMsec) + durationMsec - 1);

// 在窗口完成时将启用回调
timerService.registerEventTimeTimer(endOfWindow);

// 将此票价的小费添加到该窗口的总计中。
Float sum = sumOfTips.get(endOfWindow);
if (sum == null) {
sum = 0.0F;
}
sum += fare.tip;
sumOfTips.put(endOfWindow, sum);
}
}

需要考虑的事项:

  • 延迟的事件怎么处理?watermark 后面的事件(即延迟的)正在被删除。 如果你想做一些比这更高级的操作,可以考虑使用旁路输出(Side outputs),这将在下一节中解释。
  • 本例使用一个 MapState,其中 keys 是时间戳(timestamp),并为同一时间戳设置一个 Timer。 这是一种常见的模式;它使得在 Timer 触发时查找相关信息变得简单高效。

onTimer() 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
public void onTimer(
long timestamp,
OnTimerContext context,
Collector<Tuple3<Long, Long, Float>> out) throws Exception {

long driverId = context.getCurrentKey();
// 查找刚结束的一小时结果。
Float sumOfTips = this.sumOfTips.get(timestamp);

Tuple3<Long, Long, Float> result = Tuple3.of(driverId, timestamp, sumOfTips);
out.collect(result);
this.sumOfTips.remove(timestamp);
}

注意:

  • 传递给 onTimerOnTimerContext context 可用于确定当前 key。
  • 我们的 pseudo-windows 在当前 Watermark 到达每小时结束时触发,此时调用 onTimer。 这个 onTimer 方法从 sumOfTips 中删除相关的条目,这样做的效果是不可能容纳延迟的事件。 这相当于在使用 Flink 的时间窗口时将 allowedLateness 设置为零。

性能考虑

Flink 提供了为 RocksDB 优化的 MapStateListState 类型。 相对于 ValueState,更建议使用 MapStateListState,因为使用 RocksDBStateBackend 的情况下, MapStateListStateValueState 性能更好。 RocksDBStateBackend 可以附加到 ListState,而无需进行(反)序列化, 对于 MapState,每个 key/value 都是一个单独的 RocksDB 对象,因此可以有效地访问和更新 MapState

旁路输出(Side Outputs)

简介

有几个很好的理由希望从 Flink 算子获得多个输出流,如下报告条目:

  • 异常情况(exceptions)
  • 格式错误的事件(malformed events)
  • 延迟的事件(late events)
  • operator 告警(operational alerts),如与外部服务的连接超时

旁路输出(Side outputs)是一种方便的方法。除了错误报告之外,旁路输出也是实现流的 n 路分割的好方法。

示例

现在你可以对上一节中忽略的延迟事件执行某些操作。

Side output channel 与 OutputTag<T> 相关联。这些标记拥有自己的名称,并与对应 DataStream 类型一致。

1
private static final OutputTag<TaxiFare> lateFares = new OutputTag<TaxiFare>("lateFares") {};

上面显示的是一个静态 OutputTag<TaxiFare> ,当在 PseudoWindowprocessElement 方法中发出延迟事件时,可以引用它:

1
2
3
4
5
6
if (eventTime <= timerService.currentWatermark()) {
// 事件延迟,其对应的窗口已经触发。
ctx.output(lateFares, fare);
} else {
. . .
}

以及当在作业的 main 中从该旁路输出访问流时:

1
2
3
4
5
6
// 计算每个司机每小时的小费总和
SingleOutputStreamOperator hourlyTips = fares
.keyBy((TaxiFare fare) -> fare.driverId)
.process(new PseudoWindow(Time.hours(1)));

hourlyTips.getSideOutput(lateFares).print();

或者,可以使用两个同名的 OutputTag 来引用同一个旁路输出,但如果这样做,它们必须具有相同的类型。

结语

在本例中,你已经了解了如何使用 ProcessFunction 重新实现一个简单的时间窗口。 当然,如果 Flink 内置的窗口 API 能够满足你的开发需求,那么一定要优先使用它。 但如果你发现自己在考虑用 Flink 的窗口做些错综复杂的事情,不要害怕自己动手。

此外,ProcessFunctions 对于计算分析之外的许多其他用例也很有用。 下面的实践练习提供了一个完全不同的例子。

ProcessFunctions 的另一个常见用例是清理过时 State。如果你回想一下 Rides and Fares Exercise , 其中使用 RichCoFlatMapFunction 来计算简单 Join,那么示例方案假设 TaxiRides 和 TaxiFares 两个事件是严格匹配为一个有效 数据对(必须同时出现)并且每一组这样的有效数据对都和一个唯一的 rideId 严格对应。如果数据对中的某个 TaxiRides 事件(TaxiFares 事件) 丢失,则同一 rideId 对应的另一个出现的 TaxiFares 事件(TaxiRides 事件)对应的 State 则永远不会被清理掉。 所以这里可以使用 KeyedCoProcessFunction 的实现代替它(RichCoFlatMapFunction),并且可以使用计时器来检测和清除任何过时 的 State。

参考资料

Flink 架构

Apache Flink 是一个分布式系统,它需要计算资源来执行应用程序。Flink 集成了所有常见的集群资源管理器,例如 Hadoop YARNApache MesosKubernetes,但同时也可以作为独立集群运行。

Flink 被设计为能够很好地工作在上述每个资源管理器中,这是通过资源管理器特定(resource-manager-specific)的部署模式实现的。Flink 可以采用与当前资源管理器相适应的方式进行交互。

部署 Flink 应用程序时,Flink 会根据应用程序配置的并行性自动标识所需的资源,并从资源管理器请求这些资源。在发生故障的情况下,Flink 通过请求新资源来替换发生故障的容器。提交或控制应用程序的所有通信都是通过 REST 调用进行的,这可以简化 Flink 与各种环境中的集成。

运行任意规模应用

Flink 旨在任意规模上运行有状态流式应用。因此,应用程序被并行化为可能数千个任务,这些任务分布在集群中并发执行。所以应用程序能够充分利用无尽的 CPU、内存、磁盘和网络 IO。而且 Flink 很容易维护非常大的应用程序状态。其异步和增量的检查点算法对处理延迟产生最小的影响,同时保证精确一次状态的一致性。

Flink 用户报告了其生产环境中一些令人印象深刻的扩展性数字

  • 处理每天处理数万亿的事件,
  • 应用维护几 TB 大小的状态
  • 应用在数千个内核上运行

利用内存性能

有状态的 Flink 程序针对本地状态访问进行了优化。任务的状态始终保留在内存中,如果状态大小超过可用内存,则会保存在能高效访问的磁盘数据结构中。任务通过访问本地(通常在内存中)状态来进行所有的计算,从而产生非常低的处理延迟。Flink 通过定期和异步地对本地状态进行持久化存储来保证故障场景下精确一次的状态一致性。

img

Flink 运行时由两种类型的进程组成:一个 JobManager 和一个或者多个 **TaskManager**。

The processes involved in executing a Flink dataflow

Client 不是运行时和程序执行的一部分,而是用于准备数据流并将其发送给 JobManager。之后,客户端可以断开连接(_分离模式_),或保持连接来接收进程报告(_附加模式_)。客户端可以作为触发执行 Java/Scala 程序的一部分运行,也可以在命令行进程 ./bin/flink run ... 中运行。

可以通过多种方式启动 JobManagerTaskManager:直接在机器上作为 standalone 集群启动、在容器中启动、或者通过 YARN 等资源框架管理并启动。TaskManager 连接到 JobManager,宣布自己可用,并被分配工作。

JobManager

JobManager 具有许多与协调 Flink 应用程序的分布式执行有关的职责,它决定

  • 何时调度下一个 task(或一组 task)
  • 对完成的 task 或执行失败做出反应
  • 协调 checkpoint
  • 协调从失败中恢复
  • 等等

JobManager 由三个不同的组件组成:

  • ResourceManager:负责 Flink 集群中的资源提供、回收、分配 - 它管理 task slots,这是 Flink 集群中资源调度的单位(请参考 TaskManager。Flink 为不同的环境和资源提供者(例如 YARN、Kubernetes 和 standalone 部署)实现了对应的 ResourceManager。在 standalone 设置中,ResourceManager 只能分配可用 TaskManager 的 slots,而不能自行启动新的 TaskManager。

  • Dispatcher:提供了一个 REST 接口,用来提交 Flink 应用程序执行,并为每个提交的作业启动一个新的 JobMaster。它还运行 Flink WebUI 用来提供作业执行信息。

  • JobMaster:负责管理单个 JobGraph 的执行。Flink 集群中可以同时运行多个作业,每个作业都有自己的 JobMaster。

始终至少有一个 JobManager。高可用(HA)设置中可能有多个 JobManager,其中一个始终是 _leader_,其他的则是 _standby_(请参考 高可用(HA))。

TaskManager

_TaskManager_(也称为 _worker_)执行作业流的 task,并且缓存和交换数据流。

必须始终至少有一个 TaskManager。在 TaskManager 中资源调度的最小单位是 task _slot_。TaskManager 中 task slot 的数量表示并发处理 task 的数量。请注意一个 task slot 中可以执行多个算子(请参考Tasks 和算子链)。

Tasks 和算子链

对于分布式执行,Flink 将算子的 subtasks 链接成 _tasks_。每个 task 由一个线程执行。将算子链接成 task 是个有用的优化:它减少线程间切换、缓冲的开销,并且减少延迟的同时增加整体吞吐量。链行为是可以配置的;请参考链文档以获取详细信息。

下图中样例数据流用 5 个 subtask 执行,因此有 5 个并行线程。

Operator chaining into Tasks

Task Slots 和资源

每个 worker(TaskManager)都是一个 _JVM 进程_,可以在单独的线程中执行一个或多个 subtask。为了控制一个 TaskManager 中接受多少个 task,就有了所谓的 task slots(至少一个)。

每个 task slot 代表 TaskManager 中资源的固定子集。例如,具有 3 个 slot 的 TaskManager,会将其托管内存 1/3 用于每个 slot。分配资源意味着 subtask 不会与其他作业的 subtask 竞争托管内存,而是具有一定数量的保留托管内存。注意此处没有 CPU 隔离;当前 slot 仅分离 task 的托管内存。

通过调整 task slot 的数量,用户可以定义 subtask 如何互相隔离。每个 TaskManager 有一个 slot,这意味着每个 task 组都在单独的 JVM 中运行(例如,可以在单独的容器中启动)。具有多个 slot 意味着更多 subtask 共享同一 JVM。同一 JVM 中的 task 共享 TCP 连接(通过多路复用)和心跳信息。它们还可以共享数据集和数据结构,从而减少了每个 task 的开销。

A TaskManager with Task Slots and Tasks

默认情况下,Flink 允许 subtask 共享 slot,即便它们是不同的 task 的 subtask,只要是来自于同一作业即可。结果就是一个 slot 可以持有整个作业管道。允许 slot 共享有两个主要优点:

  • Flink 集群所需的 task slot 和作业中使用的最大并行度恰好一样。无需计算程序总共包含多少个 task(具有不同并行度)。
  • 容易获得更好的资源利用。如果没有 slot 共享,非密集 subtask(_source/map()_)将阻塞和密集型 subtask(_window_) 一样多的资源。通过 slot 共享,我们示例中的基本并行度从 2 增加到 6,可以充分利用分配的资源,同时确保繁重的 subtask 在 TaskManager 之间公平分配。

TaskManagers with shared Task Slots

Flink 应用程序 是从其 main() 方法产生的一个或多个 Flink 作业的任何用户程序。这些作业的执行可以在本地 JVM(LocalEnvironment)中进行,或具有多台机器的集群的远程设置(RemoteEnvironment)中进行。对于每个程序,ExecutionEnvironment 提供了一些方法来控制作业执行(例如设置并行度)并与外界交互(请参考 Flink 程序剖析 )。

Flink 应用程序的作业可以被提交到长期运行的 Flink Session 集群、专用的 Flink Job 集群Flink Application 集群。这些选项之间的差异主要与集群的生命周期和资源隔离保证有关。

  • 集群生命周期:在 Flink Session 集群中,客户端连接到一个预先存在的、长期运行的集群,该集群可以接受多个作业提交。即使所有作业完成后,集群(和 JobManager)仍将继续运行直到手动停止 session 为止。因此,Flink Session 集群的寿命不受任何 Flink 作业寿命的约束。
  • 资源隔离:TaskManager slot 由 ResourceManager 在提交作业时分配,并在作业完成时释放。由于所有作业都共享同一集群,因此在集群资源方面存在一些竞争 — 例如提交工作阶段的网络带宽。此共享设置的局限性在于,如果 TaskManager 崩溃,则在此 TaskManager 上运行 task 的所有作业都将失败;类似的,如果 JobManager 上发生一些致命错误,它将影响集群中正在运行的所有作业。
  • 其他注意事项:拥有一个预先存在的集群可以节省大量时间申请资源和启动 TaskManager。有种场景很重要,作业执行时间短并且启动时间长会对端到端的用户体验产生负面的影响 — 就像对简短查询的交互式分析一样,希望作业可以使用现有资源快速执行计算。

以前,Flink Session 集群也被称为 session 模式下的 Flink 集群。

  • 集群生命周期:在 Flink Job 集群中,可用的集群管理器(例如 YARN)用于为每个提交的作业启动一个集群,并且该集群仅可用于该作业。在这里,客户端首先从集群管理器请求资源启动 JobManager,然后将作业提交给在这个进程中运行的 Dispatcher。然后根据作业的资源请求惰性的分配 TaskManager。一旦作业完成,Flink Job 集群将被拆除。
  • 资源隔离:JobManager 中的致命错误仅影响在 Flink Job 集群中运行的一个作业。
  • 其他注意事项:由于 ResourceManager 必须应用并等待外部资源管理组件来启动 TaskManager 进程和分配资源,因此 Flink Job 集群更适合长期运行、具有高稳定性要求且对较长的启动时间不敏感的大型作业。

以前,Flink Job 集群也被称为 job (or per-job) 模式下的 Flink 集群。

Kubernetes 不支持 Flink Job 集群。 请参考 Standalone KubernetesNative Kubernetes

  • 集群生命周期:Flink Application 集群是专用的 Flink 集群,仅从 Flink 应用程序执行作业,并且 main()方法在集群上而不是客户端上运行。提交作业是一个单步骤过程:无需先启动 Flink 集群,然后将作业提交到现有的 session 集群;相反,将应用程序逻辑和依赖打包成一个可执行的作业 JAR 中,并且集群入口(ApplicationClusterEntryPoint)负责调用 main()方法来提取 JobGraph。例如,这允许你像在 Kubernetes 上部署任何其他应用程序一样部署 Flink 应用程序。因此,Flink Application 集群的寿命与 Flink 应用程序的寿命有关。
  • 资源隔离:在 Flink Application 集群中,ResourceManager 和 Dispatcher 作用于单个的 Flink 应用程序,相比于 Flink Session 集群,它提供了更好的隔离。

Flink Job 集群可以看做是 Flink Application 集群”客户端运行“的替代方案。

默认情况下,每个 Flink 集群只有一个 JobManager 实例。这会导致 _单点故障(SPOF)_:如果 JobManager 崩溃,则不能提交任何新程序,运行中的程序也会失败。

使用 JobManager 高可用模式,你可以从 JobManager 失败中恢复,从而消除单点故障。

如何启用集群高可用

JobManager 高可用是指,在任何时候都有一个 JobManager Leader,如果 Leader 出现故障,则有多个备用 JobManager 来接管领导。这解决了单点故障问题,只要有备用 JobManager 担任领导者,程序就可以继续运行。

如下是一个使用三个 JobManager 实例的例子:

img

Flink 的 高可用服务 封装了所需的服务,使一切可以正常工作:

  • 领导者选举:从 n 个候选者中选出一个领导者
  • 服务发现:检索当前领导者的地址
  • 状态持久化:继承程序恢复作业所需的持久化状态(JobGraphs、用户代码 jar、已完成的检查点)

Flink 提供了两种高可用服务实现:

  • ZooKeeper:每个 Flink 集群部署都可以使用 ZooKeeper HA 服务。它们需要一个运行的 ZooKeeper 复制组(quorum)。
  • Kubernetes:Kubernetes HA 服务只能运行在 Kubernetes 上。

高可用数据生命周期

为了恢复提交的作业,Flink 持久化元数据和 job 组件。高可用数据将一直保存,直到相应的作业执行成功、被取消或最终失败。当这些情况发生时,将删除所有高可用数据,包括存储在高可用服务中的元数据。

参考资料

Flink 简介

关键概念:源源不断的流式数据处理、事件时间、有状态流处理和状态快照

流处理

任何类型的数据都可以形成一种事件流。信用卡交易、传感器测量、机器日志、网站或移动应用程序上的用户交互记录,所有这些数据都形成一种流。

数据可以被作为 无界 或者 有界 流来处理。

  1. 无界流 有定义流的开始,但没有定义流的结束。它们会无休止地产生数据。无界流的数据必须持续处理,即数据被摄取后需要立刻处理。我们不能等到所有数据都到达再处理,因为输入是无限的,在任何时候输入都不会完成。处理无界数据通常要求以特定顺序摄取事件,例如事件发生的顺序,以便能够推断结果的完整性。
  2. 有界流 有定义流的开始,也有定义流的结束。有界流可以在摄取所有数据后再进行计算。有界流所有数据可以被排序,所以并不需要有序摄取。有界流处理通常被称为批处理。

Bounded and unbounded streams

Apache Flink 擅长处理无界和有界数据集 精确的时间控制和状态化使得 Flink 的运行时(runtime)能够运行任何处理无界流的应用。有界流则由一些专为固定大小数据集特殊设计的算法和数据结构进行内部处理,产生了出色的性能。

批处理是有界数据流处理的范例。在这种模式下,你可以选择在计算结果输出之前输入整个数据集,这也就意味着你可以对整个数据集的数据进行排序、统计或汇总计算后再输出结果。

流处理正相反,其涉及无界数据流。至少理论上来说,它的数据输入永远不会结束,因此程序必须持续不断地对到达的数据进行处理。

在 Flink 中,应用程序由用户自定义算子转换而来的流式 dataflows 所组成。这些流式 dataflows 形成了有向图,以一个或多个(source)开始,并以一个或多个(sink)结束。

A DataStream program, and its dataflow.

通常,程序代码中的 transformation 和 dataflow 中的算子(operator)之间是一一对应的。但有时也会出现一个 transformation 包含多个算子的情况,如上图所示。

Flink 应用程序可以消费来自消息队列或分布式日志这类流式数据源(例如 Apache Kafka 或 Kinesis)的实时数据,也可以从各种的数据源中消费有界的历史数据。同样,Flink 应用程序生成的结果流也可以发送到各种数据汇中。

Flink application with sources and sinks

并行 Dataflows

Flink 程序本质上是分布式并行程序。在程序执行期间,一个流有一个或多个流分区(Stream Partition),每个算子有一个或多个算子子任务(Operator Subtask)。每个子任务彼此独立,并在不同的线程中运行,或在不同的计算机或容器中运行。

算子子任务数就是其对应算子的并行度。在同一程序中,不同算子也可能具有不同的并行度。

A parallel dataflow

Flink 算子之间可以通过一对一(_直传_)模式或重新分发模式传输数据:

  • 一对一模式(例如上图中的 Sourcemap() 算子之间)可以保留元素的分区和顺序信息。这意味着 map() 算子的 subtask[1] 输入的数据以及其顺序与 Source 算子的 subtask[1] 输出的数据和顺序完全相同,即同一分区的数据只会进入到下游算子的同一分区。
  • 重新分发模式(例如上图中的 map()keyBy/window 之间,以及 keyBy/windowSink 之间)则会更改数据所在的流分区。当你在程序中选择使用不同的 _transformation_,每个算子子任务也会根据不同的 transformation 将数据发送到不同的目标子任务。例如以下这几种 transformation 和其对应分发数据的模式:_keyBy()_(通过散列键重新分区)、_broadcast()_(广播)或 _rebalance()(随机重新分发)。在重新分发数据的过程中,元素只有在每对输出和输入子任务之间才能保留其之间的顺序信息(例如,_keyBy/window 的 subtask[2] 接收到的 map() 的 subtask[1] 中的元素都是有序的)。因此,上图所示的 keyBy/windowSink 算子之间数据的重新分发时,不同键(key)的聚合结果到达 Sink 的顺序是不确定的。

自定义时间流处理

对于大多数流数据处理应用程序而言,能够使用处理实时数据的代码重新处理历史数据并产生确定并一致的结果非常有价值。

在处理流式数据时,我们通常更需要关注事件本身发生的顺序而不是事件被传输以及处理的顺序,因为这能够帮助我们推理出一组事件(事件集合)是何时发生以及结束的。例如电子商务交易或金融交易中涉及到的事件集合。

为了满足上述这类的实时流处理场景,我们通常会使用记录在数据流中的事件时间的时间戳,而不是处理数据的机器时钟的时间戳。

有状态流处理

Flink 中的算子可以是有状态的。这意味着如何处理一个事件可能取决于该事件之前所有事件数据的累积结果。Flink 中的状态不仅可以用于简单的场景(例如统计仪表板上每分钟显示的数据),也可以用于复杂的场景(例如训练作弊检测模型)。

Flink 应用程序可以在分布式集群上并行运行,其中每个算子的各个并行实例会在单独的线程中独立运行,并且通常情况下是会在不同的机器上运行。

有状态算子的并行实例组在存储其对应状态时通常是按照键(key)进行分片存储的。每个并行实例算子负责处理一组特定键的事件数据,并且这组键对应的状态会保存在本地。

如下图的 Flink 作业,其前三个算子的并行度为 2,最后一个 sink 算子的并行度为 1,其中第三个算子是有状态的,并且你可以看到第二个算子和第三个算子之间是全互联的(fully-connected),它们之间通过网络进行数据分发。通常情况下,实现这种类型的 Flink 程序是为了通过某些键对数据流进行分区,以便将需要一起处理的事件进行汇合,然后做统一计算处理。

State is sharded

Flink 应用程序的状态访问都在本地进行,因为这有助于其提高吞吐量和降低延迟。通常情况下 Flink 应用程序都是将状态存储在 JVM 堆上,但如果状态太大,我们也可以选择将其以结构化数据格式存储在高速磁盘中。

State is local

通过状态快照实现的容错

通过状态快照和流重放两种方式的组合,Flink 能够提供可容错的,精确一次计算的语义。这些状态快照在执行时会获取并存储分布式 pipeline 中整体的状态,它会将数据源中消费数据的偏移量记录下来,并将整个 job graph 中算子获取到该数据(记录的偏移量对应的数据)时的状态记录并存储下来。当发生故障时,Flink 作业会恢复上次存储的状态,重置数据源从状态中记录的上次消费的偏移量开始重新进行消费处理。而且状态快照在执行时会异步获取状态并存储,并不会阻塞正在进行的数据处理逻辑。

状态

只有在每一个单独的事件上进行转换操作的应用才不需要状态,换言之,每一个具有一定复杂度的流处理应用都是有状态的。任何运行基本业务逻辑的流处理应用都需要在一定时间内存储所接收的事件或中间结果,以供后续的某个时间点(例如收到下一个事件或者经过一段特定时间)进行访问并进行后续处理。

img

应用状态是 Flink 中的一等公民,Flink 提供了许多状态管理相关的特性支持,其中包括:

  • 多种状态基础类型:Flink 为多种不同的数据结构提供了相对应的状态基础类型,例如原子值(value),列表(list)以及映射(map)。开发者可以基于处理函数对状态的访问方式,选择最高效、最适合的状态基础类型。
  • 插件化的 State Backend:State Backend 负责管理应用程序状态,并在需要的时候进行 checkpoint。Flink 支持多种 state backend,可以将状态存在内存或者 RocksDB。RocksDB 是一种高效的嵌入式、持久化键值存储引擎。Flink 也支持插件式的自定义 state backend 进行状态存储。
  • 精确一次语义:Flink 的 checkpoint 和故障恢复算法保证了故障发生后应用状态的一致性。因此,Flink 能够在应用程序发生故障时,对应用程序透明,不造成正确性的影响。
  • 超大数据量状态:Flink 能够利用其异步以及增量式的 checkpoint 算法,存储数 TB 级别的应用状态。
  • 可弹性伸缩的应用:Flink 能够通过在更多或更少的工作节点上对状态进行重新分布,支持有状态应用的分布式的横向伸缩。

时间

时间语义

Flink 支持以下三种时间语义:

  • **事件时间(event time)**: 事件产生的时间,记录的是设备生产(或者存储)事件的时间
  • **摄取时间(ingestion time)**: Flink 读取事件时记录的时间
  • **处理时间(processing time)**: Flink pipeline 中具体算子处理事件的时间

为了获得可重现的结果,例如在计算过去的特定一天里第一个小时股票的最高价格时,我们应该使用事件时间。这样的话,无论什么时间去计算都不会影响输出结果。然而如果使用处理时间的话,实时应用程序的结果是由程序运行的时间所决定。多次运行基于处理时间的实时程序,可能得到的结果都不相同,也可能会导致再次分析历史数据或者测试新代码变得异常困难。

Event Time

如果想要使用事件时间,需要额外给 Flink 提供一个时间戳提取器和 Watermark 生成器,Flink 将使用它们来跟踪事件时间的进度。

Watermark

让我们通过一个简单的示例来演示为什么需要 watermarks 及其工作方式。

在此示例中,我们将看到带有混乱时间戳的事件流,如下所示。显示的数字表达的是这些事件实际发生时间的时间戳。到达的第一个事件发生在时间 4,随后发生的事件发生在更早的时间 2,依此类推:

··· 23 19 22 24 21 14 17 13 12 15 9 11 7 2 4 →

假设我们要对数据流排序,我们想要达到的目的是:应用程序应该在数据流里的事件到达时就有一个算子(我们暂且称之为排序)开始处理事件,这个算子所输出的流是按照时间戳排序好的。

让我们重新审视这些数据:

(1) 我们的排序器看到的第一个事件的时间戳是 4,但是我们不能立即将其作为已排序的流释放。因为我们并不能确定它是有序的,并且较早的事件有可能并未到达。事实上,如果站在上帝视角,我们知道,必须要等到时间戳为 2 的元素到来时,排序器才可以有事件输出。

需要一些缓冲,需要一些时间,但这都是值得的

(2) 接下来的这一步,如果我们选择的是固执的等待,我们永远不会有结果。首先,我们看到了时间戳为 4 的事件,然后看到了时间戳为 2 的事件。可是,时间戳小于 2 的事件接下来会不会到来呢?可能会,也可能不会。再次站在上帝视角,我们知道,我们永远不会看到时间戳 1。

最终,我们必须勇于承担责任,并发出指令,把带有时间戳 2 的事件作为已排序的事件流的开始

(3) 然后,我们需要一种策略,该策略定义:对于任何给定时间戳的事件,Flink 何时停止等待较早事件的到来。

这正是 watermarks 的作用 — 它们定义何时停止等待较早的事件。

Flink 中事件时间的处理取决于 _watermark 生成器_,后者将带有时间戳的特殊元素插入流中形成 _watermarks_。事件时间 t 的 watermark 代表 t 之前(很可能)都已经到达。

当 watermark 以 2 或更大的时间戳到达时,事件流的排序器应停止等待,并输出 2 作为已经排序好的流。

(4) 我们可能会思考,如何决定 watermarks 的不同生成策略

每个事件都会延迟一段时间后到达,然而这些延迟有所不同,有些事件可能比其他事件延迟得更多。一种简单的方法是假定这些延迟受某个最大延迟的限制。Flink 将此策略称为 最大无序边界 (bounded-out-of-orderness) watermark。当然,我们可以想像出更好的生成 watermark 的方法,但是对于大多数应用而言,固定延迟策略已经足够了。

延迟 VS 正确性

watermarks 给了开发者流处理的一种选择,它们使开发人员在开发应用程序时可以控制延迟和完整性之间的权衡。与批处理不同,批处理中的奢侈之处在于可以在产生任何结果之前完全了解输入,而使用流式传输,我们不被允许等待所有的时间都产生了,才输出排序好的数据,这与流相违背。

我们可以把 watermarks 的边界时间配置的相对较短,从而冒着在输入了解不完全的情况下产生结果的风险-即可能会很快产生错误结果。或者,你可以等待更长的时间,并利用对输入流的更全面的了解来产生结果。

当然也可以实施混合解决方案,先快速产生初步结果,然后在处理其他(最新)数据时向这些结果提供更新。对于有一些对延迟的容忍程度很低,但是又对结果有很严格的要求的场景下,或许是一个福音。

延迟

延迟是相对于 watermarks 定义的。Watermark(t) 表示事件流的时间已经到达了 t; watermark 之后的时间戳 ≤ t 的任何事件都被称之为延迟事件。

使用 Watermarks

如果想要使用基于带有事件时间戳的事件流,Flink 需要知道与每个事件相关的时间戳,而且流必须包含 watermark。

动手练习中使用的出租车数据源已经为我们处理了这些详细信息。但是,在您自己的应用程序中,您将必须自己进行处理,这通常是通过实现一个类来实现的,该类从事件中提取时间戳,并根据需要生成 watermarks。最简单的方法是使用 WatermarkStrategy

1
2
3
4
5
6
7
8
DataStream<Event> stream = ...

WatermarkStrategy<Event> strategy = WatermarkStrategy
.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(20))
.withTimestampAssigner((event, timestamp) -> event.timestamp);

DataStream<Event> withTimestampsAndWatermarks =
stream.assignTimestampsAndWatermarks(strategy);

窗口

我们在操作无界数据流时,经常需要应对以下问题,我们经常把无界数据流分解成有界数据流聚合分析:

  • 每分钟的浏览量
  • 每位用户每周的会话数
  • 每个传感器每分钟的最高温度

用 Flink 计算窗口分析取决于两个主要的抽象操作:_Window Assigners_,将事件分配给窗口(根据需要创建新的窗口对象),以及 _Window Functions_,处理窗口内的数据。

Flink 的窗口 API 还具有 TriggersEvictors 的概念,Triggers 确定何时调用窗口函数,而 Evictors 则可以删除在窗口中收集的元素。

举一个简单的例子,我们一般这样使用键控事件流(基于 key 分组的输入事件流):

1
2
3
4
stream.
.keyBy(<key selector>)
.window(<window assigner>)
.reduce|aggregate|process(<window function>)

您不是必须使用键控事件流(keyed stream),但是值得注意的是,如果不使用键控事件流,我们的程序就不能 并行 处理。

1
2
3
stream.
.windowAll(<window assigner>)
.reduce|aggregate|process(<window function>)

窗口分配器

Flink 有一些内置的窗口分配器,如下所示:

Window assigners

通过一些示例来展示关于这些窗口如何使用,或者如何区分它们:

  • 滚动时间窗口
    • 每分钟页面浏览量
    • TumblingEventTimeWindows.of(Time.minutes(1))
  • 滑动时间窗口
    • 每 10 秒钟计算前 1 分钟的页面浏览量
    • SlidingEventTimeWindows.of(Time.minutes(1), Time.seconds(10))
  • 会话窗口
    • 每个会话的网页浏览量,其中会话之间的间隔至少为 30 分钟
    • EventTimeSessionWindows.withGap(Time.minutes(30))

以下都是一些可以使用的间隔时间 Time.milliseconds(n), Time.seconds(n), Time.minutes(n), Time.hours(n), 和 Time.days(n)

基于时间的窗口分配器(包括会话时间)既可以处理 事件时间,也可以处理 处理时间。这两种基于时间的处理没有哪一个更好,我们必须折衷。使用 处理时间,我们必须接受以下限制:

  • 无法正确处理历史数据,
  • 无法正确处理超过最大无序边界的数据,
  • 结果将是不确定的,

但是有自己的优势,较低的延迟。

使用基于计数的窗口时,请记住,只有窗口内的事件数量到达窗口要求的数值时,这些窗口才会触发计算。尽管可以使用自定义触发器自己实现该行为,但无法应对超时和处理部分窗口。

我们可能在有些场景下,想使用全局 window assigner 将每个事件(相同的 key)都分配给某一个指定的全局窗口。 很多情况下,一个比较好的建议是使用 ProcessFunction,具体介绍在这里

窗口应用函数

我们有三种最基本的操作窗口内的事件的选项:

  1. 像批量处理,ProcessWindowFunction 会缓存 Iterable 和窗口内容,供接下来全量计算;
  2. 或者像流处理,每一次有事件被分配到窗口时,都会调用 ReduceFunction 或者 AggregateFunction 来增量计算;
  3. 或者结合两者,通过 ReduceFunction 或者 AggregateFunction 预聚合的增量计算结果在触发窗口时, 提供给 ProcessWindowFunction 做全量计算。

接下来展示一段 1 和 3 的示例,每一个实现都是计算传感器的最大值。在每一个一分钟大小的事件时间窗口内, 生成一个包含 (key,end-of-window-timestamp, max_value) 的一组结果。

ProcessWindowFunction 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
DataStream<SensorReading> input = ...

input
.keyBy(x -> x.key)
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.process(new MyWastefulMax());

public static class MyWastefulMax extends ProcessWindowFunction<
SensorReading, // 输入类型
Tuple3<String, Long, Integer>, // 输出类型
String, // 键类型
TimeWindow> { // 窗口类型

@Override
public void process(
String key,
Context context,
Iterable<SensorReading> events,
Collector<Tuple3<String, Long, Integer>> out) {

int max = 0;
for (SensorReading event : events) {
max = Math.max(event.value, max);
}
out.collect(Tuple3.of(key, context.window().getEnd(), max));
}
}

在当前实现中有一些值得关注的地方:

  • Flink 会缓存所有分配给窗口的事件流,直到触发窗口为止。这个操作可能是相当昂贵的。
  • Flink 会传递给 ProcessWindowFunction 一个 Context 对象,这个对象内包含了一些窗口信息。Context 接口 展示大致如下:
1
2
3
4
5
6
7
8
9
public abstract class Context implements java.io.Serializable {
public abstract W window();

public abstract long currentProcessingTime();
public abstract long currentWatermark();

public abstract KeyedStateStore windowState();
public abstract KeyedStateStore globalState();
}

windowStateglobalState 可以用来存储当前的窗口的 key、窗口或者当前 key 的每一个窗口信息。这在一些场景下会很有用,试想,我们在处理当前窗口的时候,可能会用到上一个窗口的信息。

增量聚合示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
DataStream<SensorReading> input = ...

input
.keyBy(x -> x.key)
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.reduce(new MyReducingMax(), new MyWindowFunction());

private static class MyReducingMax implements ReduceFunction<SensorReading> {
public SensorReading reduce(SensorReading r1, SensorReading r2) {
return r1.value() > r2.value() ? r1 : r2;
}
}

private static class MyWindowFunction extends ProcessWindowFunction<
SensorReading, Tuple3<String, Long, SensorReading>, String, TimeWindow> {

@Override
public void process(
String key,
Context context,
Iterable<SensorReading> maxReading,
Collector<Tuple3<String, Long, SensorReading>> out) {

SensorReading max = maxReading.iterator().next();
out.collect(Tuple3.of(key, context.window().getEnd(), max));
}
}

请注意 Iterable<SensorReading> 将只包含一个读数 – MyReducingMax 计算出的预先汇总的最大值。

晚到的事件

默认场景下,超过最大无序边界的事件会被删除,但是 Flink 给了我们两个选择去控制这些事件。

您可以使用一种称为旁路输出 的机制来安排将要删除的事件收集到侧输出流中,这里是一个示例:

1
2
3
4
5
6
7
8
9
OutputTag<Event> lateTag = new OutputTag<Event>("late"){};

SingleOutputStreamOperator<Event> result = stream.
.keyBy(...)
.window(...)
.sideOutputLateData(lateTag)
.process(...);

DataStream<Event> lateStream = result.getSideOutput(lateTag);

我们还可以指定 允许的延迟(allowed lateness) 的间隔,在这个间隔时间内,延迟的事件将会继续分配给窗口(同时状态会被保留),默认状态下,每个延迟事件都会导致窗口函数被再次调用(有时也称之为 late firing )。

默认情况下,允许的延迟为 0。换句话说,watermark 之后的元素将被丢弃(或发送到侧输出流)。

举例说明:

1
2
3
4
5
stream.
.keyBy(...)
.window(...)
.allowedLateness(Time.seconds(10))
.process(...);

当允许的延迟大于零时,只有那些超过最大无序边界以至于会被丢弃的事件才会被发送到侧输出流(如果已配置)。

深入了解窗口操作

Flink 的窗口 API 某些方面有一些奇怪的行为,可能和我们预期的行为不一致。 根据 Flink 用户邮件列表 和其他地方一些频繁被问起的问题, 以下是一些有关 Windows 的底层事实,这些信息可能会让您感到惊讶。

滑动窗口是通过复制来实现的

滑动窗口分配器可以创建许多窗口对象,并将每个事件复制到每个相关的窗口中。例如,如果您每隔 15 分钟就有 24 小时的滑动窗口,则每个事件将被复制到 4 * 24 = 96 个窗口中。

时间窗口会和时间对齐

仅仅因为我们使用的是一个小时的处理时间窗口并在 12:05 开始运行您的应用程序,并不意味着第一个窗口将在 1:05 关闭。第一个窗口将长 55 分钟,并在 1:00 关闭。

请注意,滑动窗口和滚动窗口分配器所采用的 offset 参数可用于改变窗口的对齐方式。有关详细的信息,请参见 滚动窗口滑动窗口

window 后面可以接 window

比如说:

1
2
3
4
5
6
stream
.keyBy(t -> t.key)
.window(<window assigner>)
.reduce(<reduce function>)
.windowAll(<same window assigner>)
.reduce(<same reduce function>)

可能我们会猜测以 Flink 的能力,想要做到这样看起来是可行的(前提是你使用的是 ReduceFunction 或 AggregateFunction ),但不是。

之所以可行,是因为时间窗口产生的事件是根据窗口结束时的时间分配时间戳的。例如,一个小时小时的窗口所产生的所有事件都将带有标记一个小时结束的时间戳。后面的窗口内的数据消费和前面的流产生的数据是一致的。

空的时间窗口不会输出结果

事件会触发窗口的创建。换句话说,如果在特定的窗口内没有事件,就不会有窗口,就不会有输出结果。

延迟时间会导致延迟聚合

会话窗口的实现是基于窗口的一个抽象能力,窗口可以 _聚合_。会话窗口中的每个数据在初始被消费时,都会被分配一个新的窗口,但是如果窗口之间的间隔足够小,多个窗口就会被聚合。延迟事件可以弥合两个先前分开的会话间隔,从而产生一个虽然有延迟但是更加准确地结果。

参考资料

Java 基础语法特性

注释

空白行,或者注释的内容,都会被 Java 编译器忽略掉。

Java 支持多种注释方式,下面的示例展示了各种注释的使用方式:

1
2
3
4
5
6
7
8
9
10
11
12
13
public class HelloWorld {
/*
* JavaDoc 注释
*/
public static void main(String[] args) {
// 单行注释
/* 多行注释:
1. 注意点a
2. 注意点b
*/
System.out.println("Hello World");
}
}

基本数据类型

img

👉 扩展阅读:深入理解 Java 基本数据类型

变量和常量

Java 支持的变量类型有:

  • 局部变量 - 类方法中的变量。
  • 成员变量(也叫实例变量) - 类方法外的变量,不过没有 static 修饰。
  • 静态变量(也叫类变量) - 类方法外的变量,用 static 修饰。

特性对比:

局部变量 实例变量(也叫成员变量) 类变量(也叫静态变量)
局部变量声明在方法、构造方法或者语句块中。 实例变量声明在方法、构造方法和语句块之外。 类变量声明在方法、构造方法和语句块之外。并且以 static 修饰。
局部变量在方法、构造方法、或者语句块被执行的时候创建,当它们执行完成后,变量将会被销毁。 实例变量在对象创建的时候创建,在对象被销毁的时候销毁。 类变量在第一次被访问时创建,在程序结束时销毁。
局部变量没有默认值,所以必须经过初始化,才可以使用。 实例变量具有默认值。数值型变量的默认值是 0,布尔型变量的默认值是 false,引用类型变量的默认值是 null。变量的值可以在声明时指定,也可以在构造方法中指定。 类变量具有默认值。数值型变量的默认值是 0,布尔型变量的默认值是 false,引用类型变量的默认值是 null。变量的值可以在声明时指定,也可以在构造方法中指定。此外,静态变量还可以在静态语句块中初始化。
对于局部变量,如果是基本类型,会把值直接存储在栈;如果是引用类型,会把其对象存储在堆,而把这个对象的引用(指针)存储在栈。 实例变量存储在堆。 类变量存储在静态存储区。
访问修饰符不能用于局部变量。 访问修饰符可以用于实例变量。 访问修饰符可以用于类变量。
局部变量只在声明它的方法、构造方法或者语句块中可见。 实例变量对于类中的方法、构造方法或者语句块是可见的。一般情况下应该把实例变量设为私有。通过使用访问修饰符可以使实例变量对子类可见。 与实例变量具有相似的可见性。但为了对类的使用者可见,大多数静态变量声明为 public 类型。
实例变量可以直接通过变量名访问。但在静态方法以及其他类中,就应该使用完全限定名:ObejectReference.VariableName。 静态变量可以通过:ClassName.VariableName 的方式访问。
无论一个类创建了多少个对象,类只拥有类变量的一份拷贝。
类变量除了被声明为常量外很少使用。

变量修饰符

  • 访问级别修饰符
    • 如果变量是实例变量或类变量,可以添加访问级别修饰符(public/protected/private)
  • 静态修饰符
    • 如果变量是类变量,需要添加 static 修饰
  • final
    • 如果变量使用 final 修饰符,就表示这是一个常量,不能被修改。

数组

img

👉 扩展阅读:深入理解 Java 数组

枚举

img

👉 扩展阅读:深入理解 Java 枚举

操作符

Java 中支持的操作符类型如下:

img

👉 扩展阅读:Java 操作符

方法

img

👉 扩展阅读:深入理解 Java 方法

控制语句

img

👉 扩展阅读:Java 控制语句

异常

img

img

👉 扩展阅读:深入理解 Java 异常

泛型

img

👉 扩展阅读:深入理解 Java 泛型

反射

img

img

👉 扩展阅读:深入理解 Java 反射和动态代理

注解

img

img

img

img

👉 扩展阅读:深入理解 Java 注解

序列化

img

👉 扩展阅读:Java 序列化

Elasticsearch 优化

Elasticsearch 是当前流行的企业级搜索引擎,设计用于云计算中,能够达到实时搜索,稳定,可靠,快速,安装使用方便。作为一个开箱即用的产品,在生产环境上线之后,我们其实不一定能确保其的性能和稳定性。如何根据实际情况提高服务的性能,其实有很多技巧。这章我们分享从实战经验中总结出来的 elasticsearch 性能优化,主要从硬件配置优化、索引优化设置、查询方面优化、数据结构优化、集群架构优化等方面讲解。

硬件配置优化

升级硬件设备配置一直都是提高服务能力最快速有效的手段,在系统层面能够影响应用性能的一般包括三个因素:CPU、内存和 IO,可以从这三方面进行 ES 的性能优化工作。

CPU 配置

一般说来,CPU 繁忙的原因有以下几个:

  1. 线程中有无限空循环、无阻塞、正则匹配或者单纯的计算;
  2. 发生了频繁的 GC;
  3. 多线程的上下文切换;

大多数 Elasticsearch 部署往往对 CPU 要求不高。因此,相对其它资源,具体配置多少个(CPU)不是那么关键。你应该选择具有多个内核的现代处理器,常见的集群使用 2 到 8 个核的机器。如果你要在更快的 CPUs 和更多的核数之间选择,选择更多的核数更好。多个内核提供的额外并发远胜过稍微快一点点的时钟频率。

内存配置

如果有一种资源是最先被耗尽的,它可能是内存。排序和聚合都很耗内存,所以有足够的堆空间来应付它们是很重要的。即使堆空间是比较小的时候,也能为操作系统文件缓存提供额外的内存。因为 Lucene 使用的许多数据结构是基于磁盘的格式,Elasticsearch 利用操作系统缓存能产生很大效果。

64 GB 内存的机器是非常理想的,但是 32 GB 和 16 GB 机器也是很常见的。少于 8 GB 会适得其反(你最终需要很多很多的小机器),大于 64 GB 的机器也会有问题。

由于 ES 构建基于 lucene,而 lucene 设计强大之处在于 lucene 能够很好的利用操作系统内存来缓存索引数据,以提供快速的查询性能。lucene 的索引文件 segements 是存储在单文件中的,并且不可变,对于 OS 来说,能够很友好地将索引文件保持在 cache 中,以便快速访问;因此,我们很有必要将一半的物理内存留给 lucene;另一半的物理内存留给 ES(JVM heap)。

内存分配

当机器内存小于 64G 时,遵循通用的原则,50% 给 ES,50% 留给 lucene。

当机器内存大于 64G 时,遵循以下原则:

  • 如果主要的使用场景是全文检索,那么建议给 ES Heap 分配 4~32G 的内存即可;其它内存留给操作系统,供 lucene 使用(segments cache),以提供更快的查询性能。
  • 如果主要的使用场景是聚合或排序,并且大多数是 numerics,dates,geo_points 以及 not_analyzed 的字符类型,建议分配给 ES Heap 分配 4~32G 的内存即可,其它内存留给操作系统,供 lucene 使用,提供快速的基于文档的聚类、排序性能。
  • 如果使用场景是聚合或排序,并且都是基于 analyzed 字符数据,这时需要更多的 heap size,建议机器上运行多 ES 实例,每个实例保持不超过 50% 的 ES heap 设置(但不超过 32 G,堆内存设置 32 G 以下时,JVM 使用对象指标压缩技巧节省空间),50% 以上留给 lucene。

禁止 swap

禁止 swap,一旦允许内存与磁盘的交换,会引起致命的性能问题。可以通过在 elasticsearch.yml 中 bootstrap.memory_lock: true,以保持 JVM 锁定内存,保证 ES 的性能。

GC 设置

保持 GC 的现有设置,默认设置为:Concurrent-Mark and Sweep(CMS),别换成 G1 GC,因为目前 G1 还有很多 BUG。

保持线程池的现有设置,目前 ES 的线程池较 1.X 有了较多优化设置,保持现状即可;默认线程池大小等于 CPU 核心数。如果一定要改,按公式 ( ( CPU 核心数 * 3 ) / 2 ) + 1 设置;不能超过 CPU 核心数的 2 倍;但是不建议修改默认配置,否则会对 CPU 造成硬伤。

磁盘

硬盘对所有的集群都很重要,对大量写入的集群更是加倍重要(例如那些存储日志数据的)。硬盘是服务器上最慢的子系统,这意味着那些写入量很大的集群很容易让硬盘饱和,使得它成为集群的瓶颈。

在经济压力能承受的范围下,尽量使用固态硬盘(SSD)。固态硬盘相比于任何旋转介质(机械硬盘,磁带等),无论随机写还是顺序写,都会对 IO 有较大的提升。

如果你正在使用 SSDs,确保你的系统 I/O 调度程序是配置正确的。当你向硬盘写数据,I/O 调度程序决定何时把数据实际发送到硬盘。大多数默认 *nix 发行版下的调度程序都叫做 cfq(完全公平队列)。

调度程序分配时间片到每个进程。并且优化这些到硬盘的众多队列的传递。但它是为旋转介质优化的:机械硬盘的固有特性意味着它写入数据到基于物理布局的硬盘会更高效。

这对 SSD 来说是低效的,尽管这里没有涉及到机械硬盘。但是,deadline 或者 noop 应该被使用。deadline 调度程序基于写入等待时间进行优化,noop 只是一个简单的 FIFO 队列。

这个简单的更改可以带来显著的影响。仅仅是使用正确的调度程序,我们看到了 500 倍的写入能力提升。

如果你使用旋转介质(如机械硬盘),尝试获取尽可能快的硬盘(高性能服务器硬盘,15k RPM 驱动器)

使用 RAID0 是提高硬盘速度的有效途径,对机械硬盘和 SSD 来说都是如此。没有必要使用镜像或其它 RAID 变体,因为 Elasticsearch 在自身层面通过副本,已经提供了备份的功能,所以不需要利用磁盘的备份功能,同时如果使用磁盘备份功能的话,对写入速度有较大的影响。

最后,避免使用网络附加存储(NAS)。人们常声称他们的 NAS 解决方案比本地驱动器更快更可靠。除却这些声称,我们从没看到 NAS 能配得上它的大肆宣传。NAS 常常很慢,显露出更大的延时和更宽的平均延时方差,而且它是单点故障的。

索引优化设置

索引优化主要是在 Elasticsearch 的插入层面优化,Elasticsearch 本身索引速度其实还是蛮快的,具体数据,我们可以参考官方的 benchmark 数据。我们可以根据不同的需求,针对索引优化。

批量提交

当有大量数据提交的时候,建议采用批量提交(Bulk 操作);此外使用 bulk 请求时,每个请求不超过几十 M,因为太大会导致内存使用过大。

比如在做 ELK 过程中,Logstash indexer 提交数据到 Elasticsearch 中,batch size 就可以作为一个优化功能点。但是优化 size 大小需要根据文档大小和服务器性能而定。

像 Logstash 中提交文档大小超过 20MB,Logstash 会将一个批量请求切分为多个批量请求。

如果在提交过程中,遇到 EsRejectedExecutionException 异常的话,则说明集群的索引性能已经达到极限了。这种情况,要么提高服务器集群的资源,要么根据业务规则,减少数据收集速度,比如只收集 Warn、Error 级别以上的日志。

增加 Refresh 时间间隔

为了提高索引性能,Elasticsearch 在写入数据的时候,采用延迟写入的策略,即数据先写到内存中,当超过默认 1 秒(index.refresh_interval)会进行一次写入操作,就是将内存中 segment 数据刷新到磁盘中,此时我们才能将数据搜索出来,所以这就是为什么 Elasticsearch 提供的是近实时搜索功能,而不是实时搜索功能。

如果我们的系统对数据延迟要求不高的话,我们可以通过延长 refresh 时间间隔,可以有效地减少 segment 合并压力,提高索引速度。比如在做全链路跟踪的过程中,我们就将 index.refresh_interval 设置为 30s,减少 refresh 次数。再如,在进行全量索引时,可以将 refresh 次数临时关闭,即 index.refresh_interval 设置为-1,数据导入成功后再打开到正常模式,比如 30s。

在加载大量数据时候可以暂时不用 refresh 和 repliccas,index.refresh_interval 设置为-1,index.number_of_replicas 设置为 0。

修改 index_buffer_size 的设置

索引缓冲的设置可以控制多少内存分配给索引进程。这是一个全局配置,会应用于一个节点上所有不同的分片上。

1
2
indices.memory.index_buffer_size: 10%
indices.memory.min_index_buffer_size: 48mb

indices.memory.index_buffer_size 接受一个百分比或者一个表示字节大小的值。默认是 10%,意味着分配给节点的总内存的 10%用来做索引缓冲的大小。这个数值被分到不同的分片(shards)上。如果设置的是百分比,还可以设置 min_index_buffer_size (默认 48mb)和 max_index_buffer_size(默认没有上限)。

修改 translog 相关的设置

一是控制数据从内存到硬盘的操作频率,以减少硬盘 IO。可将 sync_interval 的时间设置大一些。默认为 5s。

1
index.translog.sync_interval: 5s

也可以控制 tranlog 数据块的大小,达到 threshold 大小时,才会 flush 到 lucene 索引文件。默认为 512m。

1
index.translog.flush_threshold_size: 512mb

注意 _id 字段的使用

_id 字段的使用,应尽可能避免自定义 _id,以避免针对 ID 的版本管理;建议使用 ES 的默认 ID 生成策略或使用数字类型 ID 做为主键。

注意 _all 字段及 _source 字段的使用

**_**all 字段及 _source 字段的使用,应该注意场景和需要,_all 字段包含了所有的索引字段,方便做全文检索,如果无此需求,可以禁用;_source 存储了原始的 document 内容,如果没有获取原始文档数据的需求,可通过设置 includes、excludes 属性来定义放入 _source 的字段。

合理的配置使用 index 属性

合理的配置使用 index 属性,analyzed 和 not_analyzed,根据业务需求来控制字段是否分词或不分词。只有 groupby 需求的字段,配置时就设置成 not_analyzed,以提高查询或聚类的效率。

减少副本数量

Elasticsearch 默认副本数量为 3 个,虽然这样会提高集群的可用性,增加搜索的并发数,但是同时也会影响写入索引的效率。

在索引过程中,需要把更新的文档发到副本节点上,等副本节点生效后在进行返回结束。使用 Elasticsearch 做业务搜索的时候,建议副本数目还是设置为 3 个,但是像内部 ELK 日志系统、分布式跟踪系统中,完全可以将副本数目设置为 1 个。

查询方面优化

Elasticsearch 作为业务搜索的近实时查询时,查询效率的优化显得尤为重要。

路由优化

当我们查询文档的时候,Elasticsearch 如何知道一个文档应该存放到哪个分片中呢?它其实是通过下面这个公式来计算出来的。

1
shard = hash(routing) % number_of_primary_shards

routing 默认值是文档的 id,也可以采用自定义值,比如用户 ID。

不带 routing 查询

在查询的时候因为不知道要查询的数据具体在哪个分片上,所以整个过程分为 2 个步骤:

  1. 分发:请求到达协调节点后,协调节点将查询请求分发到每个分片上。
  2. 聚合:协调节点搜集到每个分片上查询结果,再将查询的结果进行排序,之后给用户返回结果。

带 routing 查询

查询的时候,可以直接根据 routing 信息定位到某个分配查询,不需要查询所有的分配,经过协调节点排序。

向上面自定义的用户查询,如果 routing 设置为 userid 的话,就可以直接查询出数据来,效率提升很多。

Filter VS Query

尽可能使用过滤器上下文(Filter)替代查询上下文(Query)

  • Query:此文档与此查询子句的匹配程度如何?
  • Filter:此文档和查询子句匹配吗?

Elasticsearch 针对 Filter 查询只需要回答“是”或者“否”,不需要像 Query 查询一样计算相关性分数,同时 Filter 结果可以缓存。

深度翻页

在使用 Elasticsearch 过程中,应尽量避免大翻页的出现。

正常翻页查询都是从 from 开始 size 条数据,这样就需要在每个分片中查询打分排名在前面的 from+size 条数据。协同节点收集每个分配的前 from+size 条数据。协同节点一共会受到 N*(from+size) 条数据,然后进行排序,再将其中 from 到 from+size 条数据返回出去。如果 from 或者 size 很大的话,导致参加排序的数量会同步扩大很多,最终会导致 CPU 资源消耗增大。

可以通过使用 Elasticsearch scroll 和 scroll-scan 高效滚动的方式来解决这样的问题。

也可以结合实际业务特点,文档 id 大小如果和文档创建时间是一致有序的,可以以文档 id 作为分页的偏移量,并将其作为分页查询的一个条件。

脚本(script)合理使用

我们知道脚本使用主要有 3 种形式,内联动态编译方式、_script 索引库中存储和文件脚本存储的形式;一般脚本的使用场景是粗排,尽量用第二种方式先将脚本存储在 _script 索引库中,起到提前编译,然后通过引用脚本 id,并结合 params 参数使用,即可以达到模型(逻辑)和数据进行了分离,同时又便于脚本模块的扩展与维护。具体 ES 脚本的深入内容请参考 Elasticsearch 脚本模块的详解

数据结构优化

基于 Elasticsearch 的使用场景,文档数据结构尽量和使用场景进行结合,去掉没用及不合理的数据。

尽量减少不需要的字段

如果 Elasticsearch 用于业务搜索服务,一些不需要用于搜索的字段最好不存到 ES 中,这样即节省空间,同时在相同的数据量下,也能提高搜索性能。

避免使用动态值作字段,动态递增的 mapping,会导致集群崩溃;同样,也需要控制字段的数量,业务中不使用的字段,就不要索引。控制索引的字段数量、mapping 深度、索引字段的类型,对于 ES 的性能优化是重中之重。

以下是 ES 关于字段数、mapping 深度的一些默认设置:

1
2
3
index.mapping.nested_objects.limit: 10000
index.mapping.total_fields.limit: 1000
index.mapping.depth.limit: 20

Nested Object vs Parent/Child

尽量避免使用 nested 或 parent/child 的字段,能不用就不用;nested query 慢,parent/child query 更慢,比 nested query 慢上百倍;因此能在 mapping 设计阶段搞定的(大宽表设计或采用比较 smart 的数据结构),就不要用父子关系的 mapping。

如果一定要使用 nested fields,保证 nested fields 字段不能过多,目前 ES 默认限制是 50。因为针对 1 个 document,每一个 nested field,都会生成一个独立的 document,这将使 doc 数量剧增,影响查询效率,尤其是 JOIN 的效率。

1
index.mapping.nested_fields.limit: 50
对比 Nested Object Parent/Child
优点 文档存储在一起,因此读取性高 父子文档可以独立更新,互不影响
缺点 更新父文档或子文档时需要更新整个文档 为了维护 join 关系,需要占用部分内存,读取性能较差
场景 子文档偶尔更新,查询频繁 子文档更新频繁

选择静态映射,非必需时,禁止动态映射

尽量避免使用动态映射,这样有可能会导致集群崩溃,此外,动态映射有可能会带来不可控制的数据类型,进而有可能导致在查询端出现相关异常,影响业务。

此外,Elasticsearch 作为搜索引擎时,主要承载 query 的匹配和排序的功能,那数据的存储类型基于这两种功能的用途分为两类,一是需要匹配的字段,用来建立倒排索引对 query 匹配用,另一类字段是用做粗排用到的特征字段,如 ctr、点击数、评论数等等。

集群架构设计

合理的部署 Elasticsearch 有助于提高服务的整体可用性。

主节点、数据节点和协调节点分离

Elasticsearch 集群在架构拓朴时,采用主节点、数据节点和负载均衡节点分离的架构,在 5.x 版本以后,又可将数据节点再细分为“Hot-Warm”的架构模式。

Elasticsearch 的配置文件中有 2 个参数,node.master 和 node.data。这两个参数搭配使用时,能够帮助提供服务器性能。

主(master)节点

配置 node.master:truenode.data:false,该 node 服务器只作为一个主节点,但不存储任何索引数据。我们推荐每个集群运行 3 个专用的 master 节点来提供最好的弹性。使用时,你还需要将 discovery.zen.minimum_master_nodes setting 参数设置为 2,以免出现脑裂(split-brain)的情况。用 3 个专用的 master 节点,专门负责处理集群的管理以及加强状态的整体稳定性。因为这 3 个 master 节点不包含数据也不会实际参与搜索以及索引操作,在 JVM 上它们不用做相同的事,例如繁重的索引或者耗时,资源耗费很大的搜索。因此不太可能会因为垃圾回收而导致停顿。因此,master 节点的 CPU,内存以及磁盘配置可以比 data 节点少很多的。

数据(data)节点

配置 node.master:falsenode.data:true,该 node 服务器只作为一个数据节点,只用于存储索引数据,使该 node 服务器功能单一,只用于数据存储和数据查询,降低其资源消耗率。

在 Elasticsearch 5.x 版本之后,data 节点又可再细分为“Hot-Warm”架构,即分为热节点(hot node)和暖节点(warm node)。

hot 节点:

hot 节点主要是索引节点(写节点),同时会保存近期的一些频繁被查询的索引。由于进行索引非常耗费 CPU 和 IO,即属于 IO 和 CPU 密集型操作,建议使用 SSD 的磁盘类型,保持良好的写性能;我们推荐部署最小化的 3 个 hot 节点来保证高可用性。根据近期需要收集以及查询的数据量,可以增加服务器数量来获得想要的性能。

将节点设置为 hot 类型需要 elasticsearch.yml 如下配置:

1
node.attr.box_type: hot

如果是针对指定的 index 操作,可以通过 settings 设置 index.routing.allocation.require.box_type: hot 将索引写入 hot 节点。

warm 节点:

这种类型的节点是为了处理大量的,而且不经常访问的只读索引而设计的。由于这些索引是只读的,warm 节点倾向于挂载大量磁盘(普通磁盘)来替代 SSD。内存、CPU 的配置跟 hot 节点保持一致即可;节点数量一般也是大于等于 3 个。

将节点设置为 warm 类型需要 elasticsearch.yml 如下配置:

1
node.attr.box_type: warm

同时,也可以在 elasticsearch.yml 中设置 index.codec:best_compression 保证 warm 节点的压缩配置。

当索引不再被频繁查询时,可通过 index.routing.allocation.require.box_type:warm,将索引标记为 warm,从而保证索引不写入 hot 节点,以便将 SSD 磁盘资源用在刀刃上。一旦设置这个属性,ES 会自动将索引合并到 warm 节点。

协调(coordinating)节点

协调节点用于做分布式里的协调,将各分片或节点返回的数据整合后返回。该节点不会被选作主节点,也不会存储任何索引数据。该服务器主要用于查询负载均衡。在查询的时候,通常会涉及到从多个 node 服务器上查询数据,并将请求分发到多个指定的 node 服务器,并对各个 node 服务器返回的结果进行一个汇总处理,最终返回给客户端。在 ES 集群中,所有的节点都有可能是协调节点,但是,可以通过设置 node.masternode.datanode.ingest 都为 false 来设置专门的协调节点。需要较好的 CPU 和较高的内存。

  • node.master:false 和 node.data:true,该 node 服务器只作为一个数据节点,只用于存储索引数据,使该 node 服务器功能单一,只用于数据存储和数据查询,降低其资源消耗率。
  • node.master:true 和 node.data:false,该 node 服务器只作为一个主节点,但不存储任何索引数据,该 node 服务器将使用自身空闲的资源,来协调各种创建索引请求或者查询请求,并将这些请求合理分发到相关的 node 服务器上。
  • node.master:false 和 node.data:false,该 node 服务器即不会被选作主节点,也不会存储任何索引数据。该服务器主要用于查询负载均衡。在查询的时候,通常会涉及到从多个 node 服务器上查询数据,并将请求分发到多个指定的 node 服务器,并对各个 node 服务器返回的结果进行一个汇总处理,最终返回给客户端。

关闭 data 节点服务器中的 http 功能

针对 Elasticsearch 集群中的所有数据节点,不用开启 http 服务。将其中的配置参数这样设置,http.enabled:false,同时也不要安装 head, bigdesk, marvel 等监控插件,这样保证 data 节点服务器只需处理创建/更新/删除/查询索引数据等操作。

http 功能可以在非数据节点服务器上开启,上述相关的监控插件也安装到这些服务器上,用于监控 Elasticsearch 集群状态等数据信息。这样做一来出于数据安全考虑,二来出于服务性能考虑。

一台服务器上最好只部署一个 node

一台物理服务器上可以启动多个 node 服务器节点(通过设置不同的启动 port),但一台服务器上的 CPU、内存、硬盘等资源毕竟有限,从服务器性能考虑,不建议一台服务器上启动多个 node 节点。

集群分片设置

ES 一旦创建好索引后,就无法调整分片的设置,而在 ES 中,一个分片实际上对应一个 lucene 索引,而 lucene 索引的读写会占用很多的系统资源,因此,分片数不能设置过大;所以,在创建索引时,合理配置分片数是非常重要的。一般来说,我们遵循一些原则:

  1. 控制每个分片占用的硬盘容量不超过 ES 的最大 JVM 的堆空间设置(一般设置不超过 32 G,参考上面的 JVM 内存设置原则),因此,如果索引的总容量在 500 G 左右,那分片大小在 16 个左右即可;当然,最好同时考虑原则 2。
  2. 考虑一下 node 数量,一般一个节点有时候就是一台物理机,如果分片数过多,大大超过了节点数,很可能会导致一个节点上存在多个分片,一旦该节点故障,即使保持了 1 个以上的副本,同样有可能会导致数据丢失,集群无法恢复。所以,一般都设置分片数不超过节点数的 3 倍

参考资料

Elasticsearch 聚合

::: info 概述

在数据库中,聚合是指将数据进行分组统计,得到一个汇总的结果。例如,计算总和、平均值、最大值或最小值等操作。

Elasticsearch 将聚合分为三类:

类型 说明
Metric(指标聚合) 根据字段值进行统计计算
Bucket(桶聚合) 根据字段值、范围或其他条件进行分组
Pipeline(管道聚合) 对其他聚合输出的结果进行再次聚合

本文将逐一介绍这几种聚合方式的用法和特性。

:::

阅读全文 »

Elasticsearch 搜索(下)

Elasticsearch 提供了基于 JSON 的 DSL(Domain Specific Language)来定义查询。

可以将 DSL 视为查询的 AST(抽象语法树),由两种类型的子句组成:

  • 叶子查询 - 在指定字段中查找特定值,例如:matchtermrange
  • 组合查询 - 组合其他叶子查询或组合查询,用于以逻辑方式组合多个查询(例如: booldis_max),或更改它们的行为(例如:constant_score)。

查询子句的行为会有所不同,具体取决于它们是在 query content 还是 filter context 中使用。

  • query context - 有相关性计算,采用相关性算法,计算文档与查询关键词之间的相关度,并根据评分(_score)大小排序。
  • filter context - 无相关性计算,可以利用缓存,性能更好。

从用法角度,Elasticsearch 查询分类大致分为:

全文查询

Full Text Search(全文搜索) 支持在非结构化文本数据中搜索与查询关键字最匹配的数据。

在 ES 中,支持以下全文搜索方式:

  • intervals - 根据匹配词的顺序和近似度返回文档。
  • match - 匹配查询,用于执行全文搜索的标准查询,包括模糊匹配和短语或邻近查询。
  • match_bool_prefix - 对检索文本分词,并根据这些分词构造一个布尔查询。除了最后一个分词之外的每个分词都进行 term 查询。最后一个分词用于 prefix 查询;其他分词都进行 term 查询。
  • match_phrase - 短语匹配查询,短语匹配会将检索内容分词,这些词语必须全部出现在被检索内容中,并且顺序必须一致,默认情况下这些词都必须连续。
  • match_phrase_prefix - 与 match_phrase 查询类似,但对最后一个单词执行通配符搜索。
  • multi_match 支持多字段 match 查询
  • combined_fields - 匹配多个字段,就像它们已索引到一个组合字段中一样。
  • query_string - 支持紧凑的 Lucene query string(查询字符串)语法,允许指定 AND|OR|NOT 条件和单个查询字符串中的多字段搜索。仅适用于专家用户。
  • simple_query_string - 更简单、更健壮的 query_string 语法版本,适合直接向用户公开。

match(匹配查询)

match 查询用于搜索单个字段。首先,会针对检索文本进行解析(分词),分词后的任何一个词项只要被匹配,文档就会被搜到。默认情况下,相当于对分词后词项进行 or 匹配操作。

:::details match 示例

1
2
3
4
5
6
7
8
9
10
GET kibana_sample_data_ecommerce/_search
{
"query": {
"match": {
"customer_full_name": {
"query": "George Hubbard"
}
}
}
}

响应结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
{
"took": 891, // 查询使用的毫秒数
"timed_out": false, // 是否有分片超时,也就是说是否只返回了部分结果
"_shards": {
// 总分片数、响应成功/失败数量信息
"total": 1,
"successful": 1,
"skipped": 0,
"failed": 0
},
"hits": {
// 搜索结果
"total": {
// 匹配的总记录数
"value": 82,
"relation": "eq"
},
"max_score": 10.018585, // 所有匹配文档中的最大相关性分值
"hits": [
// 匹配文档列表
{
"_index": "kibana_sample_data_ecommerce", // 文档所属索引
"_type": "_doc", // 文档所属 type
"_id": "2ZUtBX4BU8KXl1YJRBrH", // 文档的唯一性标识
"_score": 10.018585, // 文档的相关性分值
"_source": {
// 文档的原始 JSON 对象
// 略
}
}
// 省略多条记录
]
}
}

:::

可以通过组合 <field>query 参数来简化匹配查询语法。下面是一个简单的示例。

:::details match 简写示例

下面的查询等价于前面的匹配查询示例:

1
2
3
4
5
6
7
8
GET kibana_sample_data_ecommerce/_search
{
"query": {
"match": {
"customer_full_name": "George Hubbard"
}
}
}

:::

在进行全文本字段检索的时候, match API 提供了 operator 和 minimum_should_match 参数:

  • operator 参数值可以为 “or” 或者 “and” 来控制检索词项间的关系,默认值为 “or”。所以上面例子中,只要书名中含有 “linux” 或者 “architecture” 的文档都可以匹配上。
  • minimum_should_match 可以指定词项的最少匹配个数,其值可以指定为某个具体的数字,但因为我们无法预估检索内容的词项数量,一般将其设置为一个百分比。

:::details minimum_should_match 示例

至少有 50% 的词项匹配的文档才会被返回:

1
2
3
4
5
6
7
8
9
10
11
12
GET kibana_sample_data_ecommerce/_search
{
"query": {
"match": {
"category": {
"query": "Women Clothing Accessories",
"operator": "or",
"minimum_should_match": "50%"
}
}
}
}

:::

match 查询提供了 fuzziness 参数,fuzziness 允许基于被查询字段的类型进行模糊匹配。请参阅 Fuzziness 的配置。

在这种情况下可以设置 prefix_lengthmax_expansions 来控制模糊匹配。如果设置了模糊选项,查询将使用 top_terms_blended_freqs_${max_expansions} 作为其重写方法,fuzzy_rewrite 参数允许控制查询将如何被重写。

默认情况下允许模糊倒转 (abba),但可以通过将 fuzzy_transpositions 设置为 false 来禁用。

:::details fuzziness 示例

1
2
3
4
5
6
7
8
9
10
11
GET kibana_sample_data_ecommerce/_search
{
"query": {
"match": {
"customer_first_name": {
"query": "Gearge",
"fuzziness": "AUTO"
}
}
}
}

:::

如果使用的分析器像 stop 过滤器一样删除查询中的所有标记,则默认行为是不匹配任何文档。可以使用 zero_terms_query 选项来改变默认行为,它接受 none(默认)和 all (相当于 match_all 查询)。

:::details zero_terms_query 示例

1
2
3
4
5
6
7
8
9
10
11
12
GET kibana_sample_data_logs/_search
{
"query": {
"match": {
"message": {
"query": "Mozilla Linux",
"operator": "and",
"zero_terms_query": "all"
}
}
}
}

:::

match_phrase(短语匹配查询)

match_phrase 查询首先会对检索内容进行分词,分词器可以自定义,同时文档还要满足以下两个条件才会被搜索到:

  1. 分词后所有词项都要出现在该字段中(相当于 and 操作)
  2. 字段中的词项顺序要一致

:::details match_phrase 示例

1
2
3
4
5
6
7
8
9
10
GET kibana_sample_data_logs/_search
{
"query": {
"match_phrase": {
"agent": {
"query": "Linux x86_64"
}
}
}
}

:::

match_phrase_prefix(短语前缀匹配查询)

查询和 match_phrase 查询类似,只不过 match_phrase_prefix 最后一个 term 会被作为前缀匹配。

:::details match_phrase_prefix 示例

匹配以 https://www.elastic.co/download 开头的短语

1
2
3
4
5
6
7
8
9
10
GET kibana_sample_data_logs/_search
{
"query": {
"match_phrase_prefix": {
"url": {
"query": "https://www.elastic.co/download"
}
}
}
}

:::

multi_match(多字段匹配查询)

multi_match 查询允许对多个字段执行相同的匹配查询。

multi_match 查询在内部执行的方式取决于 type 参数,可以设置为:

  • best_fields -(默认)将所有与查询匹配的文档作为结果返回,但是只使用评分最高的字段的评分来作为评分结果返回。
  • most_fields - 将所有与查询匹配的文档作为结果返回,并将所有匹配字段的评分累加起来作为评分结果。
  • cross_fields - 将具有相同分析器的字段视为一个大字段。在每个字段中查找每个单词。例如当需要查询英文人名的时候,可以将 first_name 和 last_name 两个字段组合起来当作 full_name 来查询。
  • phrase - 对每个字段运行 match_phrase 查询,并将最佳匹配字段的评分作为结果返回。
  • phrase_prefix - 对每个字段运行 match_phrase_prefix 查询,并将最佳匹配字段的评分作为结果返回。
  • bool_prefix - 在每个字段上创建一个 match_bool_prefix 查询,并且合并每个字段的评分作为评分结果。

:::details multi_match 示例

1
2
3
4
5
6
7
8
9
10
11
12
GET kibana_sample_data_ecommerce/_search
{
"query": {
"multi_match": {
"query": 34.98,
"fields": [
"taxful_total_price",
"taxless_total_*" # 可以使用通配符
]
}
}
}

:::

词项级别查询

Term(词项)是表达语意的最小单位。搜索和利用统计语言模型进行自然语言处理都需要处理 Term。

全文查询在执行查询之前会分析查询字符串。与全文查询不同,词项级别查询不会分词,而是将输入作为一个整体,在倒排索引中查找准确的词项。并且使用相关度计算公式为每个包含该词项的文档进行相关度计算。一言以概之:词项查询是对词项进行精确匹配。词项查询通常用于结构化数据,如数字、日期和枚举类型。

词项查询有以下类型:

  • exists - 返回在指定字段上有值的文档。
  • fuzzy - 模糊查询,返回包含与搜索词相似的词的文档。
  • ids - 根据 ID 返回文档。此查询使用存储在 _id 字段中的文档 ID。
  • prefix - 前缀查询,用于查询某个字段中包含指定前缀的文档。
  • range - 范围查询,用于匹配在某一范围内的数值型、日期类型或者字符串型字段的文档。
  • regexp - 正则匹配查询,返回与正则表达式相匹配的词项所属的文档。
  • term - 用来查找指定字段中包含给定单词的文档。
  • terms - 与 term 相似,但可以搜索多个值。
  • terms set - 与 term 相似,但可以定义返回文档所需的匹配词数。
  • wildcard - 通配符查询,返回与通配符模式匹配的文档。

exists(字段不为空查询)

exists 返回在指定字段上有值的文档。

由于多种原因,文档字段可能不存在索引值:

  • JSON 中的字段为 null[]
  • 该字段在 mapping 中配置了 "index" : false
  • 字段值的长度超过了 mapping 中的 ignore_above 设置
  • 字段值格式错误,并且在 mapping 中定义了 ignore_malformed

:::details exists 示例

1
2
3
4
5
6
7
8
GET kibana_sample_data_ecommerce/_search
{
"query": {
"exists": {
"field": "email"
}
}
}

:::

fuzzy(模糊查询)

fuzzy 返回包含与搜索词相似的词的文档。ES 使用 Levenshtein edit distance(Levenshtein 编辑距离) 测量相似度或模糊度。

编辑距离是将一个术语转换为另一个术语所需的单个字符更改的数量。这些变化可能包括:

  • 改变一个字符:(box -> fox)
  • 删除一个字符:(black -> lack)
  • 插入一个字符:(sic -> sick
  • 反转两个相邻字符:(act → cat)

为了找到相似的词条,fuzzy query 会在指定的编辑距离内创建搜索词条的所有可能变体或扩展集。然后返回完全匹配任意扩展的文档。

注意:如果配置了 search.allow_expensive_queries ,则 fuzzy query 不能执行。

:::details fuzzy 示例

1
2
3
4
5
6
7
8
9
10
GET kibana_sample_data_ecommerce/_search
{
"query": {
"fuzzy": {
"customer_full_name": {
"value": "mary"
}
}
}
}

:::

prefix(前缀查询)

prefix 用于查询某个字段中包含指定前缀的文档。

比如查询 user.id 中含有以 ki 为前缀的关键词的文档,那么含有 kindkid 等所有以 ki 开头关键词的文档都会被匹配。

:::details prefix 示例

1
2
3
4
5
6
7
8
9
10
GET kibana_sample_data_ecommerce/_search
{
"query": {
"prefix": {
"customer_full_name": {
"value": "mar"
}
}
}
}

:::

range(范围查询)

range 用于匹配在某一范围内的数值型、日期类型或者字符串型字段的文档。比如搜索哪些书籍的价格在 50 到 100 之间、哪些书籍的出版时间在 2015 年到 2019 年之间。使用 range 查询只能查询一个字段,不能作用在多个字段上

range 查询支持的参数有以下几种:

  • gt - 大于
  • gte - 大于等于
  • lt - 小于
  • lte - 小于等于
  • format - 如果字段是 Date 类型,可以设置日期格式化
  • time_zone - 时区
  • relation - 指示范围查询如何匹配范围字段的值。
    • INTERSECTS (Default) - 匹配与查询字段值范围相交的文档。
    • CONTAINS - 匹配完全包含查询字段值的文档。
    • WITHIN - 匹配具有完全在查询范围内的范围字段值的文档。

:::details range 示例

数值范围查询示例:

1
2
3
4
5
6
7
8
9
10
11
GET kibana_sample_data_ecommerce/_search
{
"query": {
"range": {
"taxful_total_price": {
"gt": 10,
"lte": 50
}
}
}
}

日期范围查询示例:

1
2
3
4
5
6
7
8
9
10
11
12
GET kibana_sample_data_ecommerce/_search
{
"query": {
"range": {
"order_date": {
"time_zone": "+00:00",
"gte": "2018-01-01T00:00:00",
"lte": "now"
}
}
}
}

:::

regexp(正则匹配查询)

regexp 返回与正则表达式相匹配的词项所属的文档。正则表达式 是一种使用占位符字符匹配数据模式的方法,称为运算符。

注意:如果配置了 search.allow_expensive_queries ,则 regexp query 会被禁用。

:::details regexp 示例

1
2
3
4
5
6
7
8
GET kibana_sample_data_ecommerce/_search
{
"query": {
"regexp": {
"email": ".*@.*-family.zzz"
}
}
}

:::

term(词项匹配查询)

term 用来查找指定字段中包含给定单词的文档,term 查询不被解析,只有查询词和文档中的词精确匹配才会被搜索到,应用场景为查询人名、地名等需要精准匹配的需求。

注意:应避免 term 查询对 text 字段使用查询。默认情况下,Elasticsearch 针对 text 字段的值进行解析分词,这会使查找 text 字段值的精确匹配变得困难。要搜索 text 字段值,需改用 match 查询。

:::details term 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# 1. 创建一个索引
DELETE my-index-000001
PUT my-index-000001
{
"mappings": {
"properties": {
"full_text": { "type": "text" }
}
}
}

# 2. 使用 "Quick Brown Foxes!" 关键字查 "full_text" 字段
PUT my-index-000001/_doc/1
{
"full_text": "Quick Brown Foxes!"
}

# 3. 使用 term 查询
GET my-index-000001/_search?pretty
{
"query": {
"term": {
"full_text": "Quick Brown Foxes!"
}
}
}
# 因为 full_text 字段不再包含确切的 Term —— "Quick Brown Foxes!",所以 term query 搜索不到任何结果

# 4. 使用 match 查询
GET my-index-000001/_search?pretty
{
"query": {
"match": {
"full_text": "Quick Brown Foxes!"
}
}
}

DELETE my-index-000001

:warning: 注意:应避免 term 查询对 text 字段使用查询。

默认情况下,Elasticsearch 针对 text 字段的值进行解析分词,这会使查找 text 字段值的精确匹配变得困难。

要搜索 text 字段值,需改用 match 查询。

:::

terms(多词项匹配查询)

termsterm 相同,但可以搜索多个值。

terms query 查询参数:

  • **index**:索引名
  • **id**:文档 ID
  • **path**:要从中获取字段值的字段的名称,即搜索关键字
  • **routing**(选填):要从中获取 term 值的文档的自定义路由值。如果在索引文档时提供了自定义路由值,则此参数是必需的。

:::details terms 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# 1. 创建一个索引
DELETE my-index-000001
PUT my-index-000001
{
"mappings": {
"properties": {
"color": { "type": "keyword" }
}
}
}

# 2. 写入一个文档
PUT my-index-000001/_doc/1
{
"color": [
"blue",
"green"
]
}

# 3. 写入另一个文档
PUT my-index-000001/_doc/2
{
"color": "blue"
}

# 3. 使用 terms query
GET my-index-000001/_search?pretty
{
"query": {
"terms": {
"color": {
"index": "my-index-000001",
"id": "2",
"path": "color"
}
}
}
}

DELETE my-index-000001

:::

wildcard(通配符查询)

wildcard 即通配符查询,返回与通配符模式匹配的文档。

? 用来匹配一个任意字符,* 用来匹配零个或者多个字符。

注意:如果配置了 search.allow_expensive_queries ,则 wildcard query 会被禁用。

:::details wildcard 示例

示例:以下搜索返回 user.id 字段包含以 ki 开头并以 y 结尾的术语的文档。这些匹配项可以包括 kiykitykimchy

1
2
3
4
5
6
7
8
9
10
11
12
GET kibana_sample_data_ecommerce/_search
{
"query": {
"wildcard": {
"email": {
"value": "*@underwood-family.zzz",
"boost": 1,
"rewrite": "constant_score"
}
}
}
}

:::

复合查询

复合查询就是把一些简单查询组合在一起实现更复杂的查询需求,除此之外,复合查询还可以控制另外一个查询的行为。

复合查询有以下类型:

  • bool - 布尔查询,可以组合多个过滤语句来过滤文档。
  • boosting - 提供调整相关性打分的能力,在 positive 块中指定匹配文档的语句,同时降低在 negative 块中也匹配的文档的得分。
  • constant_score - 使用 constant_score 可以将 query 转化为 filter,filter 可以忽略相关性算分的环节,并且 filter 可以有效利用缓存,从而提高查询的性能。
  • dis_max - 返回匹配了一个或者多个查询语句的文档,但只将最佳匹配的评分作为相关性算分返回。
  • function_score - 支持使用函数来修改查询返回的分数。

bool (布尔查询)

bool 查询可以把任意多个简单查询组合在一起,使用 mustshouldmust_notfilter 选项来表示简单查询之间的逻辑,每个选项都可以出现 0 次到多次,它们的含义如下:

  • must - 文档必须匹配 must 选项下的查询条件,相当于逻辑运算的 AND,且参与文档相关度的评分。
  • should - 文档可以匹配 should 选项下的查询条件也可以不匹配,相当于逻辑运算的 OR,且参与文档相关度的评分。
  • must_not - 与 must 相反,匹配该选项下的查询条件的文档不会被返回;需要注意的是,must_not 语句不会影响评分,它的作用只是将不相关的文档排除
  • filter - 和 must 一样,匹配 filter 选项下的查询条件的文档才会被返回,但是 filter 不评分,只起到过滤功能,与 must_not 相反

:::details bool 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
GET kibana_sample_data_ecommerce/_search
{
"query": {
"bool": {
"filter": {
"term": {
"type": "order"
}
},
"must_not": {
"range": {
"taxful_total_price": {
"gte": 30
}
}
},
"should": [
{
"match": {
"day_of_week": "Sunday"
}
},
{
"match": {
"category": "Clothing"
}
}
],
"minimum_should_match": 1
}
}
}

:::

boosting

boosting 提供了调整相关性打分的能力。

boosting 查询包括 positivenegativenegative_boost 三个部分。positive 中的查询评分保持不变;negative 中的查询会降低文档评分;相关性算分降低的程度将由 negative_boost 参数决定,其取值范围为:[0.0, 1.0]

:::details boosting 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
GET kibana_sample_data_ecommerce/_search
{
"query": {
"boosting": {
"positive": {
"term": {
"day_of_week": "Monday"
}
},
"negative": {
"range": {
"taxful_total_price": {
"gte": "30"
}
}
},
"negative_boost": 0.2
}
}
}

:::

constant_score

使用 constant_score 可以将 query 转化为 filter,可以忽略相关性算分的环节,并且 filter 可以有效利用缓存,从而提高查询的性能。

:::details constant_score 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
GET kibana_sample_data_ecommerce/_search
{
"query": {
"constant_score": {
"filter": {
"term": {
"day_of_week": "Monday"
}
},
"boost": 1.2
}
}
}

:::

dis_max

dis_max 查询与 bool 查询有一定联系也有一定区别,dis_max 查询支持多并发查询,可返回与任意查询条件子句匹配的任何文档类型。与 bool 查询可以将所有匹配查询的分数相结合使用的方式不同,dis_max 查询只使用最佳匹配查询条件的分数。

:::details dis_max 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
GET kibana_sample_data_ecommerce/_search
{
"query": {
"dis_max": {
"queries": [
{
"term": {
"currency": "EUR"
}
},
{
"term": {
"day_of_week": "Sunday"
}
}
],
"tie_breaker": 0.7
}
}
}

:::

function_score

function_score 查询可以修改查询的文档得分,这个查询在有些情况下非常有用,比如通过评分函数计算文档得分代价较高,可以改用过滤器加自定义评分函数的方式来取代传统的评分方式。

使用 function_score 查询,用户需要定义一个查询和一至多个评分函数,评分函数会对查询到的每个文档分别计算得分。

function_score 查询提供了以下几种算分函数:

  • script_score - 利用自定义脚本完全控制算分逻辑。
  • weight - 为每一个文档设置一个简单且不会被规范化的权重。
  • random_score - 为每个用户提供一个不同的随机算分,对结果进行排序。
  • field_value_factor - 使用文档字段的值来影响算分,例如将好评数量这个字段作为考虑因数。
  • decay functions - 衰减函数,以某个字段的值为标准,距离指定值越近,算分就越高。例如我想让书本价格越接近 10 元,算分越高排序越靠前。

:::details function_score 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
GET kibana_sample_data_ecommerce/_search
{
"query": {
"function_score": {
"query": { "match_all": {} },
"boost": "5",
"functions": [
{
"filter": { "match": { "day_of_week": "Sunday" } },
"random_score": {},
"weight": 23
},
{
"filter": { "match": { "day_of_week": "Monday" } },
"weight": 42
}
],
"max_boost": 42,
"score_mode": "max",
"boost_mode": "multiply",
"min_score": 42
}
}
}

:::

推荐搜索

ES 通过 Suggester 提供了推荐搜索能力,可以用于文本纠错,文本自动补全等场景。

根据使用场景的不同,ES 提供了以下 4 种 Suggester:

  • Term Suggester - 基于词项的纠错补全。
  • Phrase Suggester - 基于短语的纠错补全。
  • Completion Suggester - 自动补全单词,输入词语的前半部分,自动补全单词。
  • Context Suggester - 基于上下文的补全提示,可以实现上下文感知推荐。

Term Suggester

Term Suggester 提供了基于单词的纠错、补全功能,其工作原理是基于编辑距离(edit distance)来运作的,编辑距离的核心思想是一个词需要改变多少个字符就可以和另一个词一致。所以如果一个词转化为原词所需要改动的字符数越少,它越有可能是最佳匹配。

:::details term suggester 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
GET kibana_sample_data_ecommerce/_search
{
"query": {
"match": {
"day_of_week": "Sund"
}
},
"suggest": {
"my_suggest": {
"text": "Sund",
"term": {
"suggest_mode": "missing",
"field": "day_of_week"
}
}
}
}

响应结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
{
"took": 2,
"timed_out": false,
"_shards": {
"total": 1,
"successful": 1,
"skipped": 0,
"failed": 0
},
"hits": {
// 略
},
"suggest": {
"my_suggest": [
{
"text": "Sund",
"offset": 0,
"length": 4,
"options": [
{
"text": "Sunday",
"score": 0.5,
"freq": 614
}
]
}
]
}
}

:::

Term Suggester API 有很多参数,比较常用的有以下几个:

  • text - 指定了需要产生建议的文本,一般是用户的输入内容。
  • field - 指定从文档的哪个字段中获取建议。
  • suggest_mode - 设置建议的模式。其值有以下几个选项:
    • missing - 如果索引中存在就不进行建议,默认的选项。
    • popular - 推荐出现频率更高的词。
    • always - 不管是否存在,都进行建议。
  • analyzer - 指定分词器来对输入文本进行分词,默认与 field 指定的字段设置的分词器一致。
  • size - 为每个单词提供的最大建议数量。
  • sort - 建议结果排序的方式,有以下两个选项 -
    • score - 先按相似性得分排序,然后按文档频率排序,最后按词项本身(字母顺序的等)排序。
    • frequency - 先按文档频率排序,然后按相似性得分排序,最后按词项本身排序。

Phrase Suggester

Phrase Suggester 在 Term Suggester 的基础上增加了一些额外的逻辑,因为是短语形式的建议,所以会考量多个 term 间的关系,比如相邻的程度、词频等

:::details phrase suggester 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
GET kibana_sample_data_logs/_search
{
"suggest": {
"text": "Firefix",
"simple_phrase": {
"phrase": {
"field": "agent",
"direct_generator": [ {
"field": "agent",
"suggest_mode": "always"
} ],
"highlight": {
"pre_tag": "<em>",
"post_tag": "</em>"
}
}
}
}
}

响应结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
{
"took" : 2,
"timed_out" : false,
"_shards" : // 略
"hits" : // 略
"suggest" : {
"simple_phrase" : [
{
"text" : "Firefix",
"offset" : 0,
"length" : 7,
"options" : [
{
"text" : "firefox",
"highlighted" : "<em>firefox</em>",
"score" : 0.2000096
}
]
}
]
}
}

:::

Phrase Suggester 可用的参数也是比较多的,下面介绍几个用得比较多的参数选项 -

  • max_error - 指定最多可以拼写错误的词语的个数。
  • confidence - 其作用是用来控制返回结果条数的。如果用户输入的数据(短语)得分为 N,那么返回结果的得分需要大于 N * confidenceconfidence 默认值为 1.0。
  • highlight - 高亮被修改后的词语。

Completion Suggester

Completion Suggester 提供了自动补全的功能

Completion Suggester 在实现的时候会将 analyze(将文本分词,并且去除没用的词语,例如 is、at 这样的词语) 后的数据进行编码,构建为 FST 并且和索引存放在一起。FST(**finite-state transducer**)是一种高效的前缀查询索引。由于 FST 天生为前缀查询而生,所以其非常适合实现自动补全的功能。ES 会将整个 FST 加载到内存中,所以在使用 FST 进行前缀查询的时候效率是非常高效的。

在使用 Completion Suggester 前需要定义 Mapping,对应的字段需要使用 “completion” type。

:::details completion suggester 示例

构造用于自动补全的测试数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# 先删除原来的索引
DELETE music

# 新增 type 字段,类型为 completion,用于自动补全测试
PUT music
{
"mappings": {
"properties": {
"suggest": {
"type": "completion"
}
}
}
}

# 添加推荐
PUT music/_doc/1?refresh
{
"suggest" : {
"input": [ "Nevermind", "Nirvana" ],
"weight" : 34
}
}
PUT music/_doc/1?refresh
{
"suggest": [
{
"input": "Nevermind",
"weight": 10
},
{
"input": "Nirvana",
"weight": 3
}
]
}

获取推荐:

1
2
3
4
5
6
7
8
9
10
11
12
13
POST music/_search
{
"suggest": {
"song-suggest": {
"prefix": "nir",
"completion": {
"field": "suggest",
"size": 5,
"skip_duplicates": true
}
}
}
}

:::

Context Suggester

Context Suggester 是 Completion Suggester 的扩展,可以实现上下文感知推荐

ES 支持两种类型的上下文:

  • Category:任意字符串的分类。
  • Geo:地理位置信息。

在使用 Context Suggester 前需要定义 Mapping,然后在数据中加入相关的 Context 信息。

:::details context suggester 示例

构造用于 Context Suggester 的测试数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# 删除原来的索引
DELETE books_context

# 创建用于测试 Context Suggester 的索引
PUT books_context
{
"mappings": {
"properties": {
"book_id": {
"type": "keyword"
},
"name": {
"type": "text",
"analyzer": "standard"
},
"name_completion": {
"type": "completion",
"contexts": [
{
"name": "book_type",
"type": "category"
}
]
},
"author": {
"type": "keyword"
},
"intro": {
"type": "text"
},
"price": {
"type": "double"
},
"date": {
"type": "date"
}
}
},
"settings": {
"number_of_shards": 3,
"number_of_replicas": 1
}
}

# 导入测试数据
PUT books_context/_doc/4
{
"book_id": "4ee82465",
"name": "Linux Programming",
"name_completion": {
"input": ["Linux Programming"],
"contexts": {
"book_type": "program"
}
},
"author": "Richard Stones",
"intro": "Happy to Linux Programming",
"price": 10.9,
"date": "2022-06-01"
}
PUT books_context/_doc/5
{
"book_id": "4ee82466",
"name": "Linus Autobiography",
"name_completion": {
"input": ["Linus Autobiography"],
"contexts": {
"book_type": "autobiography"
}
},
"author": "Linus",
"intro": "Linus Autobiography",
"price": 14.9,
"date": "2012-06-01"
}

执行 Context Suggester 查询:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
POST books_context/_search
{
"suggest": {
"my_suggest": {
"prefix": "linu",
"completion": {
"field": "name_completion",
"contexts": {
"book_type": "autobiography"
}
}
}
}
}

:::

参考资料

Spring Bean

在 Spring 中,构成应用程序主体由 Spring IoC 容器管理的对象称为 Bean。Bean 是由 Spring IoC 容器实例化、装配和管理的对象。 Bean 以及它们之间的依赖关系反映在容器使用的配置元数据中。

Spring Bean 定义

BeanDefinition

Spring IoC 容器本身,并不能识别配置的元数据。为此,要将这些配置信息转为 Spring 能识别的格式——BeanDefinition 对象。

BeanDefinition 是 Spring 中定义 Bean 的配置元信息接口,它包含:

  • Bean 类名
  • Bean 行为配置元素,如:作用域、自动绑定的模式、生命周期回调等
  • 其他 Bean 引用,也可称为合作者(Collaborators)或依赖(Dependencies)
  • 配置设置,如 Bean 属性(Properties)

BeanDefinition 元信息

BeanDefinition 元信息如下:

属性(Property) 说明
Class 全类名,必须是具体类,不能用抽象类或接口
Name Bean 的名称或者 ID
Scope Bean 的作用域(如:singletonprototype 等)
Constructor arguments Bean 构造器参数(用于依赖注入)
Properties Bean 属性设置(用于依赖注入)
Autowiring mode Bean 自动绑定模式(如:通过名称 byName)
Lazy initialization mode Bean 延迟初始化模式(延迟和非延迟)
Initialization method Bean 初始化回调方法名称
Destruction method Bean 销毁回调方法名称

BeanDefinition 构建

BeanDefinition 构建方式:

  • 通过 BeanDefinitionBuilder

  • 通过 AbstractBeanDefinition 以及派生类

💻 Spring Bean 定义示例源码:BeanDefinitionTests

Spring Bean 命名

Spring Bean 命名规则

每个 Bean 拥有一个或多个标识符(identifiers),这些标识符在 Bean 所在的容器必须是唯一的。通常,一个 Bean 仅有一个标识符,如果需要额外的,可考虑使用别名(Alias)来扩充。

在基于 XML 的配置元信息中,开发人员可以使用 id 属性、name 属性或来指定 Bean 标识符。通常,Bean 的标识符由字母组成,允许出现特殊字符。如果要想引入 Bean 的别名的话,可在 name 属性使用半角逗号(“,”)或分号(“;”) 来间隔。

Spring 中,为 Bean 指定 idname 属性不是必须的。如果不指定,Spring 会自动为 Bean 分配一个唯一的名称。尽管 Bean 的命名没有限制,不过官方建议采用驼峰命名法来命名 Bean

Spring Bean 命名生成器

Spring 提供了两种 Spring Bean 命名生成器:

  • DefaultBeanNameGenerator:默认通用 BeanNameGenerator 实现。
  • AnnotationBeanNameGenerator:基于注解扫描的 BeanNameGenerator 实现。
1
2
3
public interface BeanNameGenerator {
String generateBeanName(BeanDefinition definition, BeanDefinitionRegistry registry);
}

Spring Bean 别名

Spring 支持通过 <alias> 属性为 Bean 设置别名。

Bean 别名(Alias)的作用:

  • 复用现有的 BeanDefinition
  • 更具有场景化的命名方法,比如:
    • <alias name="myApp-dataSource" alias="subsystemA-dataSource"/>
    • <alias name="myApp-dataSource" alias="subsystemB-dataSource"/>
1
2
3
4
<bean id="user" class="io.github.dunwu.spring.core.bean.entity.person.User">
<!-- 属性略 -->
</bean>
<alias name="user" alias="aliasUser" />

Spring Bean 生命周期

  1. Spring 对 Bean 进行实例化(相当于 new XXX())

  2. Spring 将值和引用注入到 Bean 对应的属性中

  3. 如果 Bean 实现了 BeanNameAware 接口,Spring 将 Bean 的 ID 传递给 setBeanName 方法

    • 作用是通过 Bean 的引用来获得 Bean ID,一般业务中是很少有用到 Bean 的 ID 的
  4. 如果 Bean 实现了 BeanFactoryAware 接口,Spring 将调用 setBeanDactory 方法,并把 BeanFactory 容器实例作为参数传入。

    • 作用是获取 Spring 容器,如 Bean 通过 Spring 容器发布事件等
  5. 如果 Bean 实现了 ApplicationContextAware 接口,Spring 容器将调用 setApplicationContext 方法,把应用上下文作为参数传入

    • 作用与 BeanFactory 类似都是为了获取 Spring 容器,不同的是 Spring 容器在调用 setApplicationContext 方法时会把它自己作为 setApplicationContext 的参数传入,而 Spring 容器在调用 setBeanFactory 前需要使用者自己指定(注入)setBeanFactory 里的参数 BeanFactory
  6. 如果 Bean 实现了 BeanPostProcess 接口,Spring 将调用 postProcessBeforeInitialization 方法

    • 作用是在 Bean 实例创建成功后对其进行增强处理,如对 Bean 进行修改,增加某个功能
  7. 如果 Bean 实现了 InitializingBean 接口,Spring 将调用 afterPropertiesSet 方法,作用与在配置文件中对 Bean 使用 init-method 声明初始化的作用一样,都是在 Bean 的全部属性设置成功后执行的初始化方法。

  8. 如果 Bean 实现了 BeanPostProcess 接口,Spring 将调用 postProcessAfterInitialization 方法

    • postProcessBeforeInitialization 是在 Bean 初始化前执行的,而 postProcessAfterInitialization 是在 Bean 初始化后执行的
  9. 经过以上的工作后,Bean 将一直驻留在应用上下文中给应用使用,直到应用上下文被销毁

  10. 如果 Bean 实现了 DispostbleBean 接口,Spring 将调用它的 destory 方法,作用与在配置文件中对 Bean 使用 destory-method 属性的作用一样,都是在 Bean 实例销毁前执行的方法。

Spring Bean 注册

注册 Spring Bean 实际上是将 BeanDefinition 注册到 IoC 容器中。

XML 配置元信息

Spring 的传统配置方式。在 <bean> 标签中配置元数据内容。

缺点是当 JavaBean 过多时,产生的配置文件足以让你眼花缭乱。

注解配置元信息

使用 @Bean@Component@Import 注解注册 Spring Bean。

Java API 配置元信息

  • 命名方式:BeanDefinitionRegistry#registerBeanDefinition(String,BeanDefinition)
  • 非命名方式:BeanDefinitionReaderUtils#registerWithGeneratedName(AbstractBeanDefinition,BeanDefinitionRegistry)
  • 配置类方式:AnnotatedBeanDefinitionReader#register(Class...)

💻 Spring Bean 注册示例源码:BeanRegistryTests

Spring Bean 实例化

Spring Bean 实例化方式:

  • 常规方式
    • 通过构造器(配置元信息:XML、Java 注解和 Java API)
    • 通过静态方法(配置元信息:XML、Java 注解和 Java API)
    • 通过 Bean 工厂方法(配置元信息:XML、Java 注解和 Java API)
    • 通过 FactoryBean(配置元信息:XML、Java 注解和 Java API)
  • 特殊方式
    • 通过 ServiceLoaderFactoryBean(配置元信息:XML、Java 注解和 Java API )
    • 通过 AutowireCapableBeanFactory#createBean(java.lang.Class, int, boolean)
    • 通过 BeanDefinitionRegistry#registerBeanDefinition(String,BeanDefinition)

💻 Spring Bean 实例化示例源码:BeanInstantiationTestsBeanInstantiationSpecialTests

Spring Bean 初始化和销毁

Spring Bean 初始化和销毁的方式有以下几种:

  1. 使用 @PostConstruct@PreDestroy 注解分别指定相应的初始化方法和销毁方法。

  2. 实现 InitializingBean 接口的 afterPropertiesSet() 方法来编写初始化方法;实现 DisposableBean 接口的 destroy() 方法来编写销毁方法。

    • InitializingBean 接口包含一个 afterPropertiesSet 方法,可以通过实现该接口,然后在这个方法中编写初始化逻辑。
    • DisposableBean接口包含一个 destory 方法,可以通过实现该接口,然后在这个方法中编写销毁逻辑。
  3. 自定义初始化方法

    • XML 配置:<bean init-method="init" destroy="destroy" ... />
    • Java 注解:@Bean(initMethod = "init", destroyMethod = "destroy")
    • Java API:AbstractBeanDefinition#setInitMethodName(String)AbstractBeanDefinition#setDestroyMethodName(String) 分别定义初始化和销毁方法

注意:如果同时存在,执行顺序会按照序列执行。

Bean 的延迟初始化

  • xml 方式:<bean lazy-init="true" ... />
  • 注解方式:@Lazy

Spring 提供了一个 BeanPostProcessor 接口,提供了两个方法 postProcessBeforeInitializationpostProcessAfterInitialization。其中postProcessBeforeInitialization 在组件的初始化方法调用之前执行,postProcessAfterInitialization 在组件的初始化方法调用之后执行。它们都包含两个入参:

  • bean:当前组件对象;
  • beanName:当前组件在容器中的名称。

💻 Spring Bean 初始化和销毁示例源码:BeanInitDestroyTests

Spring Bean 垃圾回收

Spring Bean 垃圾回收步骤:

  1. 关闭 Spring 容器(应用上下文)
  2. 执行 GC
  3. Spring Bean 覆盖的 finalize() 方法被回调

Spring Bean 作用范围

Scope Description
singleton (Default) Scopes a single bean definition to a single object instance for each Spring IoC container.
prototype Scopes a single bean definition to any number of object instances.
request Scopes a single bean definition to the lifecycle of a single HTTP request. That is, each HTTP request has its own instance of a bean created off the back of a single bean definition. Only valid in the context of a web-aware Spring ApplicationContext.
session Scopes a single bean definition to the lifecycle of an HTTP Session. Only valid in the context of a web-aware Spring ApplicationContext.
application Scopes a single bean definition to the lifecycle of a ServletContext. Only valid in the context of a web-aware Spring ApplicationContext.
websocket Scopes a single bean definition to the lifecycle of a WebSocket. Only valid in the context of a web-aware Spring ApplicationContext.

参考资料

SpringBoot 之快速入门

Spring Boot 简介

Spring Boot 可以让使用者非常方便的创建 Spring 应用。

Spring Boot 的目标是:

  • 为所有 Spring 开发者提供更快且可广泛访问的入门体验。
  • 开箱即用
  • 提供一系列通用的非功能特性(例如嵌入式服务、安全、指标、健康检查和外部化配置)
  • 完全不需要代码生成,也不需要 XML 配置。

Spring Boot 系统要求

Spring Boot 的构建工具要求:

Build Tool Version
Maven 3.5+
Gradle 6.8.x, 6.9.x, and 7.x

Spring Boot 支持的 Servlet 容器:

Name Servlet Version
Tomcat 9.0 4.0
Jetty 9.4 3.1
Jetty 10.0 4.0
Undertow 2.0 4.0

部署第一个 Spring Boot 项目

本节介绍如何开发一个小的“Hello World!” web 应用示例,来展示 Spring Boot 的一些关键功能。我们使用 Maven 来构建这个项目,因为大多数 IDE 都支持它。

环境检查

Spring Boot 项目依赖于 Java 环境和 Mave,开始项目之前需要先检查一下环境。

本地是否已安装 Java:

1
2
3
4
$ java -version
java version "1.8.0_102"
Java(TM) SE Runtime Environment (build 1.8.0_102-b14)
Java HotSpot(TM) 64-Bit Server VM (build 25.102-b14, mixed mode)

本地是否已安装 Maven:

1
2
3
4
$ mvn -v
Apache Maven 3.5.4 (1edded0938998edf8bf061f1ceb3cfdeccf443fe; 2018-06-17T14:33:14-04:00)
Maven home: /usr/local/Cellar/maven/3.3.9/libexec
Java version: 1.8.0_102, vendor: Oracle Corporation

创建 pom

我们需要从创建 Maven pom.xml 文件开始。 pom.xml 是 Maven 用于构建项目的配置文件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.example</groupId>
<artifactId>myproject</artifactId>
<version>0.0.1-SNAPSHOT</version>

<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.6.1</version>
</parent>

<!-- Additional lines to be added here... -->

</project>

使用者可以通过运行 mvn package 来测试它

添加依赖

Spring Boot 提供了许多启动器(Starters)以应对不同的使用场景。使用者可将 jars 添加到类路径中。我们的示例程序在 POM 的 parent 使用 spring-boot-starter-parent。 spring-boot-starter-parent 是一个特殊的启动器,提供有用的 Maven 默认值。它还提供了一个依赖项的版本管理,可以让使用者使用时不必显示指定版本。

其他启动器(Starters)提供了各种针对不同使用场景的功能。比如,我们需要开发一个 Web 应用程序,就可以添加了一个 spring-boot-starter-web 依赖项。在此之前,我们可以通过运行以下命令来查看我们当前拥有的 maven 依赖:

1
2
3
$ mvn dependency:tree

[INFO] com.example:myproject:jar:0.0.1-SNAPSHOT

mvn dependency:tree 命令打印项目依赖项的层级结构。可以看到 spring-boot-starter-parent 本身没有提供任何依赖。要添加必要的依赖,需要编辑 pom.xml 并在 <dependencies> 部分添加 spring-boot-starter-web 依赖项:

1
2
3
4
5
6
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
</dependencies>

编写代码

要运行应用程序,我们需要创建一个启动类。默认情况下,Maven 从 src/main/java 编译源代码,因此您需要创建该目录结构,然后添加一个名为 src/main/java/MyApplication.java 的文件以包含以下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@RestController
@EnableAutoConfiguration
public class MyApplication {

@RequestMapping("/")
String home() {
return "Hello World!";
}

public static void main(String[] args) {
SpringApplication.run(MyApplication.class, args);
}

}

说明:

@RestController 注解告诉 Spring,这个类是用来处理 Rest 请求的。

@RequestMapping 注解提供了“路由”信息。它告诉 Spring 任何带有 / 路径的 HTTP 请求都应该映射到 home 方法。 @RestController 注解告诉 Spring 将结果字符串直接呈现给调用者。

@EnableAutoConfiguration 注解告诉 Spring Boot 根据你添加的 jar 依赖去自动装配 Spring。

自动配置旨在与“Starters”配合使用,但这两个概念并没有直接联系。您可以自由选择 starters 之外的 jar 依赖项。 Spring Boot 仍然尽力自动配置您的应用程序。

Spring Boot 的 main 方法通过调用 run 委托给 Spring Boot 的 SpringApplication 类。 SpringApplication 引导我们的应用程序,启动 Spring,进而启动自动配置的 Tomcat Web 服务器。我们需要将 MyApplication.class 作为参数传递给 run 方法,以告诉 SpringApplication 哪个是入口类。还传递 args 数组以公开任何命令行参数。

运行示例

此时,您的应用程序应该可以工作了。由于您使用了 spring-boot-starter-parent POM,因此您有一个有用的运行目标,可用于启动应用程序。从项目根目录键入 mvn spring-boot:run 以启动应用程序。您应该会看到类似于以下内容的输出:

1
2
3
4
5
6
7
8
9
10
11
12
13
$ mvn spring-boot:run

. ____ _ __ _ _
/\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
\\/ ___)| |_)| | | | | || (_| | ) ) ) )
' |____| .__|_| |_|_| |_\__, | / / / /
=========|_|==============|___/=/_/_/_/
:: Spring Boot :: (v2.6.1)
....... . . .
....... . . . (log output here)
....... . . .
........ Started MyApplication in 2.222 seconds (JVM running for 6.514)

如果您打开 Web 浏览器访问 localhost:8080,您应该会看到以下输出:

1
Hello World!

要正常退出应用程序,请按 ctrl-c

创建可执行 jar

要创建一个可执行的 jar,我们需要将 spring-boot-maven-plugin 添加到我们的 pom.xml 中。为此,请在依赖项部分下方插入以下行:

1
2
3
4
5
6
7
8
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>

保存 pom.xml 并从命令行运行 mvn package,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
$ mvn package

[INFO] Scanning for projects...
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Building myproject 0.0.1-SNAPSHOT
[INFO] ------------------------------------------------------------------------
[INFO] .... ..
[INFO] --- maven-jar-plugin:2.4:jar (default-jar) @ myproject ---
[INFO] Building jar: /Users/developer/example/spring-boot-example/target/myproject-0.0.1-SNAPSHOT.jar
[INFO]
[INFO] --- spring-boot-maven-plugin:2.6.1:repackage (default) @ myproject ---
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------

如果您查看 target 目录,应该会看到 myproject-0.0.1-SNAPSHOT.jar。该文件的大小应约为 10 MB。如果想看里面,可以使用 jar tvf,如下:

1
$ jar tvf target/myproject-0.0.1-SNAPSHOT.jar

您还应该在目标目录中看到一个更小的名为 myproject-0.0.1-SNAPSHOT.jar.original 的文件。这是 Maven 在 Spring Boot 重新打包之前创建的原始 jar 文件。

要运行该应用程序,请使用 java -jar 命令,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
$ java -jar target/myproject-0.0.1-SNAPSHOT.jar

. ____ _ __ _ _
/\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
\\/ ___)| |_)| | | | | || (_| | ) ) ) )
' |____| .__|_| |_|_| |_\__, | / / / /
=========|_|==============|___/=/_/_/_/
:: Spring Boot :: (v2.6.1)
....... . . .
....... . . . (log output here)
....... . . .
........ Started MyApplication in 2.536 seconds (JVM running for 2.864)

和以前一样,要退出应用程序,请按 ctrl-c

通过 SPRING INITIALIZR 创建 Spring Boot 项目

创建项目

通过 SPRING INITIALIZR 工具产生基础项目

  1. 访问:http://start.spring.io/
  2. 选择构建工具Maven Project、Spring Boot 版本 1.5.10 以及一些工程基本信息,可参考下图所示:

img

  1. 点击Generate Project下载项目压缩包
  2. 解压压缩包,包中已是一个完整的项目。

如果你使用 Intellij 作为 IDE,那么你可以直接使用 SPRING INITIALIZR,参考下图操作:

img

项目说明

重要文件

  • src/main/java 路径下的 Chapter1Application 类 :程序入口
  • src/main/resources 路径下的 application.properties :项目配置文件
  • src/test/java 路径下的 Chapter01ApplicationTests :程序测试入口

pom.xml

pom 中指定 parent 为以下内容,表示此项目继承了 spring-boot-starter-parent 的 maven 配置(主要是指定了常用依赖、插件的版本)。

1
2
3
4
5
6
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.10.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>

此外,pom 中默认引入两个依赖包,和一个插件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
  • spring-boot-starter-web:核心模块,包括自动配置支持、日志和 YAML。
  • spring-boot-starter-test:测试模块,包括 JUnit、Hamcrest、Mockito。
  • spring-boot-maven-plugin:spring boot 插件, 提供了一系列 spring boot 相关的 maven 操作。
    • spring-boot:build-info,生成 Actuator 使用的构建信息文件 build-info.properties
    • spring-boot:repackage,默认 goal。在 mvn package 之后,再次打包可执行的 jar/war,同时保留 mvn package 生成的 jar/war 为.origin
    • spring-boot:run,运行 Spring Boot 应用
    • spring-boot:start,在 mvn integration-test 阶段,进行 Spring Boot 应用生命周期的管理
    • spring-boot:stop,在 mvn integration-test 阶段,进行 Spring Boot 应用生命周期的管理

编写 REST 服务

  • 创建 package ,名为 io.github.zp.springboot.chapter1.web(根据项目情况修改)
  • 创建 HelloController 类,内容如下:
1
2
3
4
5
6
7
8
9
@RestController
public class HelloController {

@RequestMapping("/hello")
public String index() {
return "Hello World";
}

}
  • 启动主程序 XXXApplication,打开浏览器访问http://localhost:8080/hello ,可以看到页面输出Hello World

编写单元测试用例

XXXApplicationTests 类中编写一个简单的单元测试来模拟 HTTP 请求,具体如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@RunWith(SpringJUnit4ClassRunner.class)
@SpringApplicationConfiguration(classes = MockServletContext.class)
@WebAppConfiguration
public class SpringBootHelloWorldApplicationTest {

private MockMvc mvc;

@Before
public void setUp() {
mvc = MockMvcBuilders.standaloneSetup(new HelloController()).build();
}

@Test
public void getHello() throws Exception {
mvc.perform(MockMvcRequestBuilders.get("/hello").accept(MediaType.APPLICATION_JSON))
.andExpect(status().isOk())
.andExpect(content().string(equalTo("Hello World")));
}

}

使用MockServletContext来构建一个空的WebApplicationContext,这样我们创建的HelloController就可以在@Before函数中创建并传递到MockMvcBuilders.standaloneSetup()函数中。

  • 注意引入下面内容,让statuscontentequalTo函数可用
1
2
3
import static org.hamcrest.Matchers.equalTo;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.content;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;

至此已完成目标,通过 Maven 构建了一个空白 Spring Boot 项目,再通过引入 web 模块实现了一个简单的请求处理。

示例源码

示例源码:spring-boot-web-helloworld

参考资料

CAP 和 BASE

一致性

一致性(Consistency)指的是多个数据副本是否能保持一致的特性。

在一致性的条件下,分布式系统在执行写操作成功后,如果所有用户都能够读取到最新的值,该系统就被认为具有强一致性。

数据一致性又可以分为以下几点:

  • 强一致性 - 数据更新操作结果和操作响应总是一致的,即操作响应通知更新失败,那么数据一定没有被更新,而不是处于不确定状态。
  • 弱一致性 - 系统在写入数据成功后,不承诺立即能读到最新的值,也不承诺什么时候能读到,但是过一段时间之后用户可以看到更新后的值。那么用户读不到最新数据的这段时间被称为“不一致窗口时间”。
  • 最终一致性 - 最终一致性作为弱一致性中的特例,强调的是所有数据副本,在经过一段时间的同步后,最终能够到达一致的状态,不需要实时保证系统数据的强一致性。

ACID

ACID 是数据库事务正确执行的四个基本要素的单词缩写:

  • 原子性(Atomicity)
    • 原子是指不可分解为更小粒度的东西。事务的原子性意味着:事务中的所有操作要么全部成功,要么全部失败
    • 回滚可以用日志来实现,日志记录着事务所执行的修改操作,在回滚时反向执行这些修改操作即可。
  • 一致性(Consistency)
    • 数据库在事务执行前后都保持一致性状态。
    • 在一致性状态下,所有事务对一个数据的读取结果都是相同的。
  • 隔离性(Isolation)
    • 同时运行的事务互不干扰。换句话说,一个事务所做的修改在最终提交以前,对其它事务是不可见的。
  • 持久性(Durability)
    • 一旦事务提交,则其所做的修改将会永远保存到数据库中。即使系统发生崩溃,事务执行的结果也不能丢失。
    • 可以通过数据库备份和恢复来实现,在系统发生奔溃时,使用备份的数据库进行数据恢复。

一个支持事务(Transaction)的数据库系统,必需要具有这四种特性,否则在事务过程(Transaction processing)当中无法保证数据的正确性。

  • 只有满足一致性,事务的执行结果才是正确的。
  • 在无并发的情况下,事务串行执行,隔离性一定能够满足。此时只要能满足原子性,就一定能满足一致性。
  • 在并发的情况下,多个事务并行执行,事务不仅要满足原子性,还需要满足隔离性,才能满足一致性。
  • 事务满足持久化是为了能应对系统崩溃的情况。

CAP 定理

CAP 简介

1998 年,Brewer 提出了分布式系统领域大名鼎鼎的 CAP 定理。

CAP 定理提出:分布式系统有三个指标,这三个指标不能同时做到:

  • 一致性(Consistency) - 在任何给定时间,网络中的所有节点都具有完全相同(最近)的值。
  • 可用性(Availability) - 对网络的每个请求都会返回响应,但不能保证返回的数据是最新的。
  • 分区容错性(Partition Tolerance) - 即使任意数量的节点出现故障,网络仍会继续运行。

CAP 就是取 Consistency、Availability、Partition Tolerance 的首字母而命名。

在分布式系统中,分区容错性是一个既定的事实:因为分布式系统总会出现各种各样的问题,如由于网络原因而导致节点失联;发生机器故障;机器重启或升级等等。因此,CAP 定理实际上是要在可用性(A)和一致性(C)之间做权衡

AP 模式

对网络的每个请求都会收到响应,即使由于网络分区(故障节点)而无法保证数据一定是最新的。

选择 AP 模式,偏向于保证服务的高可用性。用户访问系统的时候,都能得到响应数据,不会出现响应错误;但是,当出现分区故障时,相同的读操作,访问不同的节点,得到响应数据可能不一样。

CP 模式

如果由于网络分区(故障节点)而无法保证特定信息是最新的,则系统将返回错误或超时。

选择 CP 模式,一旦因为消息丢失、延迟过高发生了网络分区,就会影响用户的体验和业务的可用性。因为为了防止数据不一致,系统将拒绝新数据的写入。

CAP 定理的应用

CAP 定理在分布式系统设计中,可以被应用与哪些方面?

一个最具代表性的问题是:服务注册中心应该选择 AP 还是 CP?

在微服务架构下,服务注册和服务发现机制中主要有三种角色:

  • 服务提供者(RPC Server / Provider)
  • 服务消费者(RPC Client / Consumer)
  • 服务注册中心(Registry)

注册中心负责协调服务注册和服务发现,显然它是核心中的核心。主流的注册中心有很多,如:ZooKeeper、Nacos、Eureka、Consul、etcd 等。在针对注册中心进行技术选型时,其 CAP 设计也是一个比较的维度。

  • CP 模型代表:ZooKeeper、etcd。系统强调数据的一致性,当数据一致性无法保证时(如:正在选举主节点),系统拒绝请求。
  • AP 模型代表:Nacos、Eureka。系统强调可用性,牺牲一定的一致性(即服务节点上的数据不保证是最新的),来保证整体服务可用。

对于服务注册中心而言,即使不同节点保存的服务注册信息存在差异,也不会造成灾难性的后果,仅仅是信息滞后而已。但是,如果为了追求数据一致性,使得服务发现短时间内不可用,负面影响更严重。所以,对于服务注册中心而言,可用性比一致性更重要,一般应该选择 AP 模型。

CAP 定理的误导

CAP 定理在分布式系统领域大名鼎鼎,以至于被很多人视为了真理。然而,CAP 定理真的正确吗?

网络分区是一种故障,不管喜欢还是不喜欢,它都可能发生,所以无法选择或逃避分区的问题。在网络正常的时候,系统可以同时保证一致性(线性化)和可用性。而一旦发生了网络故障,必须要么选择一致性,要么选择可用性。因此,对 CAP 更准确的理解应该是:当发生网络分区(P)的情况下,可用性(A)和一致性(C)二者只能选其一

CAP 定理所描述的模型实际上局限性很大,它只考虑了一种一致性模型和一种故障(网络分区故障),而没有考虑网络延迟、节点失效等情况。因此,它对于指导一个具体的分布式系统设计来说,没有太大的实际价值。

值得一提的是,在 CAP 定理提出十二年之后,其提出者也发表了一篇文章 CAP Twelve Years Later: How the “Rules” Have Changed,来阐述 CAP 定理的局限性。

BASE 定理

BASE 是 基本可用(Basically Available)软状态(Soft State)最终一致性(Eventually Consistent) 三个短语的缩写。BASE 定理是对 CAP 定理中可用性(A)和一致性(C)权衡的结果。

BASE 定理的核心思想是:即使无法做到强一致性,但每个应用都可以根据自身业务特点,采用适当的方式来使系统达到最终一致性。

  • 基本可用(Basically Available) - 分布式系统在出现故障的时候,保证核心可用,允许损失部分可用性。例如,电商在做促销时,为了保证购物系统的稳定性,部分消费者可能会被引导到一个降级的页面。
  • 软状态(Soft State) - 指允许系统中的数据存在中间状态,并认为该中间状态不会影响系统整体可用性,即允许系统不同节点的数据副本之间进行同步的过程存在延时
  • 最终一致性(Eventually Consistent) - 强调的是所有数据副本,在经过一段时间的同步后,最终能够到达一致的状态,不需要实时保证系统数据的强一致性。

BASE vs. ACID

BASE 定理的核心思想是:即使无法做到强一致性,但每个应用都可以根据自身业务特点,采用适当的方式来使系统达到最终一致性。

ACID 要求强一致性,通常运用在传统的数据库系统上。而 BASE 要求最终一致性,通过牺牲强一致性来达到可用性,通常运用在大型分布式系统中。

在实际的分布式场景中,不同业务单元和组件对一致性的要求是不同的,因此 ACID 和 BASE 往往会结合在一起使用。

参考资料