Dunwu Blog

大道至简,知易行难

Flink 入门

Apache Flink 是一个框架和分布式处理引擎,用于在无边界和有边界数据流上进行有状态的计算。Flink 能在所有常见集群环境中运行,并能以内存速度和任意规模进行计算。

img

概念

处理无界和有界数据

任何类型的数据都可以形成一种事件流。

数据可以被作为 无界 或者 有界 流来处理。

  • 无界流 有定义流的开始,但没有定义流的结束。它们会无休止地产生数据。无界流的数据必须持续处理,即数据被摄取后需要立刻处理。我们不能等到所有数据都到达再处理,因为输入是无限的,在任何时候输入都不会完成。处理无界数据通常要求以特定顺序摄取事件,例如事件发生的顺序,以便能够推断结果的完整性。
  • 有界流 有定义流的开始,也有定义流的结束。有界流可以在摄取所有数据后再进行计算。有界流所有数据可以被排序,所以并不需要有序摄取。有界流处理通常被称为批处理。

img

核心概念

显而易见,(数据)流是流处理的基本要素。然而,流也拥有着多种特征。这些特征决定了流如何以及何时被处理。Flink 是一个能够处理任何类型数据流的强大处理框架。

  • 有界无界 的数据流:流可以是无界的;也可以是有界的,例如固定大小的数据集。Flink 在无界的数据流处理上拥有诸多功能强大的特性,同时也针对有界的数据流开发了专用的高效算子。
  • 实时历史记录 的数据流:所有的数据都是以流的方式产生,但用户通常会使用两种截然不同的方法处理数据。或是在数据生成时进行实时的处理;亦或是先将数据流持久化到存储系统中——例如文件系统或对象存储,然后再进行批处理。Flink 的应用能够同时支持处理实时以及历史记录数据流。

状态

只有在每一个单独的事件上进行转换操作的应用才不需要状态,换言之,每一个具有一定复杂度的流处理应用都是有状态的。任何运行基本业务逻辑的流处理应用都需要在一定时间内存储所接收的事件或中间结果,以供后续的某个时间点(例如收到下一个事件或者经过一段特定时间)进行访问并进行后续处理。

img

应用状态是 Flink 中的一等公民,Flink 提供了许多状态管理相关的特性支持,其中包括:

  • 多种状态基础类型:Flink 为多种不同的数据结构提供了相对应的状态基础类型,例如原子值(value),列表(list)以及映射(map)。开发者可以基于处理函数对状态的访问方式,选择最高效、最适合的状态基础类型。
  • 插件化的 State Backend:State Backend 负责管理应用程序状态,并在需要的时候进行 checkpoint。Flink 支持多种 state backend,可以将状态存在内存或者 RocksDB。RocksDB 是一种高效的嵌入式、持久化键值存储引擎。Flink 也支持插件式的自定义 state backend 进行状态存储。
  • 精确一次语义:Flink 的 checkpoint 和故障恢复算法保证了故障发生后应用状态的一致性。因此,Flink 能够在应用程序发生故障时,对应用程序透明,不造成正确性的影响。
  • 超大数据量状态:Flink 能够利用其异步以及增量式的 checkpoint 算法,存储数 TB 级别的应用状态。
  • 可弹性伸缩的应用:Flink 能够通过在更多或更少的工作节点上对状态进行重新分布,支持有状态应用的分布式的横向伸缩。

时间

时间是流处理应用另一个重要的组成部分。因为事件总是在特定时间点发生,所以大多数的事件流都拥有事件本身所固有的时间语义。进一步而言,许多常见的流计算都基于时间语义,例如窗口聚合、会话计算、模式检测和基于时间的 join。流处理的一个重要方面是应用程序如何衡量时间,即区分事件时间(event-time)和处理时间(processing-time)。

Flink 提供了丰富的时间语义支持。

  • 事件时间模式:使用事件时间语义的流处理应用根据事件本身自带的时间戳进行结果的计算。因此,无论处理的是历史记录的事件还是实时的事件,事件时间模式的处理总能保证结果的准确性和一致性。
  • Watermark 支持:Flink 引入了 watermark 的概念,用以衡量事件时间进展。Watermark 也是一种平衡处理延时和完整性的灵活机制。
  • 迟到数据处理:当以带有 watermark 的事件时间模式处理数据流时,在计算完成之后仍会有相关数据到达。这样的事件被称为迟到事件。Flink 提供了多种处理迟到数据的选项,例如将这些数据重定向到旁路输出(side output)或者更新之前完成计算的结果。
  • 处理时间模式:除了事件时间模式,Flink 还支持处理时间语义。处理时间模式根据处理引擎的机器时钟触发计算,一般适用于有着严格的低延迟需求,并且能够容忍近似结果的流处理应用。
  • Flink 是基于事件驱动 (Event-driven) 的应用,能够同时支持流处理和批处理
  • 基于内存计算,能够保证高吞吐和低延迟,具有优越的性能表现;
  • 支持精确一次 (Exactly-once) 语意,能够完美地保证一致性和正确性;
  • 分层 API ,能够满足各个层次的开发需求;
  • 支持高可用配置,支持保存点机制,能够提供安全性和稳定性上的保证;
  • 多样化的部署方式,支持本地,远端,云端等多种部署方案;
  • 具有横向扩展架构,能够按照用户的需求进行动态扩容;
  • 活跃度极高的社区和完善的生态圈的支持。

分层架构

Flink 采用分层的架构设计,从而保证各层在功能和职责上的清晰。

自上而下,分别是 API & Libraries 层、Runtime 核心层以及物理部署层:

API & Libraries 层

这一层主要提供了编程 API 和 顶层类库:

  • 编程 API : 用于进行流处理的 DataStream API 和用于进行批处理的 DataSet API;
    • SQL & Table API - SQL & Table API 同时适用于批处理和流处理,这意味着你可以对有界数据流和无界数据流以相同的语义进行查询,并产生相同的结果。除了基本查询外, 它还支持自定义的标量函数,聚合函数以及表值函数,可以满足多样化的查询需求。
    • DataStream & DataSet API - DataStream & DataSet API 是 Flink 数据处理的核心 API,支持使用 Java 语言或 Scala 语言进行调用,提供了数据读取,数据转换和数据输出等一系列常用操作的封装。
    • Stateful Stream Processing - Stateful Stream Processing 是最低级别的抽象,它通过 Process Function 函数内嵌到 DataStream API 中。 Process Function 是 Flink 提供的最底层 API,具有最大的灵活性,允许开发者对于时间和状态进行细粒度的控制。
  • 顶层类库
    • 用于复杂事件处理的 CEP 库;
    • 用于结构化数据查询的 SQL & Table 库;
    • 基于批处理的机器学习库 FlinkML
    • 图形处理库 Gelly。

Runtime 核心层

这一层是 Flink 分布式计算框架的核心实现层,包括作业转换,任务调度,资源分配,任务执行等功能,基于这一层的实现,可以在流式引擎下同时运行流处理程序和批处理程序。

物理部署层

Flink 的物理部署层,用于支持在不同平台上部署运行 Flink 应用。

集群架构

核心组件

按照上面的介绍,Flink 核心架构的第二层是 Runtime 层, 该层采用标准的 Master - Slave 结构, 其中,Master 部分又包含了三个核心组件:Dispatcher、ResourceManager 和 JobManager,而 Slave 则主要是 TaskManager 进程。它们的功能分别如下:

  • JobManagers (也称为 masters) :JobManagers 接收由 Dispatcher 传递过来的执行程序,该执行程序包含了作业图 (JobGraph),逻辑数据流图 (logical dataflow graph) 及其所有的 classes 文件以及第三方类库 (libraries) 等等 。紧接着 JobManagers 会将 JobGraph 转换为执行图 (ExecutionGraph),然后向 ResourceManager 申请资源来执行该任务,一旦申请到资源,就将执行图分发给对应的 TaskManagers 。因此每个作业 (Job) 至少有一个 JobManager;高可用部署下可以有多个 JobManagers,其中一个作为 _leader_,其余的则处于 standby 状态。
  • TaskManagers (也称为 workers) : TaskManagers **负责实际的子任务 (subtasks) 的执行,每个 TaskManagers 都拥有一定数量的 slots。Slot 是一组固定大小的资源的合集 (如计算能力,存储空间)**。TaskManagers 启动后,会将其所拥有的 slots 注册到 ResourceManager 上,由 ResourceManager 进行统一管理。
  • Dispatcher:负责接收客户端提交的执行程序,并传递给 JobManager 。除此之外,它还提供了一个 WEB UI 界面,用于监控作业的执行情况。
  • ResourceManager :负责管理 slots 并协调集群资源。ResourceManager 接收来自 JobManager 的资源请求,并将存在空闲 slots 的 TaskManagers 分配给 JobManager 执行任务。Flink 基于不同的部署平台,如 YARN , Mesos,K8s 等提供了不同的资源管理器,当 TaskManagers 没有足够的 slots 来执行任务时,它会向第三方平台发起会话来请求额外的资源。

img

Task & SubTask

上面我们提到:TaskManagers 实际执行的是 SubTask,而不是 Task,这里解释一下两者的区别:

在执行分布式计算时,Flink 将可以链接的操作 (operators) 链接到一起,这就是 Task。之所以这样做, 是为了减少线程间切换和缓冲而导致的开销,在降低延迟的同时可以提高整体的吞吐量。 但不是所有的 operator 都可以被链接,如下 keyBy 等操作会导致网络 shuffle 和重分区,因此其就不能被链接,只能被单独作为一个 Task。 简单来说,一个 Task 就是一个可以链接的最小的操作链 (Operator Chains) 。如下图,source 和 map 算子被链接到一块,因此整个作业就只有三个 Task:

img

解释完 Task ,我们在解释一下什么是 SubTask,其准确的翻译是: _A subtask is one parallel slice of a task_,即一个 Task 可以按照其并行度拆分为多个 SubTask。如上图,source & map 具有两个并行度,KeyBy 具有两个并行度,Sink 具有一个并行度,因此整个虽然只有 3 个 Task,但是却有 5 个 SubTask。Jobmanager 负责定义和拆分这些 SubTask,并将其交给 Taskmanagers 来执行,每个 SubTask 都是一个单独的线程。

4.3 资源管理

理解了 SubTasks ,我们再来看看其与 Slots 的对应情况。一种可能的分配情况如下:

img

这时每个 SubTask 线程运行在一个独立的 TaskSlot, 它们共享所属的 TaskManager 进程的 TCP 连接(通过多路复用技术)和心跳信息 (heartbeat messages),从而可以降低整体的性能开销。此时看似是最好的情况,但是每个操作需要的资源都是不尽相同的,这里假设该作业 keyBy 操作所需资源的数量比 Sink 多很多 ,那么此时 Sink 所在 Slot 的资源就没有得到有效的利用。

基于这个原因,Flink 允许多个 subtasks 共享 slots,即使它们是不同 tasks 的 subtasks,但只要它们来自同一个 Job 就可以。假设上面 souce & map 和 keyBy 的并行度调整为 6,而 Slot 的数量不变,此时情况如下:

img

可以看到一个 Task Slot 中运行了多个 SubTask 子任务,此时每个子任务仍然在一个独立的线程中执行,只不过共享一组 Sot 资源而已。那么 Flink 到底如何确定一个 Job 至少需要多少个 Slot 呢?Flink 对于这个问题的处理很简单,默认情况一个 Job 所需要的 Slot 的数量就等于其 Operation 操作的最高并行度。如下, A,B,D 操作的并行度为 4,而 C,E 操作的并行度为 2,那么此时整个 Job 就需要至少四个 Slots 来完成。通过这个机制,Flink 就可以不必去关心一个 Job 到底会被拆分为多少个 Tasks 和 SubTasks。

img

4.4 组件通讯

Flink 的所有组件都基于 Actor System 来进行通讯。Actor system 是多种角色的 actor 的容器,它提供调度,配置,日志记录等多种服务,并包含一个可以启动所有 actor 的线程池,如果 actor 是本地的,则消息通过共享内存进行共享,但如果 actor 是远程的,则通过 RPC 的调用来传递消息。

按照统计维度的不同,Flink 中的窗口可以分为 时间窗口 (Time Windows) 和 计数窗口 (Count Windows) 。

时间窗口

时间窗口 (Time Windows) 用于以时间为维度来进行数据聚合,具体分为以下四类:

Tumbling Windows

滚动窗口 (Tumbling Windows) 是指彼此之间没有重叠的窗口。例如:每隔 1 小时统计过去 1 小时内的商品点击量,那么 1 天就只能分为 24 个窗口,每个窗口彼此之间是不存在重叠的,具体如下:

img

Sliding Windows

滑动窗口(Sliding Windows)用于滚动进行聚合分析,例如:每隔 6 分钟统计一次过去一小时内所有商品的点击量,那么统计窗口彼此之间就是存在重叠的,即 1 天可以分为 240 个窗口。图示如下:

img

可以看到 window 1 - 4 这四个窗口彼此之间都存在着时间相等的重叠部分。想要实现滑动窗口,只需要在使用 timeWindow 方法时额外传递第二个参数作为滚动时间即可,具体如下:

1
2
// 每隔3秒统计一次过去1分钟内的数据
timeWindow(Time.minutes(1),Time.seconds(3))

Session Windows

当用户在进行持续浏览时,可能每时每刻都会有点击数据,例如在活动区间内,用户可能频繁的将某类商品加入和移除购物车,而你只想知道用户本次浏览最终的购物车情况,此时就可以在用户持有的会话结束后再进行统计。想要实现这类统计,可以通过会话窗口(Session Windows) 来进行实现。

img

具体的实现代码如下:

1
2
3
4
// 以处理时间为衡量标准,如果10秒内没有任何数据输入,就认为会话已经关闭,此时触发统计
window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
// 以事件时间为衡量标准
window(EventTimeSessionWindows.withGap(Time.seconds(10)))

Global Windows

最后一个窗口是全局窗口(Global Windows), 全局窗口会将所有 key 相同的元素分配到同一个窗口中,其通常配合触发器 (trigger) 进行使用。如果没有相应触发器,则计算将不会被执行。

img

这里继续以上面词频统计的案例为例,示例代码如下:

1
2
// 当单词累计出现的次数每达到10次时,则触发计算,计算整个窗口内该单词出现的总数
window(GlobalWindows.create()).trigger(CountTrigger.of(10)).sum(1).print();

统计窗口

统计窗口(Count Windows)用于以数量为维度来进行数据聚合,同样也分为滚动窗口和滑动窗口,实现方式也和时间窗口完全一致,只是调用的 API 不同,具体如下:

1
2
3
4
// 滚动计数窗口,每1000次点击则计算一次
countWindow(1000)
// 滑动计数窗口,每10次点击发生后,则计算过去1000次点击的情况
countWindow(1000,10)

实际上计数窗口内部就是调用的我们上一部分介绍的全局窗口来实现的,其源码如下:

1
2
3
4
5
6
7
8
9
10
public WindowedStream<T, KEY, GlobalWindow> countWindow(long size) {
return window(GlobalWindows.create()).trigger(PurgingTrigger.of(CountTrigger.of(size)));
}


public WindowedStream<T, KEY, GlobalWindow> countWindow(long size, long slide) {
return window(GlobalWindows.create())
.evictor(CountEvictor.of(size))
.trigger(CountTrigger.of(slide));
}

Flink 一个比较重要的特性就是其支持有状态计算。即你可以将中间的计算结果进行保存,并提供给后续的计算使用:


img

具体而言,Flink 又将状态 (State) 分为 Keyed State 与 Operator State:

2.1 算子状态

算子状态 (Operator State):顾名思义,状态是和算子进行绑定的,一个算子的状态不能被其他算子所访问到。官方文档上对 Operator State 的解释是:_each operator state is bound to one parallel operator instance_,所以更为确切的说一个算子状态是与一个并发的算子实例所绑定的,即假设算子的并行度是 2,那么其应有两个对应的算子状态:

img

2.2 键控状态

键控状态 (Keyed State) :是一种特殊的算子状态,即状态是根据 key 值进行区分的,Flink 会为每类键值维护一个状态实例。如下图所示,每个颜色代表不同 key 值,对应四个不同的状态实例。需要注意的是键控状态只能在 KeyedStream 上进行使用,我们可以通过 stream.keyBy(...) 来得到 KeyedStream

img

事件驱动型应用

什么是事件驱动型应用?

事件驱动型应用是一类具有状态的应用,它从一个或多个事件流提取数据,并根据到来的事件触发计算、状态更新或其他外部动作。

事件驱动型应用是在计算存储分离的传统应用基础上进化而来。在传统架构中,应用需要读写远程事务型数据库。

相反,事件驱动型应用是基于状态化流处理来完成。在该设计中,数据和计算不会分离,应用只需访问本地(内存或磁盘)即可获取数据。系统容错性的实现依赖于定期向远程持久化存储写入 checkpoint。下图描述了传统应用和事件驱动型应用架构的区别。

img

事件驱动型应用的优势?

事件驱动型应用无须查询远程数据库,本地数据访问使得它具有更高的吞吐和更低的延迟。而由于定期向远程持久化存储的 checkpoint 工作可以异步、增量式完成,因此对于正常事件处理的影响甚微。事件驱动型应用的优势不仅限于本地数据访问。传统分层架构下,通常多个应用会共享同一个数据库,因而任何对数据库自身的更改(例如:由应用更新或服务扩容导致数据布局发生改变)都需要谨慎协调。反观事件驱动型应用,由于只需考虑自身数据,因此在更改数据表示或服务扩容时所需的协调工作将大大减少。

事件驱动型应用会受制于底层流处理系统对时间和状态的把控能力,Flink 诸多优秀特质都是围绕这些方面来设计的。它提供了一系列丰富的状态操作原语,允许以精确一次的一致性语义合并海量规模(TB 级别)的状态数据。此外,Flink 还支持事件时间和自由度极高的定制化窗口逻辑,而且它内置的 ProcessFunction 支持细粒度时间控制,方便实现一些高级业务逻辑。同时,Flink 还拥有一个复杂事件处理(CEP)类库,可以用来检测数据流中的模式。

Flink 中针对事件驱动应用的明星特性当属 savepoint。Savepoint 是一个一致性的状态映像,它可以用来初始化任意状态兼容的应用。在完成一次 savepoint 后,即可放心对应用升级或扩容,还可以启动多个版本的应用来完成 A/B 测试。

典型的事件驱动型应用实例

数据分析应用

什么是数据分析应用?

数据分析任务需要从原始数据中提取有价值的信息和指标。传统的分析方式通常是利用批查询,或将事件记录下来并基于此有限数据集构建应用来完成。为了得到最新数据的分析结果,必须先将它们加入分析数据集并重新执行查询或运行应用,随后将结果写入存储系统或生成报告。

借助一些先进的流处理引擎,还可以实时地进行数据分析。和传统模式下读取有限数据集不同,流式查询或应用会接入实时事件流,并随着事件消费持续产生和更新结果。这些结果数据可能会写入外部数据库系统或以内部状态的形式维护。仪表展示应用可以相应地从外部数据库读取数据或直接查询应用的内部状态。

如下图所示,Apache Flink 同时支持流式及批量分析应用。

img

流式分析应用的优势?

和批量分析相比,由于流式分析省掉了周期性的数据导入和查询过程,因此从事件中获取指标的延迟更低。不仅如此,批量查询必须处理那些由定期导入和输入有界性导致的人工数据边界,而流式查询则无须考虑该问题。

另一方面,流式分析会简化应用抽象。批量查询的流水线通常由多个独立部件组成,需要周期性地调度提取数据和执行查询。如此复杂的流水线操作起来并不容易,一旦某个组件出错将会影响流水线的后续步骤。而流式分析应用整体运行在 Flink 之类的高端流处理系统之上,涵盖了从数据接入到连续结果计算的所有步骤,因此可以依赖底层引擎提供的故障恢复机制。

Flink 为持续流式分析和批量分析都提供了良好的支持。具体而言,它内置了一个符合 ANSI 标准的 SQL 接口,将批、流查询的语义统一起来。无论是在记录事件的静态数据集上还是实时事件流上,相同 SQL 查询都会得到一致的结果。同时 Flink 还支持丰富的用户自定义函数,允许在 SQL 中执行定制化代码。如果还需进一步定制逻辑,可以利用 Flink DataStream API 和 DataSet API 进行更低层次的控制。此外,Flink 的 Gelly 库为基于批量数据集的大规模高性能图分析提供了算法和构建模块支持。

典型的数据分析应用实例

数据管道应用

什么是数据管道?

提取-转换-加载(ETL)是一种在存储系统之间进行数据转换和迁移的常用方法。ETL 作业通常会周期性地触发,将数据从事务型数据库拷贝到分析型数据库或数据仓库。

数据管道和 ETL 作业的用途相似,都可以转换、丰富数据,并将其从某个存储系统移动到另一个。但数据管道是以持续流模式运行,而非周期性触发。因此它支持从一个不断生成数据的源头读取记录,并将它们以低延迟移动到终点。例如:数据管道可以用来监控文件系统目录中的新文件,并将其数据写入事件日志;另一个应用可能会将事件流物化到数据库或增量构建和优化查询索引。

下图描述了周期性 ETL 作业和持续数据管道的差异。

img

数据管道的优势?

和周期性 ETL 作业相比,持续数据管道可以明显降低将数据移动到目的端的延迟。此外,由于它能够持续消费和发送数据,因此用途更广,支持用例更多。

很多常见的数据转换和增强操作可以利用 Flink 的 SQL 接口(或 Table API)及用户自定义函数解决。如果数据管道有更高级的需求,可以选择更通用的 DataStream API 来实现。Flink 为多种数据存储系统(如:Kafka、Kinesis、Elasticsearch、JDBC 数据库系统等)内置了连接器。同时它还提供了文件系统的连续型数据源及数据汇,可用来监控目录变化和以时间分区的方式写入文件。

典型的数据管道应用实例

参考资料

JVM 类加载

img

类加载机制

类是在运行期间动态加载的。

类的加载指的是将类的 .class 文件中的二进制数据读入到内存中,将其放在运行时数据区的方法区内,然后在堆区创建一个java.lang.Class对象,用来封装类在方法区内的数据结构。类的加载的最终产品是位于堆区中的Class对象,Class对象封装了类在方法区内的数据结构,并且向 Java 程序员提供了访问方法区内的数据结构的接口。

类加载器并不需要等到某个类被“首次主动使用”时再加载它,JVM 规范允许类加载器在预料某个类将要被使用时就预先加载它,如果在预先加载的过程中遇到了.class 文件缺失或存在错误,类加载器必须在程序首次主动使用该类时才报告错误(LinkageError 错误)如果这个类一直没有被程序主动使用,那么类加载器就不会报告错误

类的生命周期

img

Java 类的完整生命周期包括以下几个阶段:

  • 加载(Loading)
  • 链接(Linking)
    • 验证(Verification)
    • 准备(Preparation)
    • 解析(Resolution)
  • 初始化(Initialization)
  • 使用(Using)
  • 卸载(Unloading)

加载、验证、准备、初始化和卸载这 5 个阶段的顺序是确定的,类的加载过程必须按照这种顺序按部就班地开始。而解析过程在某些情况下可以在初始化阶段之后再开始,这是为了支持 Java 的动态绑定

类加载过程是指加载、验证、准备、解析和初始化这 5 个阶段。

(一)加载

加载是类加载的一个阶段,注意不要混淆。

加载,是指查找字节流,并且据此创建类的过程

加载过程完成以下三件事:

  • 通过一个类的全限定名来获取定义此类的二进制字节流。
  • 将这个字节流所代表的静态存储结构转化为方法区的运行时存储结构。
  • 在内存中生成一个代表这个类的 Class 对象,作为方法区这个类的各种数据的访问入口。

其中二进制字节流可以从以下方式中获取:

  • 从 ZIP 包读取,这很常见,最终成为日后 JAR、EAR、WAR 格式的基础。
  • 从网络中获取,这种场景最典型的应用是 Applet。
  • 运行时计算生成,这种场景使用得最多得就是动态代理技术,在 java.lang.reflect.Proxy 中,就是用了 ProxyGenerator.generateProxyClass 的代理类的二进制字节流。
  • 由其他文件生成,典型场景是 JSP 应用,即由 JSP 文件生成对应的 Class 类。
  • 从数据库读取,这种场景相对少见,例如有些中间件服务器(如 SAP Netweaver)可以选择把程序安装到数据库中来完成程序代码在集群间的分发。

更详细内容会在 3. ClassLoader 介绍。

(二)验证

验证是链接阶段的第一步。验证的目标是确保 Class 文件的字节流中包含的信息符合当前虚拟机的要求,并且不会危害虚拟机自身的安全。

验证阶段大致会完成 4 个阶段的检验动作:

  • 文件格式验证 - 验证字节流是否符合 Class 文件格式的规范,并且能被当前版本的虚拟机处理。
  • 元数据验证 - 对字节码描述的信息进行语义分析,以保证其描述的信息符合 Java 语言规范的要求。
  • 字节码验证 - 通过数据流和控制流分析,确保程序语义是合法、符合逻辑的。
  • 符号引用验证 - 发生在虚拟机将符号引用转换为直接引用的时候,对类自身以外(常量池中的各种符号引用)的信息进行匹配性校验。

验证阶段是非常重要的,但不是必须的,它对程序运行期没有影响,如果所引用的类经过反复验证,那么可以考虑采用 -Xverifynone 参数来关闭大部分的类验证措施,以缩短虚拟机类加载的时间。

(三)准备

类变量是被 static 修饰的变量,准备阶段为 static 变量在方法区分配内存并初始化为默认值,使用的是方法区的内存。

实例变量不会在这阶段分配内存,它将会在对象实例化时随着对象一起分配在 Java 堆中。(实例化不是类加载的一个过程,类加载发生在所有实例化操作之前,并且类加载只进行一次,实例化可以进行多次)

初始值一般为 0 值,例如下面的类变量 value 被初始化为 0 而不是 123。

1
public static int value = 123;

如果类变量是常量,那么会按照表达式来进行初始化,而不是赋值为 0。

1
public static final int value = 123;

准备阶段有以下几点需要注意:

  • 这时候进行内存分配的仅包括类变量(static),而不包括实例变量,实例变量会在对象实例化时随着对象一块分配在 Java 堆中。
  • 这里所设置的初始值通常情况下是数据类型默认的零值(如 00Lnullfalse 等),而不是被在 Java 代码中被显式地赋予的值。

假设一个类变量的定义为:public static int value = 3

那么变量 value 在准备阶段过后的初始值为 0,而不是 3,因为这时候尚未开始执行任何 Java 方法,而把 value 赋值为 3 的public static指令是在程序编译后,存放于类构造器()方法之中的,所以把 value 赋值为 3 的动作将在初始化阶段才会执行。

这里还需要注意如下几点:

  • 对基本数据类型来说,对于类变量(static)和全局变量,如果不显式地对其赋值而直接使用,则系统会为其赋予默认的零值,而对于局部变量来说,在使用前必须显式地为其赋值,否则编译时不通过。
  • 对于同时被 static 和 final 修饰的常量,必须在声明的时候就为其显式地赋值,否则编译时不通过;而只被 final 修饰的常量则既可以在声明时显式地为其赋值,也可以在类初始化时显式地为其赋值,总之,在使用前必须为其显式地赋值,系统不会为其赋予默认零值。
  • 对于引用数据类型 reference 来说,如数组引用、对象引用等,如果没有对其进行显式地赋值而直接使用,系统都会为其赋予默认的零值,即 null。
  • 如果在数组初始化时没有对数组中的各元素赋值,那么其中的元素将根据对应的数据类型而被赋予默认的零值。
  • 如果类字段的字段属性表中存在ConstantValue属性,即同时被 final 和 static 修饰,那么在准备阶段变量 value 就会被初始化为 ConstValue 属性所指定的值。

假设上面的类变量 value 被定义为: public static final int value = 3

编译时 Javac 将会为 value 生成 ConstantValue 属性,在准备阶段虚拟机就会根据ConstantValue的设置将 value 赋值为 3。我们可以理解为 static final 常量在编译期就将其结果放入了调用它的类的常量池中

(四)解析

在 class 文件被加载至 Java 虚拟机之前,这个类无法知道其他类及其方法、字段所对应的具体地址,甚至不知道自己方法、字段的地址。因此,每当需要引用这些成员时,Java 编译器会生成一个符号引用。在运行阶段,这个符号引用一般都能够无歧义地定位到具体目标上。

举例来说,对于一个方法调用,编译器会生成一个包含目标方法所在类的名字、目标方法的名字、接收参数类型以及返回值类型的符号引用,来指代所要调用的方法。

解析阶段目标是将常量池的符号引用替换为直接引用的过程。解析动作主要针对类或接口、字段、类方法、接口方法、方法类型、方法句柄和调用点限定符 7 类符号引用进行。

  • 符号引用(Symbolic References) - 符号引用以一组符号来描述所引用的目标,符号可以是任何形式的字面量,只要使用时能无歧义地定位到目标即可。
  • 直接引用(Direct Reference) - 直接引用可以是直接指向目标的指针、相对偏移量或是一个能间接定位到目标的句柄。

(五)初始化

在 Java 代码中,如果要初始化一个静态字段,可以在声明时直接赋值,也可以在静态代码块中对其赋值。

如果直接赋值的静态字段被 final 所修饰,并且它的类型是基本类型或字符串时,那么该字段便会被 Java 编译器标记成常量值(ConstantValue),其初始化直接由 Java 虚拟机完成。除此之外的直接赋值操作,以及所有静态代码块中的代码,则会被 Java 编译器置于同一方法中,并把它命名为 < clinit >

初始化阶段才真正开始执行类中的定义的 Java 程序代码。初始化,为类的静态变量赋予正确的初始值,JVM 负责对类进行初始化,主要对类变量进行初始化

类初始化方式

  • 声明类变量时指定初始值
  • 使用静态代码块为类变量指定初始值

在准备阶段,类变量已经赋过一次系统要求的初始值,而在初始化阶段,根据程序员通过程序制定的主观计划去初始化类变量和其它资源。

类初始化步骤

  1. 如果类还没有被加载和链接,开始加载该类。
  2. 如果该类的直接父类还没有被初始化,先初始化其父类。
  3. 如果该类有初始化语句,则依次执行这些初始化语句。

类初始化时机

只有主动引用类的时候才会导致类的初始化。

(1)主动引用

类的主动引用包括以下六种:

  • 创建类的实例 - 也就是 new 对象
  • 访问静态变量 - 访问某个类或接口的静态变量,或者对该静态变量赋值
  • 访问静态方法
  • 反射 - 如Class.forName(“com.shengsiyuan.Test”)
  • 初始化子类 - 初始化某个类的子类,则其父类也会被初始化
  • 启动类 - Java 虚拟机启动时被标明为启动类的类(Java Test),直接使用java.exe命令来运行某个主类

(2)被动引用

以上 5 种场景中的行为称为对一个类进行主动引用。除此之外,所有引用类的方式都不会触发初始化,称为被动引用。被动引用的常见例子包括:

  • 通过子类引用父类的静态字段,不会导致子类初始化
1
System.out.println(SubClass.value); // value 字段在 SuperClass 中定义
  • 通过数组定义来引用类,不会触发此类的初始化。该过程会对数组类进行初始化,数组类是一个由虚拟机自动生成的、直接继承自 Object 的子类,其中包含了数组的属性和方法。
1
SuperClass[] sca = new SuperClass[10];
  • 常量在编译阶段会存入调用类的常量池中,本质上并没有直接引用到定义常量的类,因此不会触发定义常量的类的初始化
1
System.out.println(ConstClass.HELLOWORLD);

类初始化细节

类初始化 <clinit>() 方法的细节:

  • 是由编译器自动收集类中所有类变量的赋值动作和静态语句块(static{} 块)中的语句合并产生的,编译器收集的顺序由语句在源文件中出现的顺序决定。特别注意的是,静态语句块只能访问到定义在它之前的类变量,定义在它之后的类变量只能赋值,不能访问。例如以下代码:
1
2
3
4
5
6
7
public class Test {
static {
i = 0; // 给变量赋值可以正常编译通过
System.out.print(i); // 这句编译器会提示“非法向前引用”
}
static int i = 1;
}
  • 与类的构造函数(或者说实例构造器 <init>())不同,不需要显式的调用父类的构造器。虚拟机会自动保证在子类的 <clinit>() 方法运行之前,父类的 <clinit>() 方法已经执行结束。因此虚拟机中第一个执行 <clinit>() 方法的类肯定为 java.lang.Object
  • 由于父类的 <clinit>() 方法先执行,也就意味着父类中定义的静态语句块要优于子类的变量赋值操作。例如以下代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
static class Parent {
public static int A = 1;
static {
A = 2;
}
}

static class Sub extends Parent {
public static int B = A;
}

public static void main(String[] args) {
System.out.println(Sub.B); // 输出结果是父类中的静态变量 A 的值,也就是 2。
}
  • <clinit>() 方法对于类或接口不是必须的,如果一个类中不包含静态语句块,也没有对类变量的赋值操作,编译器可以不为该类生成 <clinit>() 方法。
  • 接口中不可以使用静态语句块,但仍然有类变量初始化的赋值操作,因此接口与类一样都会生成 <clinit>() 方法。但接口与类不同的是,执行接口的 <clinit>() 方法不需要先执行父接口的 <clinit>() 方法。只有当父接口中定义的变量使用时,父接口才会初始化。另外,接口的实现类在初始化时也一样不会执行接口的 <clinit>() 方法。
  • 虚拟机会保证一个类的 <clinit>() 方法在多线程环境下被正确的加锁和同步,如果多个线程同时初始化一个类,只会有一个线程执行这个类的 <clinit>() 方法,其它线程都会阻塞等待,直到活动线程执行 <clinit>() 方法完毕。如果在一个类的 <clinit>() 方法中有耗时的操作,就可能造成多个线程阻塞,在实际过程中此种阻塞很隐蔽。

ClassLoader

ClassLoader 即类加载器,负责将类加载到 JVM。在 Java 虚拟机外部实现,以便让应用程序自己决定如何去获取所需要的类。

JVM 加载 class 文件到内存有两种方式:

  • 隐式加载 - JVM 自动加载需要的类到内存中。
  • 显示加载 - 通过使用 ClassLoader 来加载一个类到内存中。

类与类加载器

如何判断两个类是否相等:类本身相等,并且使用同一个类加载器进行加载。这是因为每一个 ClassLoader 都拥有一个独立的类名称空间

这里的相等,包括类的 Class 对象的 equals() 方法、isAssignableFrom() 方法、isInstance() 方法的返回结果为 true,也包括使用 instanceof 关键字做对象所属关系判定结果为 true。

类加载器分类

img

Bootstrap ClassLoader

Bootstrap ClassLoader ,即启动类加载器 ,负责加载 JVM 自身工作所需要的类

Bootstrap ClassLoader 会将存放在 <JAVA_HOME>\lib 目录中的,或者被 -Xbootclasspath 参数所指定的路径中的,并且是虚拟机识别的(仅按照文件名识别,如 rt.jar,名字不符合的类库即使放在 lib 目录中也不会被加载)类库加载到虚拟机内存中

Bootstrap ClassLoader 是由 C++ 实现的,它完全由 JVM 自己控制的,启动类加载器无法被 Java 程序直接引用,用户在编写自定义类加载器时,如果需要把加载请求委派给启动类加载器,直接使用 null 代替即可。

ExtClassLoader

ExtClassLoader,即扩展类加载器,这个类加载器是由 ExtClassLoader(sun.misc.Launcher\$ExtClassLoader)实现的。

ExtClassLoader 负责将 <JAVA_HOME>\lib\ext 或者被 java.ext.dir 系统变量所指定路径中的所有类库加载到内存中,开发者可以直接使用扩展类加载器

AppClassLoader

AppClassLoader,即应用程序类加载器,这个类加载器是由 AppClassLoader(sun.misc.Launcher\$AppClassLoader) 实现的。由于这个类加载器是 ClassLoader 中的 getSystemClassLoader() 方法的返回值,因此一般称为系统类加载器。

AppClassLoader 负责加载用户类路径(即 classpath)上所指定的类库,开发者可以直接使用这个类加载器,如果应用程序中没有自定义过自己的类加载器,一般情况下这个就是程序中默认的类加载器。

自定义类加载器

自定义类加载器可以做到如下几点:

  • 在执行非置信代码之前,自动验证数字签名。
  • 动态地创建符合用户特定需要的定制化构建类。
  • 从特定的场所取得 java class,例如数据库中和网络中。

假设,我们需要自定义一个名为 FileSystemClassLoader 的类加载器,继承自 java.lang.ClassLoader,用于加载文件系统上的类。它首先根据类的全名在文件系统上查找类的字节代码文件(.class 文件),然后读取该文件内容,最后通过 defineClass() 方法来把这些字节代码转换成 java.lang.Class 类的实例。

java.lang.ClassLoader 类的方法 loadClass() 实现了双亲委派模型的逻辑,因此自定义类加载器一般不去覆写它,而是通过覆写 findClass() 方法。

ClassLoader 常用的场景:

  • 容器 - 典型应用:Servlet 容器(如:Tomcat、Jetty)、udf (Mysql、Hive)等。加载解压 jar 包或 war 包后,加载其 Class 到指定的类加载器中运行(通常需要考虑空间隔离)。
  • 热部署、热插拔 - 应用启动后,动态获得某个类信息,然后加载到 JVM 中工作。很多著名的容器软件、框架(如:Spring 等),都使用 ClassLoader 来实现自身的热部署。

【示例】自定义一个类加载器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class FileSystemClassLoader extends ClassLoader {

private String rootDir;

public FileSystemClassLoader(String rootDir) {
this.rootDir = rootDir;
}

protected Class<?> findClass(String name) throws ClassNotFoundException {
byte[] classData = getClassData(name);
if (classData == null) {
throw new ClassNotFoundException();
} else {
return defineClass(name, classData, 0, classData.length);
}
}

private byte[] getClassData(String className) {
String path = classNameToPath(className);
try {
InputStream ins = new FileInputStream(path);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
int bufferSize = 4096;
byte[] buffer = new byte[bufferSize];
int bytesNumRead;
while ((bytesNumRead = ins.read(buffer)) != -1) {
baos.write(buffer, 0, bytesNumRead);
}
return baos.toByteArray();
} catch (IOException e) {
e.printStackTrace();
}
return null;
}

private String classNameToPath(String className) {
return rootDir + File.separatorChar
+ className.replace('.', File.separatorChar) + ".class";
}
}

双亲委派

理解双亲委派之前,先让我们看一个示例。

【示例】寻找类加载示例

1
2
3
4
5
6
public static void main(String[] args) {
ClassLoader loader = Thread.currentThread().getContextClassLoader();
System.out.println(loader);
System.out.println(loader.getParent());
System.out.println(loader.getParent().getParent());
}

输出:

1
2
3
sun.misc.Launcher$AppClassLoader@18b4aac2
sun.misc.Launcher$ExtClassLoader@19e1023e
null

从上面的结果可以看出,并没有获取到 ExtClassLoader 的父 Loader,原因是 Bootstrap Loader(引导类加载器)是用 C 语言实现的,找不到一个确定的返回父 Loader 的方式,于是就返回 null。

下图展示的类加载器之间的层次关系,称为类加载器的双亲委派模型(Parents Delegation Model)该模型要求除了顶层的 Bootstrap ClassLoader 外,其余的类加载器都应有自己的父类加载器这里类加载器之间的父子关系一般通过组合(Composition)关系来实现,而不是通过继承(Inheritance)的关系实现

(1)工作过程

一个类加载器首先将类加载请求传送到父类加载器,只有当父类加载器无法完成类加载请求时才尝试加载

(2)好处

使得 Java 类随着它的类加载器一起具有一种带有优先级的层次关系,从而使得基础类得到统一:

  • 系统类防止内存中出现多份同样的字节码
  • 保证 Java 程序安全稳定运行

例如: java.lang.Object 存放在 rt.jar 中,如果编写另外一个 java.lang.Object 的类并放到 classpath 中,程序可以编译通过。因为双亲委派模型的存在,所以在 rt.jar 中的 Object 比在 classpath 中的 Object 优先级更高,因为 rt.jar 中的 Object 使用的是启动类加载器,而 classpath 中的 Object 使用的是应用程序类加载器。正因为 rt.jar 中的 Object 优先级更高,因为程序中所有的 Object 都是这个 Object

(3)实现

以下是抽象类 java.lang.ClassLoader 的代码片段,其中的 loadClass() 方法运行过程如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public abstract class ClassLoader {
// The parent class loader for delegation
private final ClassLoader parent;

public Class<?> loadClass(String name) throws ClassNotFoundException {
return loadClass(name, false);
}

protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
synchronized (getClassLoadingLock(name)) {
// 首先判断该类型是否已经被加载
Class<?> c = findLoadedClass(name);
if (c == null) {
// 如果没有被加载,就委托给父类加载或者委派给启动类加载器加载
try {
if (parent != null) {
// 如果存在父类加载器,就委派给父类加载器加载
c = parent.loadClass(name, false);
} else {
// 如果不存在父类加载器,就检查是否是由启动类加载器加载的类,通过调用本地方法native Class findBootstrapClass(String name)
c = findBootstrapClassOrNull(name);
}
} catch (ClassNotFoundException e) {
// 如果父类加载器加载失败,会抛出 ClassNotFoundException
}

if (c == null) {
// 如果父类加载器和启动类加载器都不能完成加载任务,才调用自身的加载功能
c = findClass(name);
}
}
if (resolve) {
resolveClass(c);
}
return c;
}
}

protected Class<?> findClass(String name) throws ClassNotFoundException {
throw new ClassNotFoundException(name);
}
}

【说明】

  • 先检查类是否已经加载过,如果没有则让父类加载器去加载。
  • 当父类加载器加载失败时抛出 ClassNotFoundException,此时尝试自己去加载。

ClassLoader 参数

在生产环境上启动 java 应用时,通常会指定一些 ClassLoader 参数,以加载应用所需要的 lib:

1
java -jar xxx.jar -classpath lib/*

ClassLoader 相关参数选项:

参数选项 ClassLoader 类型 说明
-Xbootclasspath Bootstrap ClassLoader 设置 Bootstrap ClassLoader 搜索路径。【不常用】
-Xbootclasspath/a Bootstrap ClassLoader 把路径添加到已存在的 Bootstrap ClassLoader 搜索路径后面。【常用】
-Xbootclasspath/p Bootstrap ClassLoader 把路径添加到已存在的 Bootstrap ClassLoader 搜索路径前面。【不常用】
-Djava.ext.dirs ExtClassLoader 设置 ExtClassLoader 搜索路径。
-Djava.class.path-cp-classpath AppClassLoader 设置 AppClassLoader 搜索路径。

类的加载

类加载方式

类加载有三种方式:

  • 命令行启动应用时候由 JVM 初始化加载
  • 通过 Class.forName() 方法动态加载
  • 通过 ClassLoader.loadClass() 方法动态加载

Class.forName()ClassLoader.loadClass() 区别

  • Class.forName() 将类的 .class 文件加载到 jvm 中之外,还会对类进行解释,执行类中的 static 块;
  • ClassLoader.loadClass() 只干一件事情,就是将 .class 文件加载到 jvm 中,不会执行 static 中的内容,只有在 newInstance 才会去执行 static 块。
  • Class.forName(name, initialize, loader) 带参函数也可控制是否加载 static 块。并且只有调用了 newInstance() 方法采用调用构造函数,创建类的对象 。

加载类错误

ClassNotFoundException

ClassNotFoundException 异常出镜率极高。**ClassNotFoundException 表示当前 classpath 下找不到指定类**。

常见问题原因:

  • 调用 ClassforName() 方法,未找到类。
  • 调用 ClassLoader 中的 loadClass() 方法,未找到类。
  • 调用 ClassLoader 中的 findSystemClass() 方法,未找到类。

【示例】执行以下代码,会抛出 ClassNotFoundException 异常:

1
2
3
4
5
6
7
8
9
public class ClassNotFoundExceptionDemo {
public static void main(String[] args) {
try {
Class.forName("NotFound");
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
}
}

解决方法:检查 classpath 下有没有相应的 class 文件。

NoClassDefFoundError

常见问题原因:

  • 类依赖的 Class 或者 jar 不存在。
  • 类文件存在,但是存在不同的域中。

解决方法:现代 Java 项目,一般使用 mavengradle 等构建工具管理项目,仔细检查找不到的类所在的 jar 包是否已添加为依赖。

UnsatisfiedLinkError

这个异常倒不是很常见,但是出错的话,通常是在 JVM 启动的时候如果一不小心将在 JVM 中的某个 lib 删除了,可能就会报这个错误了。

【示例】执行以下代码,会抛出 UnsatisfiedLinkError 错误。

1
2
3
4
5
6
7
8
9
10
11
12
13
public class UnsatisfiedLinkErrorDemo {

public native void nativeMethod();

static {
System.loadLibrary("NoLib");
}

public static void main(String[] args) {
new UnsatisfiedLinkErrorDemo().nativeMethod();
}

}

【输出】

1
2
3
4
5
java.lang.UnsatisfiedLinkError: no NoLib in java.library.path
at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1867)
at java.lang.Runtime.loadLibrary0(Runtime.java:870)
at java.lang.System.loadLibrary(System.java:1122)
at io.github.dunwu.javacore.jvm.classloader.exception.UnsatisfiedLinkErrorDemo.<clinit>(UnsatisfiedLinkErrorDemo.java:12)

ClassCastException

ClassCastException 异常通常是在程序中强制类型转换失败时出现。

【示例】执行以下代码,会抛出 ClassCastException 异常。

1
2
3
4
5
6
7
8
9
10
public class ClassCastExceptionDemo {

public static void main(String[] args) {
Object obj = new Object();
EmptyClass newObj = (EmptyClass) obj;
}

static class EmptyClass {}

}

【输出】

1
2
Exception in thread "main" java.lang.ClassCastException: java.lang.Object cannot be cast to io.github.dunwu.javacore.jvm.classloader.exception.ClassCastExceptionDemo$EmptyClass
at io.github.dunwu.javacore.jvm.classloader.exception.ClassCastExceptionDemo.main(ClassCastExceptionDemo.java:11)

参考资料

Elasticsearch 快速入门

Elasticsearch 是一个分布式、RESTful 风格的搜索和数据分析引擎,能够解决不断涌现出的各种用例。 作为 Elastic Stack 的核心,它集中存储您的数据,帮助您发现意料之中以及意料之外的情况。

Elasticsearch 基于搜索库 Lucene 开发。ElasticSearch 隐藏了 Lucene 的复杂性,提供了简单易用的 REST API / Java API 接口(另外还有其他语言的 API 接口)。

_以下简称 ES_。

Elasticsearch 简介

什么是 Elasticsearch

Elasticsearch 是一个分布式、RESTful 风格的搜索和数据分析引擎,能够解决不断涌现出的各种用例。 作为 Elastic Stack 的核心,它集中存储您的数据,帮助您发现意料之中以及意料之外的情况。

Elasticsearch 基于搜索库 Lucene 开发。ElasticSearch 隐藏了 Lucene 的复杂性,提供了简单易用的 REST API / Java API 接口(另外还有其他语言的 API 接口)。

ElasticSearch 可以视为一个文档存储,它将复杂数据结构序列化为 JSON 存储

ElasticSearch 是近乎于实时的全文搜素,这是指:

  • 从写入数据到数据可以被搜索,存在较小的延迟(大概是 1s)
  • 基于 ES 执行搜索和分析可以达到秒级

核心概念

1
index -> type -> mapping -> document -> field

Cluster

集群包含多个节点,每个节点属于哪个集群都是通过一个配置来决定的,对于中小型应用来说,刚开始一个集群就一个节点很正常。

Node

Node 是集群中的一个节点,节点也有一个名称,默认是随机分配的。默认节点会去加入一个名称为 elasticsearch 的集群。如果直接启动一堆节点,那么它们会自动组成一个 elasticsearch 集群,当然一个节点也可以组成 elasticsearch 集群。

Index

可以认为是文档(document)的优化集合。

ES 会为所有字段建立索引,经过处理后写入一个反向索引(Inverted Index)。查找数据的时候,直接查找该索引。

所以,ES 数据管理的顶层单位就叫做 Index(索引)。它是单个数据库的同义词。每个 Index (即数据库)的名字必须是小写。

Type

每个索引里可以有一个或者多个类型(type)。类型(type) 是 index 的一个逻辑分类。

不同的 Type 应该有相似的结构(schema),举例来说,id字段不能在这个组是字符串,在另一个组是数值。这是与关系型数据库的表的一个区别。性质完全不同的数据(比如productslogs)应该存成两个 Index,而不是一个 Index 里面的两个 Type(虽然可以做到)。

注意:根据规划,Elastic 6.x 版只允许每个 Index 包含一个 Type,7.x 版将会彻底移除 Type。

Document

Index 里面单条的记录称为 Document(文档)。许多条 Document 构成了一个 Index。

每个 文档(document) 都是字段(field)的集合。

Document 使用 JSON 格式表示,下面是一个例子。

1
2
3
4
5
{
"user": "张三",
"title": "工程师",
"desc": "数据库管理"
}

同一个 Index 里面的 Document,不要求有相同的结构(scheme),但是最好保持相同,这样有利于提高搜索效率。

Field

字段(field) 是包含数据的键值对。

默认情况下,Elasticsearch 对每个字段中的所有数据建立索引,并且每个索引字段都具有专用的优化数据结构。

Shard

当单台机器不足以存储大量数据时,Elasticsearch 可以将一个索引中的数据切分为多个 分片(shard)分片(shard) 分布在多台服务器上存储。有了 shard 就可以横向扩展,存储更多数据,让搜索和分析等操作分布到多台服务器上去执行,提升吞吐量和性能。每个 shard 都是一个 lucene index。

Replica

任何一个服务器随时可能故障或宕机,此时 shard 可能就会丢失,因此可以为每个 shard 创建多个 **副本(replica)**。replica 可以在 shard 故障时提供备用服务,保证数据不丢失,多个 replica 还可以提升搜索操作的吞吐量和性能。primary shard(建立索引时一次设置,不能修改,默认 5 个),replica shard(随时修改数量,默认 1 个),默认每个索引 10 个 shard,5 个 primary shard,5 个 replica shard,最小的高可用配置,是 2 台服务器。

ES 核心概念 vs. DB 核心概念

ES DB
index 数据库
type 数据表
docuemnt 一行数据

ElasticSearch 基本原理

ES 写数据过程

  • 客户端选择一个 node 发送请求过去,这个 node 就是 coordinating node(协调节点)。
  • coordinating node 对 document 进行路由,将请求转发给对应的 node(有 primary shard)。
  • 实际的 node 上的 primary shard 处理请求,然后将数据同步到 replica node
  • coordinating node 如果发现 primary node 和所有 replica node 都搞定之后,就返回响应结果给客户端。

img

ES 读数据过程

可以通过 doc id 来查询,会根据 doc id 进行 hash,判断出来当时把 doc id 分配到了哪个 shard 上面去,从那个 shard 去查询。

  • 客户端发送请求到任意一个 node,成为 coordinate node
  • coordinate nodedoc id 进行哈希路由,将请求转发到对应的 node,此时会使用 round-robin 轮询算法,在 primary shard 以及其所有 replica 中随机选择一个,让读请求负载均衡。
  • 接收请求的 node 返回 document 给 coordinate node
  • coordinate node 返回 document 给客户端。

es 搜索数据过程

es 最强大的是做全文检索,就是比如你有三条数据:

1
2
3
java真好玩儿啊
java好难学啊
j2ee特别牛

你根据 java 关键词来搜索,将包含 javadocument 给搜索出来。es 就会给你返回:java 真好玩儿啊,java 好难学啊。

  • 客户端发送请求到一个 coordinate node
  • 协调节点将搜索请求转发到所有的 shard 对应的 primary shardreplica shard ,都可以。
  • query phase:每个 shard 将自己的搜索结果(其实就是一些 doc id )返回给协调节点,由协调节点进行数据的合并、排序、分页等操作,产出最终结果。
  • fetch phase:接着由协调节点根据 doc id 去各个节点上拉取实际document 数据,最终返回给客户端。

写请求是写入 primary shard,然后同步给所有的 replica shard;读请求可以从 primary shard 或 replica shard 读取,采用的是随机轮询算法。

写数据底层原理

es-write-detail

先写入内存 buffer,在 buffer 里的时候数据是搜索不到的;同时将数据写入 translog 日志文件。

如果 buffer 快满了,或者到一定时间,就会将内存 buffer 数据 refresh 到一个新的 segment file 中,但是此时数据不是直接进入 segment file 磁盘文件,而是先进入 os cache 。这个过程就是 refresh

每隔 1 秒钟,es 将 buffer 中的数据写入一个新的 segment file,每秒钟会产生一个新的磁盘文件 segment file,这个 segment file 中就存储最近 1 秒内 buffer 中写入的数据。

但是如果 buffer 里面此时没有数据,那当然不会执行 refresh 操作,如果 buffer 里面有数据,默认 1 秒钟执行一次 refresh 操作,刷入一个新的 segment file 中。

操作系统里面,磁盘文件其实都有一个东西,叫做 os cache,即操作系统缓存,就是说数据写入磁盘文件之前,会先进入 os cache,先进入操作系统级别的一个内存缓存中去。只要 buffer 中的数据被 refresh 操作刷入 os cache中,这个数据就可以被搜索到了。

为什么叫 es 是准实时的? NRT,全称 near real-time。默认是每隔 1 秒 refresh 一次的,所以 es 是准实时的,因为写入的数据 1 秒之后才能被看到。可以通过 es 的 restful api 或者 java api手动执行一次 refresh 操作,就是手动将 buffer 中的数据刷入 os cache中,让数据立马就可以被搜索到。只要数据被输入 os cache 中,buffer 就会被清空了,因为不需要保留 buffer 了,数据在 translog 里面已经持久化到磁盘去一份了。

重复上面的步骤,新的数据不断进入 buffer 和 translog,不断将 buffer 数据写入一个又一个新的 segment file 中去,每次 refresh 完 buffer 清空,translog 保留。随着这个过程推进,translog 会变得越来越大。当 translog 达到一定长度的时候,就会触发 commit 操作。

commit 操作发生第一步,就是将 buffer 中现有数据 refreshos cache 中去,清空 buffer。然后,将一个 commit point 写入磁盘文件,里面标识着这个 commit point 对应的所有 segment file,同时强行将 os cache 中目前所有的数据都 fsync 到磁盘文件中去。最后清空 现有 translog 日志文件,重启一个 translog,此时 commit 操作完成。

这个 commit 操作叫做 flush。默认 30 分钟自动执行一次 flush,但如果 translog 过大,也会触发 flush。flush 操作就对应着 commit 的全过程,我们可以通过 es api,手动执行 flush 操作,手动将 os cache 中的数据 fsync 强刷到磁盘上去。

translog 日志文件的作用是什么?你执行 commit 操作之前,数据要么是停留在 buffer 中,要么是停留在 os cache 中,无论是 buffer 还是 os cache 都是内存,一旦这台机器死了,内存中的数据就全丢了。所以需要将数据对应的操作写入一个专门的日志文件 translog 中,一旦此时机器宕机,再次重启的时候,es 会自动读取 translog 日志文件中的数据,恢复到内存 buffer 和 os cache 中去。

translog 其实也是先写入 os cache 的,默认每隔 5 秒刷一次到磁盘中去,所以默认情况下,可能有 5 秒的数据会仅仅停留在 buffer 或者 translog 文件的 os cache 中,如果此时机器挂了,会丢失 5 秒钟的数据。但是这样性能比较好,最多丢 5 秒的数据。也可以将 translog 设置成每次写操作必须是直接 fsync 到磁盘,但是性能会差很多。

实际上你在这里,如果面试官没有问你 es 丢数据的问题,你可以在这里给面试官炫一把,你说,其实 es 第一是准实时的,数据写入 1 秒后可以搜索到;可能会丢失数据的。有 5 秒的数据,停留在 buffer、translog os cache、segment file os cache 中,而不在磁盘上,此时如果宕机,会导致 5 秒的数据丢失

总结一下,数据先写入内存 buffer,然后每隔 1s,将数据 refresh 到 os cache,到了 os cache 数据就能被搜索到(所以我们才说 es 从写入到能被搜索到,中间有 1s 的延迟)。每隔 5s,将数据写入 translog 文件(这样如果机器宕机,内存数据全没,最多会有 5s 的数据丢失),translog 大到一定程度,或者默认每隔 30mins,会触发 commit 操作,将缓冲区的数据都 flush 到 segment file 磁盘文件中。

数据写入 segment file 之后,同时就建立好了倒排索引。

删除/更新数据底层原理

如果是删除操作,commit 的时候会生成一个 .del 文件,里面将某个 doc 标识为 deleted 状态,那么搜索的时候根据 .del 文件就知道这个 doc 是否被删除了。

如果是更新操作,就是将原来的 doc 标识为 deleted 状态,然后新写入一条数据。

buffer 每 refresh 一次,就会产生一个 segment file,所以默认情况下是 1 秒钟一个 segment file,这样下来 segment file 会越来越多,此时会定期执行 merge。每次 merge 的时候,会将多个 segment file 合并成一个,同时这里会将标识为 deleted 的 doc 给物理删除掉,然后将新的 segment file 写入磁盘,这里会写一个 commit point,标识所有新的 segment file,然后打开 segment file 供搜索使用,同时删除旧的 segment file

底层 lucene

简单来说,lucene 就是一个 jar 包,里面包含了封装好的各种建立倒排索引的算法代码。我们用 Java 开发的时候,引入 lucene jar,然后基于 lucene 的 api 去开发就可以了。

通过 lucene,我们可以将已有的数据建立索引,lucene 会在本地磁盘上面,给我们组织索引的数据结构。

倒排索引

在搜索引擎中,每个文档都有一个对应的文档 ID,文档内容被表示为一系列关键词的集合。例如,文档 1 经过分词,提取了 20 个关键词,每个关键词都会记录它在文档中出现的次数和出现位置。

那么,倒排索引就是关键词到文档 ID 的映射,每个关键词都对应着一系列的文件,这些文件中都出现了关键词。

举个栗子。

有以下文档:

DocId Doc
1 谷歌地图之父跳槽 Facebook
2 谷歌地图之父加盟 Facebook
3 谷歌地图创始人拉斯离开谷歌加盟 Facebook
4 谷歌地图之父跳槽 Facebook 与 Wave 项目取消有关
5 谷歌地图之父拉斯加盟社交网站 Facebook

对文档进行分词之后,得到以下倒排索引

WordId Word DocIds
1 谷歌 1,2,3,4,5
2 地图 1,2,3,4,5
3 之父 1,2,4,5
4 跳槽 1,4
5 Facebook 1,2,3,4,5
6 加盟 2,3,5
7 创始人 3
8 拉斯 3,5
9 离开 3
10 4
.. .. ..

另外,实用的倒排索引还可以记录更多的信息,比如文档频率信息,表示在文档集合中有多少个文档包含某个单词。

那么,有了倒排索引,搜索引擎可以很方便地响应用户的查询。比如用户输入查询 Facebook,搜索系统查找倒排索引,从中读出包含这个单词的文档,这些文档就是提供给用户的搜索结果。

要注意倒排索引的两个重要细节:

  • 倒排索引中的所有词项对应一个或多个文档;
  • 倒排索引中的词项根据字典顺序升序排列

上面只是一个简单的栗子,并没有严格按照字典顺序升序排列。

参考资料

Elasticsearch 面试总结

集群部署

ES 部署情况:

5 节点(配置:8 核 64 G 1T),总计 320 G,5 T。

约 10+ 索引,5 分片,每日新增数据量约为 2G,4000w 条。记录保存 30 天。

性能优化

filesystem cache

你往 es 里写的数据,实际上都写到磁盘文件里去了,查询的时候,操作系统会将磁盘文件里的数据自动缓存到 filesystem cache 里面去。

es-search-process

es 的搜索引擎严重依赖于底层的 filesystem cache ,你如果给 filesystem cache 更多的内存,尽量让内存可以容纳所有的 idx segment file索引数据文件,那么你搜索的时候就基本都是走内存的,性能会非常高。

性能差距究竟可以有多大?我们之前很多的测试和压测,如果走磁盘一般肯定上秒,搜索性能绝对是秒级别的,1 秒、5 秒、10 秒。但如果是走 filesystem cache ,是走纯内存的,那么一般来说性能比走磁盘要高一个数量级,基本上就是毫秒级的,从几毫秒到几百毫秒不等。

这里有个真实的案例。某个公司 es 节点有 3 台机器,每台机器看起来内存很多,64G,总内存就是 64 * 3 = 192G 。每台机器给 es jvm heap 是 32G ,那么剩下来留给 filesystem cache 的就是每台机器才 32G ,总共集群里给 filesystem cache 的就是 32 * 3 = 96G 内存。而此时,整个磁盘上索引数据文件,在 3 台机器上一共占用了 1T 的磁盘容量,es 数据量是 1T ,那么每台机器的数据量是 300G 。这样性能好吗? filesystem cache 的内存才 100G,十分之一的数据可以放内存,其他的都在磁盘,然后你执行搜索操作,大部分操作都是走磁盘,性能肯定差。

归根结底,你要让 es 性能要好,最佳的情况下,就是你的机器的内存,至少可以容纳你的总数据量的一半。

根据我们自己的生产环境实践经验,最佳的情况下,是仅仅在 es 中就存少量的数据,就是你要用来搜索的那些索引,如果内存留给 filesystem cache 的是 100G,那么你就将索引数据控制在 100G 以内,这样的话,你的数据几乎全部走内存来搜索,性能非常之高,一般可以在 1 秒以内。

比如说你现在有一行数据。 id,name,age .... 30 个字段。但是你现在搜索,只需要根据 id,name,age 三个字段来搜索。如果你傻乎乎往 es 里写入一行数据所有的字段,就会导致说 90% 的数据是不用来搜索的,结果硬是占据了 es 机器上的 filesystem cache 的空间,单条数据的数据量越大,就会导致 filesystem cahce 能缓存的数据就越少。其实,仅仅写入 es 中要用来检索的少数几个字段就可以了,比如说就写入 es id,name,age 三个字段,然后你可以把其他的字段数据存在 mysql/hbase 里,我们一般是建议用 es + hbase 这么一个架构。

hbase 的特点是适用于海量数据的在线存储,就是对 hbase 可以写入海量数据,但是不要做复杂的搜索,做很简单的一些根据 id 或者范围进行查询的这么一个操作就可以了。从 es 中根据 name 和 age 去搜索,拿到的结果可能就 20 个 doc id ,然后根据 doc id 到 hbase 里去查询每个 doc id 对应的完整的数据,给查出来,再返回给前端。

写入 es 的数据最好小于等于,或者是略微大于 es 的 filesystem cache 的内存容量。然后你从 es 检索可能就花费 20ms,然后再根据 es 返回的 id 去 hbase 里查询,查 20 条数据,可能也就耗费个 30ms,可能你原来那么玩儿,1T 数据都放 es,会每次查询都是 5~10s,现在可能性能就会很高,每次查询就是 50ms。

数据预热

假如说,哪怕是你就按照上述的方案去做了,es 集群中每个机器写入的数据量还是超过了 filesystem cache 一倍,比如说你写入一台机器 60G 数据,结果 filesystem cache 就 30G,还是有 30G 数据留在了磁盘上。

其实可以做数据预热

举个例子,拿微博来说,你可以把一些大 V,平时看的人很多的数据,你自己提前后台搞个系统,每隔一会儿,自己的后台系统去搜索一下热数据,刷到 filesystem cache 里去,后面用户实际上来看这个热数据的时候,他们就是直接从内存里搜索了,很快。

或者是电商,你可以将平时查看最多的一些商品,比如说 iphone 8,热数据提前后台搞个程序,每隔 1 分钟自己主动访问一次,刷到 filesystem cache 里去。

对于那些你觉得比较热的、经常会有人访问的数据,最好做一个专门的缓存预热子系统,就是对热数据每隔一段时间,就提前访问一下,让数据进入 filesystem cache 里面去。这样下次别人访问的时候,性能一定会好很多。

冷热分离

es 可以做类似于 mysql 的水平拆分,就是说将大量的访问很少、频率很低的数据,单独写一个索引,然后将访问很频繁的热数据单独写一个索引。最好是将冷数据写入一个索引中,然后热数据写入另外一个索引中,这样可以确保热数据在被预热之后,尽量都让他们留在 filesystem os cache 里,别让冷数据给冲刷掉

你看,假设你有 6 台机器,2 个索引,一个放冷数据,一个放热数据,每个索引 3 个 shard。3 台机器放热数据 index,另外 3 台机器放冷数据 index。然后这样的话,你大量的时间是在访问热数据 index,热数据可能就占总数据量的 10%,此时数据量很少,几乎全都保留在 filesystem cache 里面了,就可以确保热数据的访问性能是很高的。但是对于冷数据而言,是在别的 index 里的,跟热数据 index 不在相同的机器上,大家互相之间都没什么联系了。如果有人访问冷数据,可能大量数据是在磁盘上的,此时性能差点,就 10% 的人去访问冷数据,90% 的人在访问热数据,也无所谓了。

document 模型设计

对于 MySQL,我们经常有一些复杂的关联查询。在 es 里该怎么玩儿,es 里面的复杂的关联查询尽量别用,一旦用了性能一般都不太好。

最好是先在 Java 系统里就完成关联,将关联好的数据直接写入 es 中。搜索的时候,就不需要利用 es 的搜索语法来完成 join 之类的关联搜索了。

document 模型设计是非常重要的,很多操作,不要在搜索的时候才想去执行各种复杂的乱七八糟的操作。es 能支持的操作就那么多,不要考虑用 es 做一些它不好操作的事情。如果真的有那种操作,尽量在 document 模型设计的时候,写入的时候就完成。另外对于一些太复杂的操作,比如 join/nested/parent-child 搜索都要尽量避免,性能都很差的。

分页性能优化

es 的分页是较坑的,为啥呢?举个例子吧,假如你每页是 10 条数据,你现在要查询第 100 页,实际上是会把每个 shard 上存储的前 1000 条数据都查到一个协调节点上,如果你有个 5 个 shard,那么就有 5000 条数据,接着协调节点对这 5000 条数据进行一些合并、处理,再获取到最终第 100 页的 10 条数据。

分布式的,你要查第 100 页的 10 条数据,不可能说从 5 个 shard,每个 shard 就查 2 条数据,最后到协调节点合并成 10 条数据吧?你必须得从每个 shard 都查 1000 条数据过来,然后根据你的需求进行排序、筛选等等操作,最后再次分页,拿到里面第 100 页的数据。你翻页的时候,翻的越深,每个 shard 返回的数据就越多,而且协调节点处理的时间越长,非常坑爹。所以用 es 做分页的时候,你会发现越翻到后面,就越是慢。

我们之前也是遇到过这个问题,用 es 作分页,前几页就几十毫秒,翻到 10 页或者几十页的时候,基本上就要 5~10 秒才能查出来一页数据了。

有什么解决方案吗?

不允许深度分页(默认深度分页性能很差)

跟产品经理说,你系统不允许翻那么深的页,默认翻的越深,性能就越差。

类似于 app 里的推荐商品不断下拉出来一页一页的

类似于微博中,下拉刷微博,刷出来一页一页的,你可以用 scroll api ,关于如何使用,自行上网搜索。

scroll 会一次性给你生成所有数据的一个快照,然后每次滑动向后翻页就是通过游标 scroll_id 移动,获取下一页下一页这样子,性能会比上面说的那种分页性能要高很多很多,基本上都是毫秒级的。

但是,唯一的一点就是,这个适合于那种类似微博下拉翻页的,不能随意跳到任何一页的场景。也就是说,你不能先进入第 10 页,然后去第 120 页,然后又回到第 58 页,不能随意乱跳页。所以现在很多产品,都是不允许你随意翻页的,app,也有一些网站,做的就是你只能往下拉,一页一页的翻。

初始化时必须指定 scroll 参数,告诉 es 要保存此次搜索的上下文多长时间。你需要确保用户不会持续不断翻页翻几个小时,否则可能因为超时而失败。

除了用 scroll api ,你也可以用 search_after 来做, search_after 的思想是使用前一页的结果来帮助检索下一页的数据,显然,这种方式也不允许你随意翻页,你只能一页页往后翻。初始化时,需要使用一个唯一值的字段作为 sort 字段。

1.1、设计阶段调优

(1)根据业务增量需求,采取基于日期模板创建索引,通过 roll over API 滚动索引;

(2)使用别名进行索引管理;

(3)每天凌晨定时对索引做 force_merge 操作,以释放空间;

(4)采取冷热分离机制,热数据存储到 SSD,提高检索效率;冷数据定期进行 shrink 操作,以缩减存储;

(5)采取 curator 进行索引的生命周期管理;

(6)仅针对需要分词的字段,合理的设置分词器;

(7)Mapping 阶段充分结合各个字段的属性,是否需要检索、是否需要存储等。……..

1.2、写入调优

(1)写入前副本数设置为 0;

(2)写入前关闭 refresh_interval 设置为-1,禁用刷新机制;

(3)写入过程中:采取 bulk 批量写入;

(4)写入后恢复副本数和刷新间隔;

(5)尽量使用自动生成的 id。

1.3、查询调优

(1)禁用 wildcard;

(2)禁用批量 terms(成百上千的场景);

(3)充分利用倒排索引机制,能 keyword 类型尽量 keyword;

(4)数据量大时候,可以先基于时间敲定索引再检索;

(5)设置合理的路由机制。

1.4、其他调优

部署调优,业务调优等。

上面的提及一部分,面试者就基本对你之前的实践或者运维经验有所评估了。

工作原理

es 写数据过程

  • 客户端选择一个 node 发送请求过去,这个 node 就是 coordinating node (协调节点)。
  • coordinating node 对 document 进行路由,将请求转发给对应的 node(有 primary shard)。
  • 实际的 node 上的 primary shard 处理请求,然后将数据同步到 replica node
  • coordinating node 如果发现 primary node 和所有 replica node 都搞定之后,就返回响应结果给客户端。

es-write

es 读数据过程

可以通过 doc id 来查询,会根据 doc id 进行 hash,判断出来当时把 doc id 分配到了哪个 shard 上面去,从那个 shard 去查询。

  • 客户端发送请求到任意一个 node,成为 coordinate node
  • coordinate nodedoc id 进行哈希路由,将请求转发到对应的 node,此时会使用 round-robin 随机轮询算法,在 primary shard 以及其所有 replica 中随机选择一个,让读请求负载均衡。
  • 接收请求的 node 返回 document 给 coordinate node
  • coordinate node 返回 document 给客户端。

es 搜索数据过程

es 最强大的是做全文检索,就是比如你有三条数据:

1
2
3
java真好玩儿啊
java好难学啊
j2ee特别牛

你根据 java 关键词来搜索,将包含 javadocument 给搜索出来。es 就会给你返回:java 真好玩儿啊,java 好难学啊。

  • 客户端发送请求到一个 coordinate node
  • 协调节点将搜索请求转发到所有的 shard 对应的 primary shardreplica shard ,都可以。
  • query phase:每个 shard 将自己的搜索结果(其实就是一些 doc id )返回给协调节点,由协调节点进行数据的合并、排序、分页等操作,产出最终结果。
  • fetch phase:接着由协调节点根据 doc id 去各个节点上拉取实际document 数据,最终返回给客户端。

写请求是写入 primary shard,然后同步给所有的 replica shard;读请求可以从 primary shard 或 replica shard 读取,采用的是随机轮询算法。

写数据底层原理

es-write-detail

先写入内存 buffer,在 buffer 里的时候数据是搜索不到的;同时将数据写入 translog 日志文件。

如果 buffer 快满了,或者到一定时间,就会将内存 buffer 数据 refresh 到一个新的 segment file 中,但是此时数据不是直接进入 segment file 磁盘文件,而是先进入 os cache 。这个过程就是 refresh

每隔 1 秒钟,es 将 buffer 中的数据写入一个新的 segment file ,每秒钟会产生一个新的磁盘文件 segment file ,这个 segment file 中就存储最近 1 秒内 buffer 中写入的数据。

但是如果 buffer 里面此时没有数据,那当然不会执行 refresh 操作,如果 buffer 里面有数据,默认 1 秒钟执行一次 refresh 操作,刷入一个新的 segment file 中。

操作系统里面,磁盘文件其实都有一个东西,叫做 os cache ,即操作系统缓存,就是说数据写入磁盘文件之前,会先进入 os cache ,先进入操作系统级别的一个内存缓存中去。只要 buffer 中的数据被 refresh 操作刷入 os cache 中,这个数据就可以被搜索到了。

为什么叫 es 是准实时的? NRT ,全称 near real-time 。默认是每隔 1 秒 refresh 一次的,所以 es 是准实时的,因为写入的数据 1 秒之后才能被看到。可以通过 es 的 restful api 或者 java api手动执行一次 refresh 操作,就是手动将 buffer 中的数据刷入 os cache 中,让数据立马就可以被搜索到。只要数据被输入 os cache 中,buffer 就会被清空了,因为不需要保留 buffer 了,数据在 translog 里面已经持久化到磁盘去一份了。

重复上面的步骤,新的数据不断进入 buffer 和 translog,不断将 buffer 数据写入一个又一个新的 segment file 中去,每次 refresh 完 buffer 清空,translog 保留。随着这个过程推进,translog 会变得越来越大。当 translog 达到一定长度的时候,就会触发 commit 操作。

commit 操作发生第一步,就是将 buffer 中现有数据 refreshos cache 中去,清空 buffer。然后,将一个 commit point 写入磁盘文件,里面标识着这个 commit point 对应的所有 segment file ,同时强行将 os cache 中目前所有的数据都 fsync 到磁盘文件中去。最后清空 现有 translog 日志文件,重启一个 translog,此时 commit 操作完成。

这个 commit 操作叫做 flush 。默认 30 分钟自动执行一次 flush ,但如果 translog 过大,也会触发 flush 。flush 操作就对应着 commit 的全过程,我们可以通过 es api,手动执行 flush 操作,手动将 os cache 中的数据 fsync 强刷到磁盘上去。

translog 日志文件的作用是什么?你执行 commit 操作之前,数据要么是停留在 buffer 中,要么是停留在 os cache 中,无论是 buffer 还是 os cache 都是内存,一旦这台机器死了,内存中的数据就全丢了。所以需要将数据对应的操作写入一个专门的日志文件 translog 中,一旦此时机器宕机,再次重启的时候,es 会自动读取 translog 日志文件中的数据,恢复到内存 buffer 和 os cache 中去。

translog 其实也是先写入 os cache 的,默认每隔 5 秒刷一次到磁盘中去,所以默认情况下,可能有 5 秒的数据会仅仅停留在 buffer 或者 translog 文件的 os cache 中,如果此时机器挂了,会丢失 5 秒钟的数据。但是这样性能比较好,最多丢 5 秒的数据。也可以将 translog 设置成每次写操作必须是直接 fsync 到磁盘,但是性能会差很多。

实际上你在这里,如果面试官没有问你 es 丢数据的问题,你可以在这里给面试官炫一把,你说,其实 es 第一是准实时的,数据写入 1 秒后可以搜索到;可能会丢失数据的。有 5 秒的数据,停留在 buffer、translog os cache、segment file os cache 中,而不在磁盘上,此时如果宕机,会导致 5 秒的数据丢失

总结一下,数据先写入内存 buffer,然后每隔 1s,将数据 refresh 到 os cache,到了 os cache 数据就能被搜索到(所以我们才说 es 从写入到能被搜索到,中间有 1s 的延迟)。每隔 5s,将数据写入 translog 文件(这样如果机器宕机,内存数据全没,最多会有 5s 的数据丢失),translog 大到一定程度,或者默认每隔 30mins,会触发 commit 操作,将缓冲区的数据都 flush 到 segment file 磁盘文件中。

数据写入 segment file 之后,同时就建立好了倒排索引。

删除/更新数据底层原理

如果是删除操作,commit 的时候会生成一个 .del 文件,里面将某个 doc 标识为 deleted 状态,那么搜索的时候根据 .del 文件就知道这个 doc 是否被删除了。

如果是更新操作,就是将原来的 doc 标识为 deleted 状态,然后新写入一条数据。

buffer 每 refresh 一次,就会产生一个 segment file ,所以默认情况下是 1 秒钟一个 segment file ,这样下来 segment file 会越来越多,此时会定期执行 merge。每次 merge 的时候,会将多个 segment file 合并成一个,同时这里会将标识为 deleted 的 doc 给物理删除掉,然后将新的 segment file 写入磁盘,这里会写一个 commit point ,标识所有新的 segment file ,然后打开 segment file 供搜索使用,同时删除旧的 segment file

底层 lucene

简单来说,lucene 就是一个 jar 包,里面包含了封装好的各种建立倒排索引的算法代码。我们用 Java 开发的时候,引入 lucene jar,然后基于 lucene 的 api 去开发就可以了。

通过 lucene,我们可以将已有的数据建立索引,lucene 会在本地磁盘上面,给我们组织索引的数据结构。

倒排索引

在搜索引擎中,每个文档都有一个对应的文档 ID,文档内容被表示为一系列关键词的集合。例如,文档 1 经过分词,提取了 20 个关键词,每个关键词都会记录它在文档中出现的次数和出现位置。

那么,倒排索引就是关键词到文档 ID 的映射,每个关键词都对应着一系列的文件,这些文件中都出现了关键词。

举个栗子。

有以下文档:

DocId Doc
1 谷歌地图之父跳槽 Facebook
2 谷歌地图之父加盟 Facebook
3 谷歌地图创始人拉斯离开谷歌加盟 Facebook
4 谷歌地图之父跳槽 Facebook 与 Wave 项目取消有关
5 谷歌地图之父拉斯加盟社交网站 Facebook

对文档进行分词之后,得到以下倒排索引

WordId Word DocIds
1 谷歌 1, 2, 3, 4, 5
2 地图 1, 2, 3, 4, 5
3 之父 1, 2, 4, 5
4 跳槽 1, 4
5 Facebook 1, 2, 3, 4, 5
6 加盟 2, 3, 5
7 创始人 3
8 拉斯 3, 5
9 离开 3
10 4
.. .. ..

另外,实用的倒排索引还可以记录更多的信息,比如文档频率信息,表示在文档集合中有多少个文档包含某个单词。

那么,有了倒排索引,搜索引擎可以很方便地响应用户的查询。比如用户输入查询 Facebook ,搜索系统查找倒排索引,从中读出包含这个单词的文档,这些文档就是提供给用户的搜索结果。

要注意倒排索引的两个重要细节:

  • 倒排索引中的所有词项对应一个或多个文档;
  • 倒排索引中的词项根据字典顺序升序排列

上面只是一个简单的栗子,并没有严格按照字典顺序升序排列。

elasticsearch 的倒排索引是什么

面试官:想了解你对基础概念的认知。

解答:通俗解释一下就可以。

传统的我们的检索是通过文章,逐个遍历找到对应关键词的位置。

而倒排索引,是通过分词策略,形成了词和文章的映射关系表,这种词典+映射表即为倒排索引。有了倒排索引,就能实现 o(1)时间复杂度的效率检索文章了,极大的提高了检索效率。

img

学术的解答方式:

倒排索引,相反于一篇文章包含了哪些词,它从词出发,记载了这个词在哪些文档中出现过,由两部分组成——词典和倒排表。

加分项:倒排索引的底层实现是基于:FST(Finite State Transducer)数据结构。

lucene 从 4+版本后开始大量使用的数据结构是 FST。FST 有两个优点:

(1)空间占用小。通过对词典中单词前缀和后缀的重复利用,压缩了存储空间;

(2)查询速度快。O(len(str))的查询时间复杂度。

3、elasticsearch 索引数据多了怎么办,如何调优,部署

面试官:想了解大数据量的运维能力。

解答:索引数据的规划,应在前期做好规划,正所谓“设计先行,编码在后”,这样才能有效的避免突如其来的数据激增导致集群处理能力不足引发的线上客户检索或者其他业务受到影响。

如何调优,正如问题 1 所说,这里细化一下:

3.1 动态索引层面

基于模板+时间+rollover api 滚动创建索引,举例:设计阶段定义:blog 索引的模板格式为:blogindex时间戳的形式,每天递增数据。这样做的好处:不至于数据量激增导致单个索引数据量非常大,接近于上线 2 的 32 次幂-1,索引存储达到了 TB+甚至更大。

一旦单个索引很大,存储等各种风险也随之而来,所以要提前考虑+及早避免。

3.2 存储层面

冷热数据分离存储,热数据(比如最近 3 天或者一周的数据),其余为冷数据。

对于冷数据不会再写入新数据,可以考虑定期 force_merge 加 shrink 压缩操作,节省存储空间和检索效率。

3.3 部署层面

一旦之前没有规划,这里就属于应急策略。

结合 ES 自身的支持动态扩展的特点,动态新增机器的方式可以缓解集群压力,注意:如果之前主节点等规划合理,不需要重启集群也能完成动态新增的。

4、elasticsearch 是如何实现 master 选举的

面试官:想了解 ES 集群的底层原理,不再只关注业务层面了。

解答:

前置前提:

(1)只有候选主节点(master:true)的节点才能成为主节点。

(2)最小主节点数(min_master_nodes)的目的是防止脑裂。

核对了一下代码,核心入口为 findMaster,选择主节点成功返回对应 Master,否则返回 null。选举流程大致描述如下:

第一步:确认候选主节点数达标,elasticsearch.yml 设置的值

discovery.zen.minimum_master_nodes;

第二步:比较:先判定是否具备 master 资格,具备候选主节点资格的优先返回;

若两节点都为候选主节点,则 id 小的值会主节点。注意这里的 id 为 string 类型。

题外话:获取节点 id 的方法。

1
2
3
1GET /_cat/nodes?v&h=ip,port,heapPercent,heapMax,id,name

2ip port heapPercent heapMax id name

详细描述一下 Elasticsearch 索引文档的过程

面试官:想了解 ES 的底层原理,不再只关注业务层面了。

解答:

这里的索引文档应该理解为文档写入 ES,创建索引的过程。

文档写入包含:单文档写入和批量 bulk 写入,这里只解释一下:单文档写入流程。

记住官方文档中的这个图。

img

第一步:客户写集群某节点写入数据,发送请求。(如果没有指定路由/协调节点,请求的节点扮演路由节点的角色。)

第二步:节点 1 接受到请求后,使用文档_id 来确定文档属于分片 0。请求会被转到另外的节点,假定节点 3。因此分片 0 的主分片分配到节点 3 上。

第三步:节点 3 在主分片上执行写操作,如果成功,则将请求并行转发到节点 1 和节点 2 的副本分片上,等待结果返回。所有的副本分片都报告成功,节点 3 将向协调节点(节点 1)报告成功,节点 1 向请求客户端报告写入成功。

如果面试官再问:第二步中的文档获取分片的过程?

回答:借助路由算法获取,路由算法就是根据路由和文档 id 计算目标的分片 id 的过程。

1
1shard = hash(_routing) % (num_of_primary_shards)

详细描述一下 Elasticsearch 搜索的过程?

面试官:想了解 ES 搜索的底层原理,不再只关注业务层面了。

解答:

搜索拆解为“query then fetch” 两个阶段。

query 阶段的目的:定位到位置,但不取。

步骤拆解如下:

(1)假设一个索引数据有 5 主+1 副本 共 10 分片,一次请求会命中(主或者副本分片中)的一个。

(2)每个分片在本地进行查询,结果返回到本地有序的优先队列中。

(3)第 2)步骤的结果发送到协调节点,协调节点产生一个全局的排序列表。

fetch 阶段的目的:取数据。

路由节点获取所有文档,返回给客户端。

Elasticsearch 在部署时,对 Linux 的设置有哪些优化方法

面试官:想了解对 ES 集群的运维能力。

解答:

(1)关闭缓存 swap;

(2)堆内存设置为:Min(节点内存/2, 32GB);

(3)设置最大文件句柄数;

(4)线程池+队列大小根据业务需要做调整;

(5)磁盘存储 raid 方式——存储有条件使用 RAID10,增加单节点性能以及避免单节点存储故障。

lucence 内部结构是什么?

面试官:想了解你的知识面的广度和深度。

解答:

img

Lucene 是有索引和搜索的两个过程,包含索引创建,索引,搜索三个要点。可以基于这个脉络展开一些。

Elasticsearch 是如何实现 Master 选举的?

(1)Elasticsearch 的选主是 ZenDiscovery 模块负责的,主要包含 Ping(节点之间通过这个 RPC 来发现彼此)和 Unicast(单播模块包含一个主机列表以控制哪些节点需要 ping 通)这两部分;

(2)对所有可以成为 master 的节点(node.master: true)根据 nodeId 字典排序,每次选举每个节点都把自己所知道节点排一次序,然后选出第一个(第 0 位)节点,暂且认为它是 master 节点。

(3)如果对某个节点的投票数达到一定的值(可以成为 master 节点数 n/2+1)并且该节点自己也选举自己,那这个节点就是 master。否则重新选举一直到满足上述条件。

(4)补充:master 节点的职责主要包括集群、节点和索引的管理,不负责文档级别的管理;data 节点可以关闭 http 功能*。

10、Elasticsearch 中的节点(比如共 20 个),其中的 10 个

选了一个 master,另外 10 个选了另一个 master,怎么办?

(1)当集群 master 候选数量不小于 3 个时,可以通过设置最少投票通过数量(discovery.zen.minimum_master_nodes)超过所有候选节点一半以上来解决脑裂问题;

(3)当候选数量为两个时,只能修改为唯一的一个 master 候选,其他作为 data 节点,避免脑裂问题。

客户端在和集群连接时,如何选择特定的节点执行请求的?

TransportClient 利用 transport 模块远程连接一个 elasticsearch 集群。它并不加入到集群中,只是简单的获得一个或者多个初始化的 transport 地址,并以 轮询 的方式与这些地址进行通信。

详细描述一下 Elasticsearch 索引文档的过程。

协调节点默认使用文档 ID 参与计算(也支持通过 routing),以便为路由提供合适的分片。

1
shard = hash(document_id) % (num_of_primary_shards)

(1)当分片所在的节点接收到来自协调节点的请求后,会将请求写入到 MemoryBuffer,然后定时(默认是每隔 1 秒)写入到 Filesystem Cache,这个从 MomeryBuffer 到 Filesystem Cache 的过程就叫做 refresh;

(2)当然在某些情况下,存在 Momery Buffer 和 Filesystem Cache 的数据可能会丢失,ES 是通过 translog 的机制来保证数据的可靠性的。其实现机制是接收到请求后,同时也会写入到 translog 中 ,当 Filesystem cache 中的数据写入到磁盘中时,才会清除掉,这个过程叫做 flush;

(3)在 flush 过程中,内存中的缓冲将被清除,内容被写入一个新段,段的 fsync 将创建一个新的提交点,并将内容刷新到磁盘,旧的 translog 将被删除并开始一个新的 translog。

(4)flush 触发的时机是定时触发(默认 30 分钟)或者 translog 变得太大(默认为 512M)时;

img

补充:关于 Lucene 的 Segement:

(1)Lucene 索引是由多个段组成,段本身是一个功能齐全的倒排索引。

(2)段是不可变的,允许 Lucene 将新的文档增量地添加到索引中,而不用从头重建索引。

(3)对于每一个搜索请求而言,索引中的所有段都会被搜索,并且每个段会消耗 CPU 的时钟周、文件句柄和内存。这意味着段的数量越多,搜索性能会越低。

(4)为了解决这个问题,Elasticsearch 会合并小段到一个较大的段,提交新的合并段到磁盘,并删除那些旧的小段。

详细描述一下 Elasticsearch 更新和删除文档的过程。

(1)删除和更新也都是写操作,但是 Elasticsearch 中的文档是不可变的,因此不能被删除或者改动以展示其变更;

(2)磁盘上的每个段都有一个相应的.del 文件。当删除请求发送后,文档并没有真的被删除,而是在.del 文件中被标记为删除。该文档依然能匹配查询,但是会在结果中被过滤掉。当段合并时,在.del 文件中被标记为删除的文档将不会被写入新段。

(3)在新的文档被创建时,Elasticsearch 会为该文档指定一个版本号,当执行更新时,旧版本的文档在.del 文件中被标记为删除,新版本的文档被索引到一个新段。旧版本的文档依然能匹配查询,但是会在结果中被过滤掉。

详细描述一下 Elasticsearch 搜索的过程。

(1)搜索被执行成一个两阶段过程,我们称之为 Query Then Fetch;

(2)在初始查询阶段时,查询会广播到索引中每一个分片拷贝(主分片或者副本分片)。 每个分片在本地执行搜索并构建一个匹配文档的大小为 from + size 的优先队列。

PS:在搜索的时候是会查询 Filesystem Cache 的,但是有部分数据还在 MemoryBuffer,所以搜索是近实时的。

(3)每个分片返回各自优先队列中 所有文档的 ID 和排序值 给协调节点,它合并这些值到自己的优先队列中来产生一个全局排序后的结果列表。

(4)接下来就是 取回阶段,协调节点辨别出哪些文档需要被取回并向相关的分片提交多个 GET 请求。每个分片加载并 丰 富 文档,如果有需要的话,接着返回文档给协调节点。一旦所有的文档都被取回了,协调节点返回结果给客户端。

(5)补充:Query Then Fetch 的搜索类型在文档相关性打分的时候参考的是本分片的数据,这样在文档数量较少的时候可能不够准确,DFS Query Then Fetch 增加了一个预查询的处理,询问 Term 和 Document frequency,这个评分更准确,但是性能会变差。*

img

在 Elasticsearch 中,是怎么根据一个词找到对应的倒排索引的?

(1)Lucene 的索引过程,就是按照全文检索的基本过程,将倒排表写成此文件格式的过程。

(2)Lucene 的搜索过程,就是按照此文件格式将索引进去的信息读出来,然后计算每篇文档打分(score)的过程。

Elasticsearch 在部署时,对 Linux 的设置有哪些优化方法?

(1)64 GB 内存的机器是非常理想的, 但是 32 GB 和 16 GB 机器也是很常见的。少于 8 GB 会适得其反。

(2)如果你要在更快的 CPUs 和更多的核心之间选择,选择更多的核心更好。多个内核提供的额外并发远胜过稍微快一点点的时钟频率。

(3)如果你负担得起 SSD,它将远远超出任何旋转介质。 基于 SSD 的节点,查询和索引性能都有提升。如果你负担得起,SSD 是一个好的选择。

(4)即使数据中心们近在咫尺,也要避免集群跨越多个数据中心。绝对要避免集群跨越大的地理距离。

(5)请确保运行你应用程序的 JVM 和服务器的 JVM 是完全一样的。 在 Elasticsearch 的几个地方,使用 Java 的本地序列化。

(6)通过设置 gateway.recover_after_nodes、gateway.expected_nodes、gateway.recover_after_time 可以在集群重启的时候避免过多的分片交换,这可能会让数据恢复从数个小时缩短为几秒钟。

(7)Elasticsearch 默认被配置为使用单播发现,以防止节点无意中加入集群。只有在同一台机器上运行的节点才会自动组成集群。最好使用单播代替组播。

(8)不要随意修改垃圾回收器(CMS)和各个线程池的大小。

(9)把你的内存的(少于)一半给 Lucene(但不要超过 32 GB!),通过 ES_HEAP_SIZE 环境变量设置。

(10)内存交换到磁盘对服务器性能来说是致命的。如果内存交换到磁盘上,一个 100 微秒的操作可能变成 10 毫秒。 再想想那么多 10 微秒的操作时延累加起来。 不难看出 swapping 对于性能是多么可怕。

(11)Lucene 使用了大 量 的文件。同时,Elasticsearch 在节点和 HTTP 客户端之间进行通信也使用了大量的套接字。 所有这一切都需要足够的文件描述符。你应该增加你的文件描述符,设置一个很大的值,如 64,000。

补充:索引阶段性能提升方法

(1)使用批量请求并调整其大小:每次批量数据 5–15 MB 大是个不错的起始点。

(2)存储:使用 SSD

(3)段和合并:Elasticsearch 默认值是 20 MB/s,对机械磁盘应该是个不错的设置。如果你用的是 SSD,可以考虑提高到 100–200 MB/s。如果你在做批量导入,完全不在意搜索,你可以彻底关掉合并限流。另外还可以增加 index.translog.flush_threshold_size 设置,从默认的 512 MB 到更大一些的值,比如 1 GB,这可以在一次清空触发的时候在事务日志里积累出更大的段。

(4)如果你的搜索结果不需要近实时的准确度,考虑把每个索引的 index.refresh_interval 改到 30s。

(5)如果你在做大批量导入,考虑通过设置 index.number_of_replicas: 0 关闭副本。

对于 GC 方面,在使用 Elasticsearch 时要注意什么?

(1)倒排词典的索引需要常驻内存,无法 GC,需要监控 data node 上 segmentmemory 增长趋势。

(2)各类缓存,field cache, filter cache, indexing cache, bulk queue 等等,要设置合理的大小,并且要应该根据最坏的情况来看 heap 是否够用,也就是各类缓存全部占满的时候,还有 heap 空间可以分配给其他任务吗?避免采用 clear cache 等“自欺欺人”的方式来释放内存。

(3)避免返回大量结果集的搜索与聚合。确实需要大量拉取数据的场景,可以采用 scan & scroll api 来实现。

(4)cluster stats 驻留内存并无法水平扩展,超大规模集群可以考虑分拆成多个集群通过 tribe node 连接。

(5)想知道 heap 够不够,必须结合实际应用场景,并对集群的 heap 使用情况做持续的监控。

(6)根据监控数据理解内存需求,合理配置各类 circuit breaker,将内存溢出风险降低到最低

18、Elasticsearch 对于大数据量(上亿量级)的聚合如何实现?

Elasticsearch 提供的首个近似聚合是 cardinality 度量。它提供一个字段的基数,即该字段的 distinct 或者 unique 值的数目。它是基于 HLL 算法的。HLL 会先对我们的输入作哈希运算,然后根据哈希运算的结果中的 bits 做概率估算从而得到基数。其特点是:可配置的精度,用来控制内存的使用(更精确 = 更多内存);小的数据集精度是非常高的;我们可以通过配置参数,来设置去重需要的固定内存使用量。无论数千还是数十亿的唯一值,内存使用量只与你配置的精确度相关。

19、在并发情况下,Elasticsearch 如果保证读写一致?

(1)可以通过版本号使用乐观并发控制,以确保新版本不会被旧版本覆盖,由应用层来处理具体的冲突;

(2)另外对于写操作,一致性级别支持 quorum/one/all,默认为 quorum,即只有当大多数分片可用时才允许写操作。但即使大多数可用,也可能存在因为网络等原因导致写入副本失败,这样该副本被认为故障,分片将会在一个不同的节点上重建。

(3)对于读操作,可以设置 replication 为 sync(默认),这使得操作在主分片和副本分片都完成后才会返回;如果设置 replication 为 async 时,也可以通过设置搜索请求参数_preference 为 primary 来查询主分片,确保文档是最新版本。

20、如何监控 Elasticsearch 集群状态?

Marvel 让你可以很简单的通过 Kibana 监控 Elasticsearch。你可以实时查看你的集群健康状态和性能,也可以分析过去的集群、索引和节点指标。

21、介绍下你们电商搜索的整体技术架构。

img

介绍一下你们的个性化搜索方案?

基于 word2vec 和 Elasticsearch 实现个性化搜索

(1)基于 word2vec、Elasticsearch 和自定义的脚本插件,我们就实现了一个个性化的搜索服务,相对于原有的实现,新版的点击率和转化率都有大幅的提升;

(2)基于 word2vec 的商品向量还有一个可用之处,就是可以用来实现相似商品的推荐;

(3)使用 word2vec 来实现个性化搜索或个性化推荐是有一定局限性的,因为它只能处理用户点击历史这样的时序数据,而无法全面的去考虑用户偏好,这个还是有很大的改进和提升的空间;

是否了解字典树?

常用字典数据结构如下所示:

img

Trie 的核心思想是空间换时间,利用字符串的公共前缀来降低查询时间的开销以达到提高效率的目的。它有 3 个基本性质:

1)根节点不包含字符,除根节点外每一个节点都只包含一个字符。

2)从根节点到某一节点,路径上经过的字符连接起来,为该节点对应的字符串。

3)每个节点的所有子节点包含的字符都不相同。

img

(1)可以看到,trie 树每一层的节点数是 26^i 级别的。所以为了节省空间,我们还可以用动态链表,或者用数组来模拟动态。而空间的花费,不会超过单词数 × 单词长度。

(2)实现:对每个结点开一个字母集大小的数组,每个结点挂一个链表,使用左儿子右兄弟表示法记录这棵树;

(3)对于中文的字典树,每个节点的子节点用一个哈希表存储,这样就不用浪费太大的空间,而且查询速度上可以保留哈希的复杂度 O(1)。

拼写纠错是如何实现的?

(1)拼写纠错是基于编辑距离来实现;编辑距离是一种标准的方法,它用来表示经过插入、删除和替换操作从一个字符串转换到另外一个字符串的最小操作步数;

(2)编辑距离的计算过程:比如要计算 batyu 和 beauty 的编辑距离,先创建一个 7×8 的表(batyu 长度为 5,coffee 长度为 6,各加 2),接着,在如下位置填入黑色数字。其他格的计算过程是取以下三个值的最小值:

如果最上方的字符等于最左方的字符,则为左上方的数字。否则为左上方的数字+1。(对于 3,3 来说为 0)

左方数字+1(对于 3,3 格来说为 2)

上方数字+1(对于 3,3 格来说为 2)

最终取右下角的值即为编辑距离的值 3。

img

对于拼写纠错,我们考虑构造一个度量空间(Metric Space),该空间内任何关系满足以下三条基本条件:

d(x,y) = 0 – 假如 x 与 y 的距离为 0,则 x=y

d(x,y) = d(y,x) – x 到 y 的距离等同于 y 到 x 的距离

d(x,y) + d(y,z) >= d(x,z) – 三角不等式

(1)根据三角不等式,则满足与 query 距离在 n 范围内的另一个字符转 B,其与 A 的距离最大为 d+n,最小为 d-n。

(2)BK 树的构造就过程如下:每个节点有任意个子节点,每条边有个值表示编辑距离。所有子节点到父节点的边上标注 n 表示编辑距离恰好为 n。比如,我们有棵树父节点是”book”和两个子节点”cake”和”books”,”book”到”books”的边标号 1,”book”到”cake”的边上标号 4。从字典里构造好树后,无论何时你想插入新单词时,计算该单词与根节点的编辑距离,并且查找数值为 d(neweord, root)的边。递归得与各子节点进行比较,直到没有子节点,你就可以创建新的子节点并将新单词保存在那。比如,插入”boo”到刚才上述例子的树中,我们先检查根节点,查找 d(“book”, “boo”) = 1 的边,然后检查标号为 1 的边的子节点,得到单词”books”。我们再计算距离 d(“books”, “boo”)=2,则将新单词插在”books”之后,边标号为 2。

(3)查询相似词如下:计算单词与根节点的编辑距离 d,然后递归查找每个子节点标号为 d-n 到 d+n(包含)的边。假如被检查的节点与搜索单词的距离 d 小于 n,则返回该节点并继续查询。比如输入 cape 且最大容忍距离为 1,则先计算和根的编辑距离 d(“book”, “cape”)=4,然后接着找和根节点之间编辑距离为 3 到 5 的,这个就找到了 cake 这个节点,计算 d(“cake”, “cape”)=1,满足条件所以返回 cake,然后再找和 cake 节点编辑距离是 0 到 2 的,分别找到 cape 和 cart 节点,这样就得到 cape 这个满足条件的结果。

img

ElasticSearch Rest API

Elasticsearch 是一个分布式、RESTful 风格的搜索和数据分析引擎,能够解决不断涌现出的各种用例。 作为 Elastic Stack 的核心,它集中存储您的数据,帮助您发现意料之中以及意料之外的情况。

Elasticsearch 基于搜索库 Lucene 开发。ElasticSearch 隐藏了 Lucene 的复杂性,提供了简单易用的 REST API / Java API 接口(另外还有其他语言的 API 接口)。

_以下简称 ES_。

REST API 最详尽的文档应该参考:ES 官方 REST API

ElasticSearch Rest API 语法格式

向 Elasticsearch 发出的请求的组成部分与其它普通的 HTTP 请求是一样的:

1
curl -X<VERB> '<PROTOCOL>://<HOST>:<PORT>/<PATH>?<QUERY_STRING>' -d '<BODY>'
  • VERB:HTTP 方法,支持:GET, POST, PUT, HEAD, DELETE
  • PROTOCOL:http 或者 https 协议(只有在 Elasticsearch 前面有 https 代理的时候可用)
  • HOST:Elasticsearch 集群中的任何一个节点的主机名,如果是在本地的节点,那么就叫 localhost
  • PORT:Elasticsearch HTTP 服务所在的端口,默认为 9200 PATH API 路径(例如_count 将返回集群中文档的数量),
  • PATH:可以包含多个组件,例如 _cluster/stats 或者 _nodes/stats/jvm
  • QUERY_STRING:一些可选的查询请求参数,例如?pretty 参数将使请求返回更加美观易读的 JSON 数据
  • BODY:一个 JSON 格式的请求主体(如果请求需要的话)

ElasticSearch Rest API 分为两种:

  • URI Search:在 URL 中使用查询参数
  • Request Body Search:基于 JSON 格式的、更加完备的 DSL

URI Search 示例:

Request Body Search 示例:

索引 API

参考资料:Elasticsearch 官方之 cat 索引 API

创建索引

新建 Index,可以直接向 ES 服务器发出 PUT 请求。

语法格式:

1
2
3
4
5
6
7
8
9
PUT /my_index
{
"settings": { ... any settings ... },
"mappings": {
"type_one": { ... any mappings ... },
"type_two": { ... any mappings ... },
...
}
}

示例:

1
2
3
4
5
6
7
8
9
PUT /user
{
"settings": {
"index": {
"number_of_shards": 3,
"number_of_replicas": 2
}
}
}

服务器返回一个 JSON 对象,里面的 acknowledged 字段表示操作成功。

1
{"acknowledged":true,"shards_acknowledged":true,"index":"user"}

如果你想禁止自动创建索引,可以通过在 config/elasticsearch.yml 的每个节点下添加下面的配置:

1
action.auto_create_index: false

删除索引

然后,我们可以通过发送 DELETE 请求,删除这个 Index。

1
DELETE /user

删除多个索引

1
2
DELETE /index_one,index_two
DELETE /index_*

查看索引

可以通过 GET 请求查看索引信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# 查看索引相关信息
GET kibana_sample_data_ecommerce

# 查看索引的文档总数
GET kibana_sample_data_ecommerce/_count

# 查看前10条文档,了解文档格式
GET kibana_sample_data_ecommerce/_search

# _cat indices API
# 查看indices
GET /_cat/indices/kibana*?v&s=index

# 查看状态为绿的索引
GET /_cat/indices?v&health=green

# 按照文档个数排序
GET /_cat/indices?v&s=docs.count:desc

# 查看具体的字段
GET /_cat/indices/kibana*?pri&v&h=health,index,pri,rep,docs.count,mt

# 查看索引占用的内存
GET /_cat/indices?v&h=i,tm&s=tm:desc

索引别名

ES 的索引别名就是给一个索引或者多个索引起的另一个名字,典型的应用场景是针对索引使用的平滑切换。

首先,创建索引 my_index,然后将别名 my_alias 指向它,示例如下:

1
2
PUT /my_index
PUT /my_index/_alias/my_alias

也可以通过如下形式:

1
2
3
4
5
6
POST /_aliases
{
"actions": [
{ "add": { "index": "my_index", "alias": "my_alias" }}
]
}

也可以在一次请求中增加别名和移除别名混合使用:

1
2
3
4
5
6
7
POST /_aliases
{
"actions": [
{ "remove": { "index": "my_index", "alias": "my_alias" }}
{ "add": { "index": "my_index_v2", "alias": "my_alias" }}
]
}

需要注意的是,如果别名与索引是一对一的,使用别名索引文档或者查询文档是可以的,但是如果别名和索引是一对多的,使用别名会发生错误,因为 ES 不知道把文档写入哪个索引中去或者从哪个索引中读取文档。

ES 索引别名有个典型的应用场景是平滑切换,更多细节可以查看 Elasticsearch(ES)索引零停机(无需重启)无缝平滑切换的方法

打开/关闭索引

通过在 POST 中添加 _close_open 可以打开、关闭索引。

打开索引

1
2
3
4
# 打开索引
POST kibana_sample_data_ecommerce/_open
# 关闭索引
POST kibana_sample_data_ecommerce/_close

文档

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
############Create Document############
#create document. 自动生成 _id
POST users/_doc
{
"user" : "Mike",
"post_date" : "2019-04-15T14:12:12",
"message" : "trying out Kibana"
}

#create document. 指定Id。如果id已经存在,报错
PUT users/_doc/1?op_type=create
{
"user" : "Jack",
"post_date" : "2019-05-15T14:12:12",
"message" : "trying out Elasticsearch"
}

#create document. 指定 ID 如果已经存在,就报错
PUT users/_create/1
{
"user" : "Jack",
"post_date" : "2019-05-15T14:12:12",
"message" : "trying out Elasticsearch"
}

### Get Document by ID
#Get the document by ID
GET users/_doc/1


### Index & Update
#Update 指定 ID (先删除,在写入)
GET users/_doc/1

PUT users/_doc/1
{
"user" : "Mike"

}


#GET users/_doc/1
#在原文档上增加字段
POST users/_update/1/
{
"doc":{
"post_date" : "2019-05-15T14:12:12",
"message" : "trying out Elasticsearch"
}
}



### Delete by Id
# 删除文档
DELETE users/_doc/1


### Bulk 操作
#执行两次,查看每次的结果

#执行第1次
POST _bulk
{ "index" : { "_index" : "test", "_id" : "1" } }
{ "field1" : "value1" }
{ "delete" : { "_index" : "test", "_id" : "2" } }
{ "create" : { "_index" : "test2", "_id" : "3" } }
{ "field1" : "value3" }
{ "update" : {"_id" : "1", "_index" : "test"} }
{ "doc" : {"field2" : "value2"} }


#执行第2次
POST _bulk
{ "index" : { "_index" : "test", "_id" : "1" } }
{ "field1" : "value1" }
{ "delete" : { "_index" : "test", "_id" : "2" } }
{ "create" : { "_index" : "test2", "_id" : "3" } }
{ "field1" : "value3" }
{ "update" : {"_id" : "1", "_index" : "test"} }
{ "doc" : {"field2" : "value2"} }

### mget 操作
GET /_mget
{
"docs" : [
{
"_index" : "test",
"_id" : "1"
},
{
"_index" : "test",
"_id" : "2"
}
]
}


#URI中指定index
GET /test/_mget
{
"docs" : [
{

"_id" : "1"
},
{

"_id" : "2"
}
]
}


GET /_mget
{
"docs" : [
{
"_index" : "test",
"_id" : "1",
"_source" : false
},
{
"_index" : "test",
"_id" : "2",
"_source" : ["field3", "field4"]
},
{
"_index" : "test",
"_id" : "3",
"_source" : {
"include": ["user"],
"exclude": ["user.location"]
}
}
]
}

### msearch 操作
POST kibana_sample_data_ecommerce/_msearch
{}
{"query" : {"match_all" : {}},"size":1}
{"index" : "kibana_sample_data_flights"}
{"query" : {"match_all" : {}},"size":2}


### 清除测试数据
#清除数据
DELETE users
DELETE test
DELETE test2

创建文档

指定 ID

语法格式:

1
PUT /_index/_type/_create/_id

示例:

1
2
3
4
5
6
PUT /user/_doc/_create/1
{
"user": "张三",
"title": "工程师",
"desc": "数据库管理"
}

注意:指定 Id,如果 id 已经存在,则报错

自动生成 ID

新增记录的时候,也可以不指定 Id,这时要改成 POST 请求。

语法格式:

1
POST /_index/_type

示例:

1
2
3
4
5
6
POST /user/_doc
{
"user": "张三",
"title": "工程师",
"desc": "超级管理员"
}

删除文档

语法格式:

1
DELETE /_index/_doc/_id

示例:

1
DELETE /user/_doc/1

更新文档

先删除,再写入

语法格式:

1
PUT /_index/_type/_id

示例:

1
2
3
4
5
6
PUT /user/_doc/1
{
"user": "李四",
"title": "工程师",
"desc": "超级管理员"
}

在原文档上增加字段

语法格式:

1
POST /_index/_update/_id

示例:

1
2
3
4
5
6
POST /user/_update/1
{
"doc":{
"age" : "30"
}
}

查询文档

指定 ID 查询

语法格式:

1
GET /_index/_type/_id

示例:

1
GET /user/_doc/1

结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
{
"_index": "user",
"_type": "_doc",
"_id": "1",
"_version": 1,
"_seq_no": 536248,
"_primary_term": 2,
"found": true,
"_source": {
"user": "张三",
"title": "工程师",
"desc": "数据库管理"
}
}

返回的数据中,found 字段表示查询成功,_source 字段返回原始记录。

如果 id 不正确,就查不到数据,found 字段就是 false

查询所有记录

使用 GET 方法,直接请求 /index/type/_search,就会返回所有记录。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
$ curl 'localhost:9200/user/admin/_search?pretty'
{
"took" : 1,
"timed_out" : false,
"_shards" : {
"total" : 3,
"successful" : 3,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : 2,
"max_score" : 1.0,
"hits" : [
{
"_index" : "user",
"_type" : "admin",
"_id" : "WWuoDG8BHwECs7SiYn93",
"_score" : 1.0,
"_source" : {
"user" : "李四",
"title" : "工程师",
"desc" : "系统管理"
}
},
{
"_index" : "user",
"_type" : "admin",
"_id" : "1",
"_score" : 1.0,
"_source" : {
"user" : "张三",
"title" : "工程师",
"desc" : "超级管理员"
}
}
]
}
}

上面代码中,返回结果的 took字段表示该操作的耗时(单位为毫秒),timed_out字段表示是否超时,hits字段表示命中的记录,里面子字段的含义如下。

  • total:返回记录数,本例是 2 条。
  • max_score:最高的匹配程度,本例是1.0
  • hits:返回的记录组成的数组。

返回的记录中,每条记录都有一个_score字段,表示匹配的程序,默认是按照这个字段降序排列。

全文搜索

ES 的查询非常特别,使用自己的查询语法,要求 GET 请求带有数据体。

1
2
3
4
$ curl -H 'Content-Type: application/json' 'localhost:9200/user/admin/_search?pretty'  -d '
{
"query" : { "match" : { "desc" : "管理" }}
}'

上面代码使用 Match 查询,指定的匹配条件是desc字段里面包含”软件”这个词。返回结果如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
{
"took" : 2,
"timed_out" : false,
"_shards" : {
"total" : 3,
"successful" : 3,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : 2,
"max_score" : 0.38200712,
"hits" : [
{
"_index" : "user",
"_type" : "admin",
"_id" : "WWuoDG8BHwECs7SiYn93",
"_score" : 0.38200712,
"_source" : {
"user" : "李四",
"title" : "工程师",
"desc" : "系统管理"
}
},
{
"_index" : "user",
"_type" : "admin",
"_id" : "1",
"_score" : 0.3487891,
"_source" : {
"user" : "张三",
"title" : "工程师",
"desc" : "超级管理员"
}
}
]
}
}

Elastic 默认一次返回 10 条结果,可以通过size字段改变这个设置,还可以通过from字段,指定位移。

1
2
3
4
5
6
$ curl 'localhost:9200/user/admin/_search'  -d '
{
"query" : { "match" : { "desc" : "管理" }},
"from": 1,
"size": 1
}'

上面代码指定,从位置 1 开始(默认是从位置 0 开始),只返回一条结果。

逻辑运算

如果有多个搜索关键字, Elastic 认为它们是or关系。

1
2
3
4
$ curl 'localhost:9200/user/admin/_search'  -d '
{
"query" : { "match" : { "desc" : "软件 系统" }}
}'

上面代码搜索的是软件 or 系统

如果要执行多个关键词的and搜索,必须使用布尔查询

1
2
3
4
5
6
7
8
9
10
11
$ curl -H 'Content-Type: application/json' 'localhost:9200/user/admin/_search?pretty'  -d '
{
"query": {
"bool": {
"must": [
{ "match": { "desc": "管理" } },
{ "match": { "desc": "超级" } }
]
}
}
}'

批量执行

支持在一次 API 调用中,对不同的索引进行操作

支持四种类型操作

  • index
  • create
  • update
  • delete

操作中单条操作失败,并不会影响其他操作。

返回结果包括了每一条操作执行的结果。

1
2
3
4
5
6
7
8
POST _bulk
{ "index" : { "_index" : "test", "_id" : "1" } }
{ "field1" : "value1" }
{ "delete" : { "_index" : "test", "_id" : "2" } }
{ "create" : { "_index" : "test2", "_id" : "3" } }
{ "field1" : "value3" }
{ "update" : {"_id" : "1", "_index" : "test"} }
{ "doc" : {"field2" : "value2"} }

说明:上面的示例如果执行多次,执行结果都不一样。

批量读取

读多个索引

1
2
3
4
5
6
7
8
9
10
11
12
13
GET /_mget
{
"docs" : [
{
"_index" : "test",
"_id" : "1"
},
{
"_index" : "test",
"_id" : "2"
}
]
}

读一个索引

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
GET /test/_mget
{
"docs" : [
{

"_id" : "1"
},
{

"_id" : "2"
}
]
}

GET /_mget
{
"docs" : [
{
"_index" : "test",
"_id" : "1",
"_source" : false
},
{
"_index" : "test",
"_id" : "2",
"_source" : ["field3", "field4"]
},
{
"_index" : "test",
"_id" : "3",
"_source" : {
"include": ["user"],
"exclude": ["user.location"]
}
}
]
}

批量查询

1
2
3
4
5
POST kibana_sample_data_ecommerce/_msearch
{}
{"query" : {"match_all" : {}},"size":1}
{"index" : "kibana_sample_data_flights"}
{"query" : {"match_all" : {}},"size":2}

URI Search 查询语义

Elasticsearch URI Search 遵循 QueryString 查询语义,其形式如下:

1
2
3
4
GET /movies/_search?q=2012&df=title&sort=year:desc&from=0&size=10&timeout=1s
{
"profile": true
}
  • q 指定查询语句,使用 QueryString 语义
  • df 默认字段,不指定时
  • sort 排序:from 和 size 用于分页
  • profile 可以查看查询时如何被执行的
1
2
3
4
GET /movies/_search?q=title:2012&sort=year:desc&from=0&size=10&timeout=1s
{
"profile":"true"
}

Term 和 Phrase

Beautiful Mind 等效于 Beautiful OR Mind

“Beautiful Mind” 等效于 Beautiful AND Mind

1
2
3
4
5
6
7
8
9
10
11
# Term 查询
GET /movies/_search?q=title:Beautiful Mind
{
"profile":"true"
}

# 使用引号,Phrase 查询
GET /movies/_search?q=title:"Beautiful Mind"
{
"profile":"true"
}

分组与引号

title:(Beautiful AND Mind)

title=”Beautiful Mind”

AND、OR、NOT 或者 &&、||、!

注意:AND、OR、NOT 必须大写

1
2
3
4
5
6
7
8
9
10
# 布尔操作符
GET /movies/_search?q=title:(Beautiful AND Mind)
{
"profile":"true"
}

GET /movies/_search?q=title:(Beautiful NOT Mind)
{
"profile":"true"
}

范围查询

  • [] 表示闭区间
  • {} 表示开区间

示例:

1
2
3
4
5
6
7
8
9
10
# 范围查询 ,区间写法
GET /movies/_search?q=title:beautiful AND year:{2010 TO 2018%7D
{
"profile":"true"
}

GET /movies/_search?q=title:beautiful AND year:[* TO 2018]
{
"profile":"true"
}

算数符号

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 2010 年以后的记录
GET /movies/_search?q=year:>2010
{
"profile":"true"
}

# 2010 年到 2018 年的记录
GET /movies/_search?q=year:(>2010 && <=2018)
{
"profile":"true"
}

# 2010 年到 2018 年的记录
GET /movies/_search?q=year:(+>2010 +<=2018)
{
"profile":"true"
}

通配符查询

  • ? 代表 1 个字符
  • * 代表 0 或多个字符

示例:

1
2
3
4
5
6
7
8
9
GET /movies/_search?q=title:mi?d
{
"profile":"true"
}

GET /movies/_search?q=title:b*
{
"profile":"true"
}

正则表达式

title:[bt]oy

模糊匹配与近似查询

示例:

1
2
3
4
5
6
7
8
9
10
11
# 相似度在 1 个字符以内
GET /movies/_search?q=title:beautifl~1
{
"profile":"true"
}

# 相似度在 2 个字符以内
GET /movies/_search?q=title:"Lord Rings"~2
{
"profile":"true"
}

Request Body & DSL

Elasticsearch 除了 URI Search 查询方式,还支持将查询语句通过 Http Request Body 发起查询。

1
2
3
4
5
6
7
GET /kibana_sample_data_ecommerce/_search?ignore_unavailable=true
{
"profile":"true",
"query": {
"match_all": {}
}
}

分页

1
2
3
4
5
6
7
8
9
GET /kibana_sample_data_ecommerce/_search?ignore_unavailable=true
{
"profile": "true",
"from": 0,
"size": 10,
"query": {
"match_all": {}
}
}

排序

最好在数字型或日期型字段上排序

因为对于多值类型或分析过的字段排序,系统会选一个值,无法得知该值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
GET /kibana_sample_data_ecommerce/_search?ignore_unavailable=true
{
"profile": "true",
"sort": [
{
"order_date": "desc"
}
],
"from": 1,
"size": 10,
"query": {
"match_all": {}
}
}

_source 过滤

如果 _source 没有存储,那就只返回匹配的文档的元数据

_source 支持使用通配符,如:_source["name*", "desc*"]

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
GET /kibana_sample_data_ecommerce/_search?ignore_unavailable=true
{
"profile": "true",
"_source": [
"order_date",
"category.keyword"
],
"from": 1,
"size": 10,
"query": {
"match_all": {}
}
}

脚本字段

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
GET /kibana_sample_data_ecommerce/_search?ignore_unavailable=true
{
"profile": "true",
"script_fields": {
"new_field": {
"script": {
"lang": "painless",
"source":"doc['order_date'].value+' hello'"
}
}
},
"from": 1,
"size": 10,
"query": {
"match_all": {}
}
}

使用查询表达式 - Match

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
POST movies/_search
{
"query": {
"match": {
"title": "last christmas"
}
}
}

POST movies/_search
{
"query": {
"match": {
"title": {
"query": "last christmas",
"operator": "and"
}
}
}
}

短语搜索 - Match Phrase

1
2
3
4
5
6
7
8
9
10
11
POST movies/_search
{
"query": {
"match_phrase": {
"title":{
"query": "last christmas"

}
}
}
}

集群 API

Elasticsearch 官方之 Cluster API

一些集群级别的 API 可能会在节点的子集上运行,这些节点可以用节点过滤器指定。例如,任务管理、节点统计和节点信息 API 都可以报告来自一组过滤节点而不是所有节点的结果。

节点过滤器以逗号分隔的单个过滤器列表的形式编写,每个过滤器从所选子集中添加或删除节点。每个过滤器可以是以下之一:

  • _all:将所有节点添加到子集
  • _local:将本地节点添加到子集
  • _master:将当前主节点添加到子集
  • 根据节点 ID 或节点名将匹配节点添加到子集
  • 根据 IP 地址或主机名将匹配节点添加到子集
  • 使用通配符,将节点名、地址名或主机名匹配的节点添加到子集
  • master:true, data:true, ingest:true, voting_only:true, ml:truecoordinating_only:true, 分别意味着将所有主节点、所有数据节点、所有摄取节点、所有仅投票节点、所有机器学习节点和所有协调节点添加到子集中。
  • master:false, data:false, ingest:false, voting_only:true, ml:falsecoordinating_only:false, 分别意味着将所有主节点、所有数据节点、所有摄取节点、所有仅投票节点、所有机器学习节点和所有协调节点排除在子集外。
  • 配对模式,使用 * 通配符,格式为 attrname:attrvalue,将所有具有自定义节点属性的节点添加到子集中,其名称和值与相应的模式匹配。自定义节点属性是通过 node.attr.attrname: attrvalue 形式在配置文件中设置的。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# 如果没有给出过滤器,默认是查询所有节点
GET /_nodes
# 查询所有节点
GET /_nodes/_all
# 查询本地节点
GET /_nodes/_local
# 查询主节点
GET /_nodes/_master
# 根据名称查询节点(支持通配符)
GET /_nodes/node_name_goes_here
GET /_nodes/node_name_goes_*
# 根据地址查询节点(支持通配符)
GET /_nodes/10.0.0.3,10.0.0.4
GET /_nodes/10.0.0.*
# 根据规则查询节点
GET /_nodes/_all,master:false
GET /_nodes/data:true,ingest:true
GET /_nodes/coordinating_only:true
GET /_nodes/master:true,voting_only:false
# 根据自定义属性查询节点(如:查询配置文件中含 node.attr.rack:2 属性的节点)
GET /_nodes/rack:2
GET /_nodes/ra*:2
GET /_nodes/ra*:2*

集群健康 API

1
2
3
4
GET /_cluster/health
GET /_cluster/health?level=shards
GET /_cluster/health/kibana_sample_data_ecommerce,kibana_sample_data_flights
GET /_cluster/health/kibana_sample_data_flights?level=shards

集群状态 API

集群状态 API 返回表示整个集群状态的元数据。

1
GET /_cluster/state

节点 API

Elasticsearch 官方之 cat Nodes API——返回有关集群节点的信息。

1
2
3
4
# 查看默认的字段
GET /_cat/nodes?v=true
# 查看指定的字段
GET /_cat/nodes?v=true&h=id,ip,port,v,m

分片 API

Elasticsearch 官方之 cat Shards API——shards 命令是哪些节点包含哪些分片的详细视图。它会告诉你它是主还是副本、文档数量、它在磁盘上占用的字节数以及它所在的节点。

1
2
3
4
5
6
# 查看默认的字段
GET /_cat/shards
# 根据名称查询分片(支持通配符)
GET /_cat/shards/my-index-*
# 查看指定的字段
GET /_cat/shards?h=index,shard,prirep,state,unassigned.reason

监控 API

Elasticsearch 中集群相关的健康、统计等相关的信息都是围绕着 cat API 进行的。

通过 GET 请求发送 cat,下面列出了所有可用的 API:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
GET /_cat

=^.^=
/_cat/allocation
/_cat/shards
/_cat/shards/{index}
/_cat/master
/_cat/nodes
/_cat/tasks
/_cat/indices
/_cat/indices/{index}
/_cat/segments
/_cat/segments/{index}
/_cat/count
/_cat/count/{index}
/_cat/recovery
/_cat/recovery/{index}
/_cat/health
/_cat/pending_tasks
/_cat/aliases
/_cat/aliases/{alias}
/_cat/thread_pool
/_cat/thread_pool/{thread_pools}
/_cat/plugins
/_cat/fielddata
/_cat/fielddata/{fields}
/_cat/nodeattrs
/_cat/repositories
/_cat/snapshots/{repository}
/_cat/templates

参考资料

Elasticsearch 运维

Elasticsearch 是一个分布式、RESTful 风格的搜索和数据分析引擎,能够解决不断涌现出的各种用例。 作为 Elastic Stack 的核心,它集中存储您的数据,帮助您发现意料之中以及意料之外的情况。

Elasticsearch 安装

Elasticsearch 官方下载安装说明

(1)下载解压

访问 官方下载地址 ,选择需要的版本,下载解压到本地。

(2)运行

运行 bin/elasticsearch (Windows 系统上运行 bin\elasticsearch.bat )

(3)访问

执行 curl http://localhost:9200/ 测试服务是否启动

Elasticsearch 集群规划

ElasticSearch 集群需要根据业务实际情况去合理规划。

需要考虑的问题点:

  • 集群部署几个节点?
  • 有多少个索引?
  • 每个索引有多大数据量?
  • 每个索引有多少个分片?

一个参考规划:

  • 3 台机器,每台机器是 6 核 64G 的。
  • 我们 es 集群的日增量数据大概是 2000 万条,每天日增量数据大概是 500MB,每月增量数据大概是 6 亿,15G。目前系统已经运行了几个月,现在 es 集群里数据总量大概是 100G 左右。
  • 目前线上有 5 个索引(这个结合你们自己业务来,看看自己有哪些数据可以放 es 的),每个索引的数据量大概是 20G,所以这个数据量之内,我们每个索引分配的是 8 个 shard,比默认的 5 个 shard 多了 3 个 shard。

Elasticsearch 配置

ES 的默认配置文件为 config/elasticsearch.yml

基本配置说明如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
cluster.name: elasticsearch
#配置es的集群名称,默认是elasticsearch,es会自动发现在同一网段下的es,如果在同一网段下有多个集群,就可以用这个属性来区分不同的集群。
node.name: 'Franz Kafka'
#节点名,默认随机指定一个name列表中名字,该列表在es的jar包中config文件夹里name.txt文件中,其中有很多作者添加的有趣名字。
node.master: true
#指定该节点是否有资格被选举成为node,默认是true,es是默认集群中的第一台机器为master,如果这台机挂了就会重新选举master。
node.data: true
#指定该节点是否存储索引数据,默认为true。
index.number_of_shards: 5
#设置默认索引分片个数,默认为5片。
index.number_of_replicas: 1
#设置默认索引副本个数,默认为1个副本。
path.conf: /path/to/conf
#设置配置文件的存储路径,默认是es根目录下的config文件夹。
path.data: /path/to/data
#设置索引数据的存储路径,默认是es根目录下的data文件夹,可以设置多个存储路径,用逗号隔开,例:
#path.data: /path/to/data1,/path/to/data2
path.work: /path/to/work
#设置临时文件的存储路径,默认是es根目录下的work文件夹。
path.logs: /path/to/logs
#设置日志文件的存储路径,默认是es根目录下的logs文件夹
path.plugins: /path/to/plugins
#设置插件的存放路径,默认是es根目录下的plugins文件夹
bootstrap.mlockall: true
#设置为true来锁住内存。因为当jvm开始swapping时es的效率会降低,所以要保证它不swap,可以把#ES_MIN_MEM和ES_MAX_MEM两个环境变量设置成同一个值,并且保证机器有足够的内存分配给es。同时也要#允许elasticsearch的进程可以锁住内存,linux下可以通过`ulimit -l unlimited`命令。
network.bind_host: 192.168.0.1
#设置绑定的ip地址,可以是ipv4或ipv6的,默认为0.0.0.0。
network.publish_host: 192.168.0.1
#设置其它节点和该节点交互的ip地址,如果不设置它会自动判断,值必须是个真实的ip地址。
network.host: 192.168.0.1
#这个参数是用来同时设置bind_host和publish_host上面两个参数。
transport.tcp.port: 9300
#设置节点间交互的tcp端口,默认是9300。
transport.tcp.compress: true
#设置是否压缩tcp传输时的数据,默认为false,不压缩。
http.port: 9200
#设置对外服务的http端口,默认为9200。
http.max_content_length: 100mb
#设置内容的最大容量,默认100mb
http.enabled: false
#是否使用http协议对外提供服务,默认为true,开启。
gateway.type: local
#gateway的类型,默认为local即为本地文件系统,可以设置为本地文件系统,分布式文件系统,hadoop的#HDFS,和amazon的s3服务器,其它文件系统的设置方法下次再详细说。
gateway.recover_after_nodes: 1
#设置集群中N个节点启动时进行数据恢复,默认为1。
gateway.recover_after_time: 5m
#设置初始化数据恢复进程的超时时间,默认是5分钟。
gateway.expected_nodes: 2
#设置这个集群中节点的数量,默认为2,一旦这N个节点启动,就会立即进行数据恢复。
cluster.routing.allocation.node_initial_primaries_recoveries: 4
#初始化数据恢复时,并发恢复线程的个数,默认为4。
cluster.routing.allocation.node_concurrent_recoveries: 2
#添加删除节点或负载均衡时并发恢复线程的个数,默认为4。
indices.recovery.max_size_per_sec: 0
#设置数据恢复时限制的带宽,如入100mb,默认为0,即无限制。
indices.recovery.concurrent_streams: 5
#设置这个参数来限制从其它分片恢复数据时最大同时打开并发流的个数,默认为5。
discovery.zen.minimum_master_nodes: 1
#设置这个参数来保证集群中的节点可以知道其它N个有master资格的节点。默认为1,对于大的集群来说,可以设置大一点的值(2-4)
discovery.zen.ping.timeout: 3s
#设置集群中自动发现其它节点时ping连接超时时间,默认为3秒,对于比较差的网络环境可以高点的值来防止自动发现时出错。
discovery.zen.ping.multicast.enabled: false
#设置是否打开多播发现节点,默认是true。
discovery.zen.ping.unicast.hosts: ['host1', 'host2:port', 'host3[portX-portY]']
#设置集群中master节点的初始列表,可以通过这些节点来自动发现新加入集群的节点。

Elasticsearch FAQ

elasticsearch 不允许以 root 权限来运行

问题:在 Linux 环境中,elasticsearch 不允许以 root 权限来运行。

如果以 root 身份运行 elasticsearch,会提示这样的错误:

1
can not run elasticsearch as root

解决方法:使用非 root 权限账号运行 elasticsearch

1
2
3
4
5
6
7
8
# 创建用户组
groupadd elk
# 创建新用户,-g elk 设置其用户组为 elk,-p elk 设置其密码为 elk
useradd elk -g elk -p elk
# 更改 /opt 文件夹及内部文件的所属用户及组为 elk:elk
chown -R elk:elk /opt # 假设你的 elasticsearch 安装在 opt 目录下
# 切换账号
su elk

vm.max_map_count 不低于 262144

问题:vm.max_map_count 表示虚拟内存大小,它是一个内核参数。elasticsearch 默认要求 vm.max_map_count 不低于 262144。

1
max virtual memory areas vm.max_map_count [65530] is too low, increase to at least [262144]

解决方法:

你可以执行以下命令,设置 vm.max_map_count ,但是重启后又会恢复为原值。

1
sysctl -w vm.max_map_count=262144

持久性的做法是在 /etc/sysctl.conf 文件中修改 vm.max_map_count 参数:

1
2
echo "vm.max_map_count=262144" > /etc/sysctl.conf
sysctl -p

注意

如果运行环境为 docker 容器,可能会限制执行 sysctl 来修改内核参数。

这种情况下,你只能选择直接修改宿主机上的参数了。

nofile 不低于 65536

问题: nofile 表示进程允许打开的最大文件数。elasticsearch 进程要求可以打开的最大文件数不低于 65536。

1
max file descriptors [4096] for elasticsearch process is too low, increase to at least [65536]

解决方法:

/etc/security/limits.conf 文件中修改 nofile 参数:

1
2
echo "* soft nofile 65536" > /etc/security/limits.conf
echo "* hard nofile 131072" > /etc/security/limits.conf

nproc 不低于 2048

问题: nproc 表示最大线程数。elasticsearch 要求最大线程数不低于 2048。

1
max number of threads [1024] for user [user] is too low, increase to at least [2048]

解决方法:

/etc/security/limits.conf 文件中修改 nproc 参数:

1
2
echo "* soft nproc 2048" > /etc/security/limits.conf
echo "* hard nproc 4096" > /etc/security/limits.conf

参考资料

Elastic 快速入门

开源协议:Apache 2.0

1. 简介

1.1. Elastic Stack 是什么

Elastic StackELK Stack

ELK 是指 Elastic 公司旗下三款产品 ElasticSearchLogstashKibana 的首字母组合。

  • Elasticsearch 是一个搜索和分析引擎。
  • Logstash 是服务器端数据处理管道,能够同时从多个来源采集数据,转换数据,然后将数据发送到诸如 Elasticsearch 等“存储库”中。
  • Kibana 则可以让用户在 Elasticsearch 中使用图形和图表对数据进行可视化。

而 Elastic Stack 是 ELK Stack 的更新换代产品,最新产品引入了轻量型的单一功能数据采集器,并把它们叫做 Beats

1.2. 为什么使用 Elastic Stack

对于有一定规模的公司来说,通常会很多个应用,并部署在大量的服务器上。运维和开发人员常常需要通过查看日志来定位问题。如果应用是集群化部署,试想如果登录一台台服务器去查看日志,是多么费时费力。

而通过 ELK 这套解决方案,可以同时实现日志收集、日志搜索和日志分析的功能。

1.3. Elastic Stack 架构

img

说明

以上是 Elastic Stack 的一个架构图。从图中可以清楚的看到数据流向。

  • Beats 是单一用途的数据传输平台,它可以将多台机器的数据发送到 Logstash 或 ElasticSearch。但 Beats 并不是不可或缺的一环,所以本文中暂不介绍。

  • Logstash 是一个动态数据收集管道。支持以 TCP/UDP/HTTP 多种方式收集数据(也可以接受 Beats 传输来的数据),并对数据做进一步丰富或提取字段处理。

  • ElasticSearch 是一个基于 JSON 的分布式的搜索和分析引擎。作为 ELK 的核心,它集中存储数据。

  • Kibana 是 ELK 的用户界面。它将收集的数据进行可视化展示(各种报表、图形化数据),并提供配置、管理 ELK 的界面。

2. ElasticSearch

Elasticsearch 是一个分布式、RESTful 风格的搜索和数据分析引擎,能够解决不断涌现出的各种用例。 作为 Elastic Stack 的核心,它集中存储您的数据,帮助您发现意料之中以及意料之外的情况。

2.1. ElasticSearch 简介

Elasticsearch 基于搜索库 Lucene 开发。ElasticSearch 隐藏了 Lucene 的复杂性,提供了简单易用的 REST API / Java API 接口(另外还有其他语言的 API 接口)。

ElasticSearch 可以视为一个文档存储,它将复杂数据结构序列化为 JSON 存储

ElasticSearch 是近乎于实时的全文搜素,这是指:

  • 从写入数据到数据可以被搜索,存在较小的延迟(大概是 1s)
  • 基于 ES 执行搜索和分析可以达到秒级

2.1.1. 核心概念

  • 索引(Index) 可以认为是文档(document)的优化集合。
  • 每个 文档(document) 都是字段(field)的集合。
  • 字段(field) 是包含数据的键值对。
  • 默认情况下,Elasticsearch 对每个字段中的所有数据建立索引,并且每个索引字段都具有专用的优化数据结构。
  • 每个索引里可以有一个或者多个类型(type)。类型(type) 是 index 的一个逻辑分类,
  • 当单台机器不足以存储大量数据时,Elasticsearch 可以将一个索引中的数据切分为多个 分片(shard)分片(shard) 分布在多台服务器上存储。有了 shard 就可以横向扩展,存储更多数据,让搜索和分析等操作分布到多台服务器上去执行,提升吞吐量和性能。每个 shard 都是一个 lucene index。
  • 任何一个服务器随时可能故障或宕机,此时 shard 可能就会丢失,因此可以为每个 shard 创建多个 **副本(replica)**。replica 可以在 shard 故障时提供备用服务,保证数据不丢失,多个 replica 还可以提升搜索操作的吞吐量和性能。primary shard(建立索引时一次设置,不能修改,默认 5 个),replica shard(随时修改数量,默认 1 个),默认每个索引 10 个 shard,5 个 primary shard,5 个 replica shard,最小的高可用配置,是 2 台服务器。

2.2. ElasticSearch 原理

2.2.1. ES 写数据过程

  • 客户端选择一个 node 发送请求过去,这个 node 就是 coordinating node(协调节点)。
  • coordinating node 对 document 进行路由,将请求转发给对应的 node(有 primary shard)。
  • 实际的 node 上的 primary shard 处理请求,然后将数据同步到 replica node
  • coordinating node 如果发现 primary node 和所有 replica node 都搞定之后,就返回响应结果给客户端。

es-write

2.2.2. es 读数据过程

可以通过 doc id 来查询,会根据 doc id 进行 hash,判断出来当时把 doc id 分配到了哪个 shard 上面去,从那个 shard 去查询。

  • 客户端发送请求到任意一个 node,成为 coordinate node
  • coordinate nodedoc id 进行哈希路由,将请求转发到对应的 node,此时会使用 round-robin 随机轮询算法,在 primary shard 以及其所有 replica 中随机选择一个,让读请求负载均衡。
  • 接收请求的 node 返回 document 给 coordinate node
  • coordinate node 返回 document 给客户端。

2.2.3. 写数据底层原理

es-write-detail

先写入内存 buffer,在 buffer 里的时候数据是搜索不到的;同时将数据写入 translog 日志文件。

如果 buffer 快满了,或者到一定时间,就会将内存 buffer 数据 refresh 到一个新的 segment file 中,但是此时数据不是直接进入 segment file 磁盘文件,而是先进入 os cache 。这个过程就是 refresh

每隔 1 秒钟,es 将 buffer 中的数据写入一个新的 segment file,每秒钟会产生一个新的磁盘文件 segment file,这个 segment file 中就存储最近 1 秒内 buffer 中写入的数据。

但是如果 buffer 里面此时没有数据,那当然不会执行 refresh 操作,如果 buffer 里面有数据,默认 1 秒钟执行一次 refresh 操作,刷入一个新的 segment file 中。

操作系统里面,磁盘文件其实都有一个东西,叫做 os cache,即操作系统缓存,就是说数据写入磁盘文件之前,会先进入 os cache,先进入操作系统级别的一个内存缓存中去。只要 buffer 中的数据被 refresh 操作刷入 os cache中,这个数据就可以被搜索到了。

为什么叫 es 是准实时的? NRT,全称 near real-time。默认是每隔 1 秒 refresh 一次的,所以 es 是准实时的,因为写入的数据 1 秒之后才能被看到。可以通过 es 的 restful api 或者 java api手动执行一次 refresh 操作,就是手动将 buffer 中的数据刷入 os cache中,让数据立马就可以被搜索到。只要数据被输入 os cache 中,buffer 就会被清空了,因为不需要保留 buffer 了,数据在 translog 里面已经持久化到磁盘去一份了。

重复上面的步骤,新的数据不断进入 buffer 和 translog,不断将 buffer 数据写入一个又一个新的 segment file 中去,每次 refresh 完 buffer 清空,translog 保留。随着这个过程推进,translog 会变得越来越大。当 translog 达到一定长度的时候,就会触发 commit 操作。

commit 操作发生第一步,就是将 buffer 中现有数据 refreshos cache 中去,清空 buffer。然后,将一个 commit point 写入磁盘文件,里面标识着这个 commit point 对应的所有 segment file,同时强行将 os cache 中目前所有的数据都 fsync 到磁盘文件中去。最后清空 现有 translog 日志文件,重启一个 translog,此时 commit 操作完成。

这个 commit 操作叫做 flush。默认 30 分钟自动执行一次 flush,但如果 translog 过大,也会触发 flush。flush 操作就对应着 commit 的全过程,我们可以通过 es api,手动执行 flush 操作,手动将 os cache 中的数据 fsync 强刷到磁盘上去。

translog 日志文件的作用是什么?你执行 commit 操作之前,数据要么是停留在 buffer 中,要么是停留在 os cache 中,无论是 buffer 还是 os cache 都是内存,一旦这台机器死了,内存中的数据就全丢了。所以需要将数据对应的操作写入一个专门的日志文件 translog 中,一旦此时机器宕机,再次重启的时候,es 会自动读取 translog 日志文件中的数据,恢复到内存 buffer 和 os cache 中去。

translog 其实也是先写入 os cache 的,默认每隔 5 秒刷一次到磁盘中去,所以默认情况下,可能有 5 秒的数据会仅仅停留在 buffer 或者 translog 文件的 os cache 中,如果此时机器挂了,会丢失 5 秒钟的数据。但是这样性能比较好,最多丢 5 秒的数据。也可以将 translog 设置成每次写操作必须是直接 fsync 到磁盘,但是性能会差很多。

实际上你在这里,如果面试官没有问你 es 丢数据的问题,你可以在这里给面试官炫一把,你说,其实 es 第一是准实时的,数据写入 1 秒后可以搜索到;可能会丢失数据的。有 5 秒的数据,停留在 buffer、translog os cache、segment file os cache 中,而不在磁盘上,此时如果宕机,会导致 5 秒的数据丢失

总结一下,数据先写入内存 buffer,然后每隔 1s,将数据 refresh 到 os cache,到了 os cache 数据就能被搜索到(所以我们才说 es 从写入到能被搜索到,中间有 1s 的延迟)。每隔 5s,将数据写入 translog 文件(这样如果机器宕机,内存数据全没,最多会有 5s 的数据丢失),translog 大到一定程度,或者默认每隔 30mins,会触发 commit 操作,将缓冲区的数据都 flush 到 segment file 磁盘文件中。

数据写入 segment file 之后,同时就建立好了倒排索引。

2.2.4. 删除/更新数据底层原理

如果是删除操作,commit 的时候会生成一个 .del 文件,里面将某个 doc 标识为 deleted 状态,那么搜索的时候根据 .del 文件就知道这个 doc 是否被删除了。

如果是更新操作,就是将原来的 doc 标识为 deleted 状态,然后新写入一条数据。

buffer 每 refresh 一次,就会产生一个 segment file,所以默认情况下是 1 秒钟一个 segment file,这样下来 segment file 会越来越多,此时会定期执行 merge。每次 merge 的时候,会将多个 segment file 合并成一个,同时这里会将标识为 deleted 的 doc 给物理删除掉,然后将新的 segment file 写入磁盘,这里会写一个 commit point,标识所有新的 segment file,然后打开 segment file 供搜索使用,同时删除旧的 segment file

2.2.5. 底层 lucene

简单来说,lucene 就是一个 jar 包,里面包含了封装好的各种建立倒排索引的算法代码。我们用 Java 开发的时候,引入 lucene jar,然后基于 lucene 的 api 去开发就可以了。

通过 lucene,我们可以将已有的数据建立索引,lucene 会在本地磁盘上面,给我们组织索引的数据结构。

2.2.6. 倒排索引

在搜索引擎中,每个文档都有一个对应的文档 ID,文档内容被表示为一系列关键词的集合。例如,文档 1 经过分词,提取了 20 个关键词,每个关键词都会记录它在文档中出现的次数和出现位置。

那么,倒排索引就是关键词到文档 ID 的映射,每个关键词都对应着一系列的文件,这些文件中都出现了关键词。

举个栗子。

有以下文档:

DocId Doc
1 谷歌地图之父跳槽 Facebook
2 谷歌地图之父加盟 Facebook
3 谷歌地图创始人拉斯离开谷歌加盟 Facebook
4 谷歌地图之父跳槽 Facebook 与 Wave 项目取消有关
5 谷歌地图之父拉斯加盟社交网站 Facebook

对文档进行分词之后,得到以下倒排索引

WordId Word DocIds
1 谷歌 1,2,3,4,5
2 地图 1,2,3,4,5
3 之父 1,2,4,5
4 跳槽 1,4
5 Facebook 1,2,3,4,5
6 加盟 2,3,5
7 创始人 3
8 拉斯 3,5
9 离开 3
10 4
.. .. ..

另外,实用的倒排索引还可以记录更多的信息,比如文档频率信息,表示在文档集合中有多少个文档包含某个单词。

那么,有了倒排索引,搜索引擎可以很方便地响应用户的查询。比如用户输入查询 Facebook,搜索系统查找倒排索引,从中读出包含这个单词的文档,这些文档就是提供给用户的搜索结果。

要注意倒排索引的两个重要细节:

  • 倒排索引中的所有词项对应一个或多个文档;
  • 倒排索引中的词项根据字典顺序升序排列

上面只是一个简单的栗子,并没有严格按照字典顺序升序排列。

3. Logstash

Logstash 是开源的服务器端数据处理管道,能够同时从多个来源采集数据,转换数据,然后将数据发送到您最喜欢的“存储库”中。

3.1. Logstash 简介

Logstash 可以传输和处理你的日志、事务或其他数据。

Logstash 是 Elasticsearch 的最佳数据管道。

Logstash 是插件式管理模式,在输入、过滤、输出以及编码过程中都可以使用插件进行定制。Logstash 社区有超过 200 种可用插件。

3.2. Logstash 原理

Logstash 有两个必要元素:inputoutput ,一个可选元素:filter

这三个元素,分别代表 Logstash 事件处理的三个阶段:输入 > 过滤器 > 输出。

img

  • input - 负责从数据源采集数据。
  • filter - 将数据修改为你指定的格式或内容。
  • output - 将数据传输到目的地。

在实际应用场景中,通常输入、输出、过滤器不止一个。Logstash 的这三个元素都使用插件式管理方式,用户可以根据应用需要,灵活的选用各阶段需要的插件,并组合使用。

4. Beats

Beats 是安装在服务器上的数据中转代理

Beats 可以将数据直接传输到 Elasticsearch 或传输到 Logstash 。

img

Beats 有多种类型,可以根据实际应用需要选择合适的类型。

常用的类型有:

  • Packetbeat:网络数据包分析器,提供有关您的应用程序服务器之间交换的事务的信息。
  • Filebeat:从您的服务器发送日志文件。
  • Metricbeat:是一个服务器监视代理程序,它定期从服务器上运行的操作系统和服务收集指标。
  • Winlogbeat:提供 Windows 事件日志。

4.1. Filebeat 简介

_由于本人仅接触过 Filebeat,所以本文只介绍 Beats 组件中的 Filebeat_。

相比 Logstash,FileBeat 更加轻量化。

在任何环境下,应用程序都有停机的可能性。 Filebeat 读取并转发日志行,如果中断,则会记住所有事件恢复联机状态时所在位置。

Filebeat 带有内部模块(auditd,Apache,Nginx,System 和 MySQL),可通过一个指定命令来简化通用日志格式的收集,解析和可视化。

FileBeat 不会让你的管道超负荷。FileBeat 如果是向 Logstash 传输数据,当 Logstash 忙于处理数据,会通知 FileBeat 放慢读取速度。一旦拥塞得到解决,FileBeat 将恢复到原来的速度并继续传播。

img

4.2. Filebeat 原理

Filebeat 有两个主要组件:

  • harvester:负责读取一个文件的内容。它会逐行读取文件内容,并将内容发送到输出目的地。
  • prospector:负责管理 harvester 并找到所有需要读取的文件源。比如类型是日志,prospector 就会遍历制定路径下的所有匹配要求的文件。
1
2
3
4
5
filebeat.prospectors:
- type: log
paths:
- /var/log/*.log
- /var/path2/*.log

Filebeat 保持每个文件的状态,并经常刷新注册表文件中的磁盘状态。状态用于记住 harvester 正在读取的最后偏移量,并确保发送所有日志行。

Filebeat 将每个事件的传递状态存储在注册表文件中。所以它能保证事件至少传递一次到配置的输出,没有数据丢失。

5. 运维

6. 参考资料

Elastic 技术栈之 Filebeat

简介

Beats 是安装在服务器上的数据中转代理。

Beats 可以将数据直接传输到 Elasticsearch 或传输到 Logstash 。

img

Beats 有多种类型,可以根据实际应用需要选择合适的类型。

常用的类型有:

  • Packetbeat:网络数据包分析器,提供有关您的应用程序服务器之间交换的事务的信息。
  • Filebeat:从您的服务器发送日志文件。
  • Metricbeat:是一个服务器监视代理程序,它定期从服务器上运行的操作系统和服务收集指标。
  • Winlogbeat:提供 Windows 事件日志。

参考

更多 Beats 类型可以参考:community-beats

说明

由于本人工作中只应用了 FileBeat,所以后面内容仅介绍 FileBeat 。

FileBeat 的作用

相比 Logstash,FileBeat 更加轻量化。

在任何环境下,应用程序都有停机的可能性。 Filebeat 读取并转发日志行,如果中断,则会记住所有事件恢复联机状态时所在位置。

Filebeat 带有内部模块(auditd,Apache,Nginx,System 和 MySQL),可通过一个指定命令来简化通用日志格式的收集,解析和可视化。

FileBeat 不会让你的管道超负荷。FileBeat 如果是向 Logstash 传输数据,当 Logstash 忙于处理数据,会通知 FileBeat 放慢读取速度。一旦拥塞得到解决,FileBeat 将恢复到原来的速度并继续传播。

img

安装

Unix / Linux 系统建议使用下面方式安装,因为比较通用。

1
2
wget https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-6.1.1-linux-x86_64.tar.gz
tar -zxf filebeat-6.1.1-linux-x86_64.tar.gz

参考

更多内容可以参考:filebeat-installation

配置

配置文件

首先,需要知道的是:filebeat.yml 是 filebeat 的配置文件。配置文件的路径会因为你安装方式的不同而变化。

Beat 所有系列产品的配置文件都基于 YAML 格式,FileBeat 当然也不例外。

filebeat.yml 部分配置示例:

1
2
3
4
5
6
7
8
filebeat:
prospectors:
- type: log
paths:
- /var/log/*.log
multiline:
pattern: '^['
match: after

参考

更多 filebeat 配置内容可以参考:配置 filebeat

更多 filebeat.yml 文件格式内容可以参考:filebeat.yml 文件格式

重要配置项

filebeat.prospectors

(文件监视器)用于指定需要关注的文件。

示例

1
2
3
4
5
filebeat.prospectors:
- type: log
enabled: true
paths:
- /var/log/*.log

output.elasticsearch

如果你希望使用 filebeat 直接向 elasticsearch 输出数据,需要配置 output.elasticsearch 。

示例

1
2
output.elasticsearch:
hosts: ['192.168.1.42:9200']

output.logstash

如果你希望使用 filebeat 向 logstash 输出数据,然后由 logstash 再向 elasticsearch 输出数据,需要配置 output.logstash。

注意

相比于向 elasticsearch 输出数据,个人更推荐向 logstash 输出数据。

因为 logstash 和 filebeat 一起工作时,如果 logstash 忙于处理数据,会通知 FileBeat 放慢读取速度。一旦拥塞得到解决,FileBeat 将恢复到原来的速度并继续传播。这样,可以减少管道超负荷的情况。

示例

1
2
output.logstash:
hosts: ['127.0.0.1:5044']

此外,还需要在 logstash 的配置文件(如 logstash.conf)中指定 beats input 插件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
input {
beats {
port => 5044 # 此端口需要与 filebeat.yml 中的端口相同
}
}

# The filter part of this file is commented out to indicate that it is
# optional.
# filter {
#
# }

output {
elasticsearch {
hosts => "localhost:9200"
manage_template => false
index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
document_type => "%{[@metadata][type]}"
}
}

setup.kibana

如果打算使用 Filebeat 提供的 Kibana 仪表板,需要配置 setup.kibana 。

示例

1
2
setup.kibana:
host: 'localhost:5601'

setup.template.settings

在 Elasticsearch 中,索引模板用于定义设置和映射,以确定如何分析字段。

在 Filebeat 中,setup.template.settings 用于配置索引模板。

Filebeat 推荐的索引模板文件由 Filebeat 软件包安装。如果您接受 filebeat.yml 配置文件中的默认配置,Filebeat 在成功连接到 Elasticsearch 后自动加载模板。

您可以通过在 Filebeat 配置文件中配置模板加载选项来禁用自动模板加载,或加载自己的模板。您还可以设置选项来更改索引和索引模板的名称。

参考

更多内容可以参考:filebeat-template

说明

如无必要,使用 Filebeat 配置文件中的默认索引模板即可。

setup.dashboards

Filebeat 附带了示例 Kibana 仪表板。在使用仪表板之前,您需要创建索引模式 filebeat- *,并将仪表板加载到 Kibana 中。为此,您可以运行 setup 命令或在 filebeat.yml 配置文件中配置仪表板加载。

为了在 Kibana 中加载 Filebeat 的仪表盘,需要在 filebeat.yml 配置中启动开关:

1
setup.dashboards.enabled: true

参考

更多内容可以参考:configuration-dashboards

命令

filebeat 提供了一系列命令来完成各种功能。

执行命令方式:

1
./filebeat COMMAND

参考

更多内容可以参考:command-line-options

说明

个人认为命令行没有必要一一掌握,因为绝大部分功能都可以通过配置来完成。且通过命令行指定功能这种方式要求每次输入同样参数,不利于固化启动方式。

最重要的当然是启动命令 run 了。

示例 指定配置文件启动

1
2
./filebeat run -e -c filebeat.yml -d "publish"
./filebeat -e -c filebeat.yml -d "publish" # run 可以省略

模块

Filebeat 提供了一套预构建的模块,让您可以快速实施和部署日志监视解决方案,并附带示例仪表板和数据可视化。这些模块支持常见的日志格式,例如 Nginx,Apache2 和 MySQL 等。

运行模块的步骤

  • 配置 elasticsearch 和 kibana
1
2
3
4
5
6
7
8
output.elasticsearch:
hosts: ["myEShost:9200"]
username: "elastic"
password: "elastic"
setup.kibana:
host: "mykibanahost:5601"
username: "elastic"
password: "elastic

username 和 password 是可选的,如果不需要认证则不填。

  • 初始化环境

执行下面命令,filebeat 会加载推荐索引模板。

1
./filebeat setup -e
  • 指定模块

执行下面命令,指定希望加载的模块。

1
./filebeat -e --modules system,nginx,mysql

参考

更多内容可以参考: 配置 filebeat 模块 | filebeat 支持模块

原理

Filebeat 有两个主要组件:

harvester:负责读取一个文件的内容。它会逐行读取文件内容,并将内容发送到输出目的地。

prospector:负责管理 harvester 并找到所有需要读取的文件源。比如类型是日志,prospector 就会遍历制定路径下的所有匹配要求的文件。

1
2
3
4
5
filebeat.prospectors:
- type: log
paths:
- /var/log/*.log
- /var/path2/*.log

Filebeat 保持每个文件的状态,并经常刷新注册表文件中的磁盘状态。状态用于记住 harvester 正在读取的最后偏移量,并确保发送所有日志行。

Filebeat 将每个事件的传递状态存储在注册表文件中。所以它能保证事件至少传递一次到配置的输出,没有数据丢失。

资料

Beats 官方文档

Filebeat 运维

Beats 平台集合了多种单一用途数据采集器。它们从成百上千或成千上万台机器和系统向 Logstash 或 Elasticsearch 发送数据。

因为我只接触过 Filebeat,所有本文仅介绍 Filebeat 的日常运维。

1. Filebeat 安装

1.1. 环境要求

版本:Elastic Stack 7.4

1.2. 安装步骤

Unix / Linux 系统建议使用下面方式安装,因为比较通用。

1
2
wget https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-6.1.1-linux-x86_64.tar.gz
tar -zxf filebeat-6.1.1-linux-x86_64.tar.gz

更多内容可以参考:filebeat-installation

2. Filebeat 配置

首先,必须要知道的是:filebeat.yml 是 filebeat 的配置文件。其路径会因为你安装方式而有所不同。

Beat 所有系列产品的配置文件都基于 YAML 格式,FileBeat 当然也不例外。

更多 filebeat 配置内容可以参考:配置 filebeat

更多 filebeat.yml 文件格式内容可以参考:filebeat.yml 文件格式

filebeat.yml 部分配置示例:

1
2
3
4
5
6
7
8
filebeat:
prospectors:
- type: log
paths:
- /var/log/*.log
multiline:
pattern: '^['
match: after

2.1. 重要配置项

下面我将列举 Filebeat 的较为重要的配置项。

如果想了解更多配置信息,可以参考:

更多 filebeat 配置内容可以参考:配置 filebeat

更多 filebeat.yml 文件格式内容可以参考:filebeat.yml 文件格式

2.1.1. filebeat.prospectors

(文件监视器)用于指定需要关注的文件。

示例

1
2
3
4
5
filebeat.prospectors:
- type: log
enabled: true
paths:
- /var/log/*.log

2.1.2. output.elasticsearch

如果你希望使用 filebeat 直接向 elasticsearch 输出数据,需要配置 output.elasticsearch 。

示例

1
2
output.elasticsearch:
hosts: ['192.168.1.42:9200']

2.1.3. output.logstash

如果你希望使用 filebeat 向 logstash 输出数据,然后由 logstash 再向 elasticsearch 输出数据,需要配置 output.logstash。

注意

相比于向 elasticsearch 输出数据,个人更推荐向 logstash 输出数据。

因为 logstash 和 filebeat 一起工作时,如果 logstash 忙于处理数据,会通知 FileBeat 放慢读取速度。一旦拥塞得到解决,FileBeat 将恢复到原来的速度并继续传播。这样,可以减少管道超负荷的情况。

示例

1
2
output.logstash:
hosts: ['127.0.0.1:5044']

此外,还需要在 logstash 的配置文件(如 logstash.conf)中指定 beats input 插件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
input {
beats {
port => 5044 # 此端口需要与 filebeat.yml 中的端口相同
}
}

# The filter part of this file is commented out to indicate that it is
# optional.
# filter {
#
# }

output {
elasticsearch {
hosts => "localhost:9200"
manage_template => false
index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
document_type => "%{[@metadata][type]}"
}
}

2.1.4. setup.kibana

如果打算使用 Filebeat 提供的 Kibana 仪表板,需要配置 setup.kibana 。

示例

1
2
setup.kibana:
host: 'localhost:5601'

2.1.5. setup.template.settings

在 Elasticsearch 中,索引模板用于定义设置和映射,以确定如何分析字段。

在 Filebeat 中,setup.template.settings 用于配置索引模板。

Filebeat 推荐的索引模板文件由 Filebeat 软件包安装。如果您接受 filebeat.yml 配置文件中的默认配置,Filebeat 在成功连接到 Elasticsearch 后自动加载模板。

您可以通过在 Filebeat 配置文件中配置模板加载选项来禁用自动模板加载,或加载自己的模板。您还可以设置选项来更改索引和索引模板的名称。

参考

更多内容可以参考:filebeat-template

说明

如无必要,使用 Filebeat 配置文件中的默认索引模板即可。

2.1.6. setup.dashboards

Filebeat 附带了示例 Kibana 仪表板。在使用仪表板之前,您需要创建索引模式 filebeat- *,并将仪表板加载到 Kibana 中。为此,您可以运行 setup 命令或在 filebeat.yml 配置文件中配置仪表板加载。

为了在 Kibana 中加载 Filebeat 的仪表盘,需要在 filebeat.yml 配置中启动开关:

1
setup.dashboards.enabled: true

参考

更多内容可以参考:configuration-dashboards

3. Filebeat 命令

filebeat 提供了一系列命令来完成各种功能。

执行命令方式:

1
./filebeat COMMAND

参考

更多内容可以参考:command-line-options

说明

个人认为命令行没有必要一一掌握,因为绝大部分功能都可以通过配置来完成。且通过命令行指定功能这种方式要求每次输入同样参数,不利于固化启动方式。

最重要的当然是启动命令 run 了。

示例 指定配置文件启动

1
2
./filebeat run -e -c filebeat.yml -d "publish"
./filebeat -e -c filebeat.yml -d "publish" # run 可以省略

4. Filebeat 模块

FilebeatMetricbeat 内部集成了一系列模块,用以简化常见日志格式(例如 NGINX、Apache 或诸如 Redis 或 Docker 等系统指标)的收集、解析和可视化过程。

  • 配置 elasticsearch 和 kibana
1
2
3
4
5
6
7
8
output.elasticsearch:
hosts: ["myEShost:9200"]
username: "elastic"
password: "elastic"
setup.kibana:
host: "mykibanahost:5601"
username: "elastic"
password: "elastic

username 和 password 是可选的,如果不需要认证则不填。

  • 初始化环境

执行下面命令,filebeat 会加载推荐索引模板。

1
./filebeat setup -e
  • 指定模块

执行下面命令,指定希望加载的模块。

1
./filebeat -e --modules system,nginx,mysql

更多内容可以参考:

5. 参考资料

Elastic 技术栈之 Kibana

Discover

单击侧面导航栏中的 Discover ,可以显示 Kibana 的数据查询功能功能。

img

在搜索栏中,您可以输入 Elasticsearch 查询条件来搜索您的数据。您可以在 Discover 页面中浏览结果并在 Visualize 页面中创建已保存搜索条件的可视化。

当前索引模式显示在查询栏下方。索引模式确定提交查询时搜索哪些索引。要搜索一组不同的索引,请从下拉菜单中选择不同的模式。要添加索引模式(index pattern),请转至 Management/Kibana/Index Patterns 并单击 Add New

您可以使用字段名称和您感兴趣的值构建搜索。对于数字字段,可以使用比较运算符,如大于(>),小于(<)或等于(=)。您可以将元素与逻辑运算符 ANDORNOT 链接,全部使用大写。

默认情况下,每个匹配文档都显示所有字段。要选择要显示的文档字段,请将鼠标悬停在“可用字段”列表上,然后单击要包含的每个字段旁边的添加按钮。例如,如果只添加 account_number,则显示将更改为包含五个帐号的简单列表:

img

查询语义

kibana 的搜索栏遵循 query-string-syntax 文档中所说明的查询语义。

这里说明一些最基本的查询语义。

查询字符串会被解析为一系列的术语和运算符。一个术语可以是一个单词(如:quick、brown)或用双引号包围的短语(如”quick brown”)。

查询操作允许您自定义搜索 - 下面介绍了可用的选项。

字段名称

正如查询字符串查询中所述,将在搜索条件中搜索 default_field,但可以在查询语法中指定其他字段:

例如:

  • 查询 status 字段中包含 active 关键字
1
status:active
  • title 字段包含 quickbrown 关键字。如果您省略 OR 运算符,则将使用默认运算符
1
2
title:(quick OR brown)
title:(quick brown)
  • author 字段查找精确的短语 “john smith”,即精确查找。
1
author:"John Smith"
  • 任意字段 book.titlebook.contentbook.date 都包含 quickbrown(注意我们需要如何使用 \* 表示通配符)
1
book.\*:(quick brown)
  • title 字段包含任意非 null 值
1
_exists_:title

通配符

ELK 提供了 ? 和 * 两个通配符。

  • ? 表示任意单个字符;
  • * 表示任意零个或多个字符。
1
qu?ck bro*

注意:通配符查询会使用大量的内存并且执行性能较为糟糕,所以请慎用。 > 提示:纯通配符 * 被写入 exsits 查询,从而提高了查询效率。因此,通配符 field:* 将匹配包含空值的文档,如:{“field”:“”},但是如果字段丢失或显示将值置为 null 则不匹配,如:“field”:null} > 提示:在一个单词的开头(例如:*ing)使用通配符这种方式的查询量特别大,因为索引中的所有术语都需要检查,以防万一匹配。通过将 allow_leading_wildcard 设置为 false,可以禁用。

正则表达式

可以通过 / 将正则表达式包裹在查询字符串中进行查询

例:

1
name:/joh?n(ath[oa]n)/

支持的正则表达式语义可以参考:Regular expression syntax

模糊查询

我们可以使用 ~ 运算符来进行模糊查询。

例:

假设我们实际想查询

1
quick brown forks

但是,由于拼写错误,我们的查询关键字变成如下情况,依然可以查到想要的结果。

1
quikc\~ brwn\~ foks\~

这种模糊查询使用 Damerau-Levenshtein 距离来查找所有匹配最多两个更改的项。所谓的更改是指单个字符的插入,删除或替换,或者两个相邻字符的换位。

默认编辑距离为 2,但编辑距离为 1 应足以捕捉所有人类拼写错误的 80%。它可以被指定为:

1
quikc\~1

近似检索

尽管短语查询(例如,john smith)期望所有的词条都是完全相同的顺序,但是近似查询允许指定的单词进一步分开或以不同的顺序排列。与模糊查询可以为单词中的字符指定最大编辑距离一样,近似搜索也允许我们指定短语中单词的最大编辑距离:

1
"fox quick"\~5

字段中的文本越接近查询字符串中指定的原始顺序,该文档就越被认为是相关的。当与上面的示例查询相比时,短语 "quick fox" 将被认为比 "quick brown fox" 更近似查询条件。

范围

可以为日期,数字或字符串字段指定范围。闭区间范围用方括号 [min TO max] 和开区间范围用花括号 {min TO max} 来指定。

我们不妨来看一些示例。

  • 2012 年的所有日子
1
date:[2012-01-01 TO 2012-12-31]
  • 数字 1 到 5
1
count:[1 TO 5]
  • alphaomega 之间的标签,不包括 alphaomega
1
tags:{alpha TO omega}
  • 10 以上的数字
1
count:[10 TO *]
  • 2012 年以前的所有日期
1
date:{* TO 2012-01-01}

此外,开区间和闭区间也可以组合使用

  • 数组 1 到 5,但不包括 5
1
count:[1 TO 5}

一边无界的范围也可以使用以下语法:

1
2
3
4
age:>10
age:>=10
age:<10
age:<=10

当然,你也可以使用 AND 运算符来得到连个查询结果的交集

1
2
age:(>=10 AND <20)
age:(+>=10 +<20)

Boosting

使用操作符 ^ 使一个术语比另一个术语更相关。例如,如果我们想查找所有有关狐狸的文档,但我们对狐狸特别感兴趣:

1
quick^2 fox

默认提升值是 1,但可以是任何正浮点数。 0 到 1 之间的提升减少了相关性。

增强也可以应用于短语或组:

1
"john smith"^2   (foo bar)^4

布尔操作

默认情况下,只要一个词匹配,所有词都是可选的。搜索 foo bar baz 将查找包含 foobarbaz 中的一个或多个的任何文档。我们已经讨论了上面的default_operator,它允许你强制要求所有的项,但也有布尔运算符可以在查询字符串本身中使用,以提供更多的控制。

首选的操作符是 +(此术语必须存在)和 - (此术语不得存在)。所有其他条款是可选的。例如,这个查询:

1
quick brown +fox -news

这条查询意味着:

  • fox 必须存在
  • news 必须不存在
  • quick 和 brown 是可有可无的

熟悉的运算符 ANDORNOT(也写成 &&||!)也被支持。然而,这些操作符有一定的优先级:NOT 优先于 ANDAND 优先于 OR。虽然 +- 仅影响运算符右侧的术语,但 ANDOR 会影响左侧和右侧的术语。

分组

多个术语或子句可以用圆括号组合在一起,形成子查询

1
(quick OR brown) AND fox

可以使用组来定位特定的字段,或者增强子查询的结果:

1
status:(active OR pending) title:(full text search)^2

保留字

如果你需要使用任何在你的查询本身中作为操作符的字符(而不是作为操作符),那么你应该用一个反斜杠来转义它们。例如,要搜索(1 + 1)= 2,您需要将查询写为 \(1\+1\)\=2

保留字符是:+ - = && || > < ! ( ) { } [ ] ^ " ~ * ? : \ /

无法正确地转义这些特殊字符可能会导致语法错误,从而阻止您的查询运行。

空查询

如果查询字符串为空或仅包含空格,则查询将生成一个空的结果集。

Visualize

要想使用可视化的方式展示您的数据,请单击侧面导航栏中的 Visualize

Visualize 工具使您能够以多种方式(如饼图、柱状图、曲线图、分布图等)查看数据。要开始使用,请点击蓝色的 Create a visualization+ 按钮。

https://www.elastic.co/guide/en/kibana/6.1/images/tutorial-visualize-landing.png

有许多可视化类型可供选择。

https://www.elastic.co/guide/en/kibana/6.1/images/tutorial-visualize-wizard-step-1.png

下面,我们来看创建几个图标示例:

Pie

您可以从保存的搜索中构建可视化文件,也可以输入新的搜索条件。要输入新的搜索条件,首先需要选择一个索引模式来指定要搜索的索引。

默认搜索匹配所有文档。最初,一个“切片”包含整个饼图:

https://www.elastic.co/guide/en/kibana/6.1/images/tutorial-visualize-pie-1.png

要指定在图表中展示哪些数据,请使用 Elasticsearch 存储桶聚合。分组汇总只是将与您的搜索条件相匹配的文档分类到不同的分类中,也称为分组。

为每个范围定义一个存储桶:

  1. 单击 Split Slices
  2. Aggregation 列表中选择 Terms。_注意:这里的 Terms 是 Elk 采集数据时定义好的字段或标签_。
  3. Field 列表中选择 level.keyword
  4. 点击 images/apply-changes-button.png 按钮来更新图表。

image.png

完成后,如果想要保存这个图表,可以点击页面最上方一栏中的 Save 按钮。

Vertical Bar

我们在展示一下如何创建柱状图。

  1. 点击蓝色的 Create a visualization+ 按钮。选择 Vertical Bar
  2. 选择索引模式。由于您尚未定义任何 bucket ,因此您会看到一个大栏,显示与默认通配符查询匹配的文档总数。
  3. 指定 Y 轴所代表的字段
  4. 指定 X 轴所代表的字段
  5. 点击 images/apply-changes-button.png 按钮来更新图表。

image.png

完成后,如果想要保存这个图表,可以点击页面最上方一栏中的 Save 按钮。

Dashboard

Dashboard 可以整合和共享 Visualize 集合。

  1. 点击侧面导航栏中的 Dashboard。
  2. 点击添加显示保存的可视化列表。
  3. 点击之前保存的 Visualize,然后点击列表底部的小向上箭头关闭可视化列表。
  4. 将鼠标悬停在可视化对象上会显示允许您编辑,移动,删除和调整可视化对象大小的容器控件。