Dunwu Blog

大道至简,知易行难

Flink 入门

Apache Flink 是一个框架和分布式处理引擎,用于在无边界和有边界数据流上进行有状态的计算。Flink 能在所有常见集群环境中运行,并能以内存速度和任意规模进行计算。

img

概念

处理无界和有界数据

任何类型的数据都可以形成一种事件流。

数据可以被作为 无界 或者 有界 流来处理。

  • 无界流 有定义流的开始,但没有定义流的结束。它们会无休止地产生数据。无界流的数据必须持续处理,即数据被摄取后需要立刻处理。我们不能等到所有数据都到达再处理,因为输入是无限的,在任何时候输入都不会完成。处理无界数据通常要求以特定顺序摄取事件,例如事件发生的顺序,以便能够推断结果的完整性。
  • 有界流 有定义流的开始,也有定义流的结束。有界流可以在摄取所有数据后再进行计算。有界流所有数据可以被排序,所以并不需要有序摄取。有界流处理通常被称为批处理。

img

核心概念

显而易见,(数据)流是流处理的基本要素。然而,流也拥有着多种特征。这些特征决定了流如何以及何时被处理。Flink 是一个能够处理任何类型数据流的强大处理框架。

  • 有界无界 的数据流:流可以是无界的;也可以是有界的,例如固定大小的数据集。Flink 在无界的数据流处理上拥有诸多功能强大的特性,同时也针对有界的数据流开发了专用的高效算子。
  • 实时历史记录 的数据流:所有的数据都是以流的方式产生,但用户通常会使用两种截然不同的方法处理数据。或是在数据生成时进行实时的处理;亦或是先将数据流持久化到存储系统中——例如文件系统或对象存储,然后再进行批处理。Flink 的应用能够同时支持处理实时以及历史记录数据流。

状态

只有在每一个单独的事件上进行转换操作的应用才不需要状态,换言之,每一个具有一定复杂度的流处理应用都是有状态的。任何运行基本业务逻辑的流处理应用都需要在一定时间内存储所接收的事件或中间结果,以供后续的某个时间点(例如收到下一个事件或者经过一段特定时间)进行访问并进行后续处理。

img

应用状态是 Flink 中的一等公民,Flink 提供了许多状态管理相关的特性支持,其中包括:

  • 多种状态基础类型:Flink 为多种不同的数据结构提供了相对应的状态基础类型,例如原子值(value),列表(list)以及映射(map)。开发者可以基于处理函数对状态的访问方式,选择最高效、最适合的状态基础类型。
  • 插件化的 State Backend:State Backend 负责管理应用程序状态,并在需要的时候进行 checkpoint。Flink 支持多种 state backend,可以将状态存在内存或者 RocksDB。RocksDB 是一种高效的嵌入式、持久化键值存储引擎。Flink 也支持插件式的自定义 state backend 进行状态存储。
  • 精确一次语义:Flink 的 checkpoint 和故障恢复算法保证了故障发生后应用状态的一致性。因此,Flink 能够在应用程序发生故障时,对应用程序透明,不造成正确性的影响。
  • 超大数据量状态:Flink 能够利用其异步以及增量式的 checkpoint 算法,存储数 TB 级别的应用状态。
  • 可弹性伸缩的应用:Flink 能够通过在更多或更少的工作节点上对状态进行重新分布,支持有状态应用的分布式的横向伸缩。

时间

时间是流处理应用另一个重要的组成部分。因为事件总是在特定时间点发生,所以大多数的事件流都拥有事件本身所固有的时间语义。进一步而言,许多常见的流计算都基于时间语义,例如窗口聚合、会话计算、模式检测和基于时间的 join。流处理的一个重要方面是应用程序如何衡量时间,即区分事件时间(event-time)和处理时间(processing-time)。

Flink 提供了丰富的时间语义支持。

  • 事件时间模式:使用事件时间语义的流处理应用根据事件本身自带的时间戳进行结果的计算。因此,无论处理的是历史记录的事件还是实时的事件,事件时间模式的处理总能保证结果的准确性和一致性。
  • Watermark 支持:Flink 引入了 watermark 的概念,用以衡量事件时间进展。Watermark 也是一种平衡处理延时和完整性的灵活机制。
  • 迟到数据处理:当以带有 watermark 的事件时间模式处理数据流时,在计算完成之后仍会有相关数据到达。这样的事件被称为迟到事件。Flink 提供了多种处理迟到数据的选项,例如将这些数据重定向到旁路输出(side output)或者更新之前完成计算的结果。
  • 处理时间模式:除了事件时间模式,Flink 还支持处理时间语义。处理时间模式根据处理引擎的机器时钟触发计算,一般适用于有着严格的低延迟需求,并且能够容忍近似结果的流处理应用。
  • Flink 是基于事件驱动 (Event-driven) 的应用,能够同时支持流处理和批处理
  • 基于内存计算,能够保证高吞吐和低延迟,具有优越的性能表现;
  • 支持精确一次 (Exactly-once) 语意,能够完美地保证一致性和正确性;
  • 分层 API ,能够满足各个层次的开发需求;
  • 支持高可用配置,支持保存点机制,能够提供安全性和稳定性上的保证;
  • 多样化的部署方式,支持本地,远端,云端等多种部署方案;
  • 具有横向扩展架构,能够按照用户的需求进行动态扩容;
  • 活跃度极高的社区和完善的生态圈的支持。

分层架构

Flink 采用分层的架构设计,从而保证各层在功能和职责上的清晰。

自上而下,分别是 API & Libraries 层、Runtime 核心层以及物理部署层:

API & Libraries 层

这一层主要提供了编程 API 和 顶层类库:

  • 编程 API : 用于进行流处理的 DataStream API 和用于进行批处理的 DataSet API;
    • SQL & Table API - SQL & Table API 同时适用于批处理和流处理,这意味着你可以对有界数据流和无界数据流以相同的语义进行查询,并产生相同的结果。除了基本查询外, 它还支持自定义的标量函数,聚合函数以及表值函数,可以满足多样化的查询需求。
    • DataStream & DataSet API - DataStream & DataSet API 是 Flink 数据处理的核心 API,支持使用 Java 语言或 Scala 语言进行调用,提供了数据读取,数据转换和数据输出等一系列常用操作的封装。
    • Stateful Stream Processing - Stateful Stream Processing 是最低级别的抽象,它通过 Process Function 函数内嵌到 DataStream API 中。 Process Function 是 Flink 提供的最底层 API,具有最大的灵活性,允许开发者对于时间和状态进行细粒度的控制。
  • 顶层类库
    • 用于复杂事件处理的 CEP 库;
    • 用于结构化数据查询的 SQL & Table 库;
    • 基于批处理的机器学习库 FlinkML
    • 图形处理库 Gelly。

Runtime 核心层

这一层是 Flink 分布式计算框架的核心实现层,包括作业转换,任务调度,资源分配,任务执行等功能,基于这一层的实现,可以在流式引擎下同时运行流处理程序和批处理程序。

物理部署层

Flink 的物理部署层,用于支持在不同平台上部署运行 Flink 应用。

集群架构

核心组件

按照上面的介绍,Flink 核心架构的第二层是 Runtime 层, 该层采用标准的 Master - Slave 结构, 其中,Master 部分又包含了三个核心组件:Dispatcher、ResourceManager 和 JobManager,而 Slave 则主要是 TaskManager 进程。它们的功能分别如下:

  • JobManagers (也称为 masters) :JobManagers 接收由 Dispatcher 传递过来的执行程序,该执行程序包含了作业图 (JobGraph),逻辑数据流图 (logical dataflow graph) 及其所有的 classes 文件以及第三方类库 (libraries) 等等 。紧接着 JobManagers 会将 JobGraph 转换为执行图 (ExecutionGraph),然后向 ResourceManager 申请资源来执行该任务,一旦申请到资源,就将执行图分发给对应的 TaskManagers 。因此每个作业 (Job) 至少有一个 JobManager;高可用部署下可以有多个 JobManagers,其中一个作为 _leader_,其余的则处于 standby 状态。
  • TaskManagers (也称为 workers) : TaskManagers **负责实际的子任务 (subtasks) 的执行,每个 TaskManagers 都拥有一定数量的 slots。Slot 是一组固定大小的资源的合集 (如计算能力,存储空间)**。TaskManagers 启动后,会将其所拥有的 slots 注册到 ResourceManager 上,由 ResourceManager 进行统一管理。
  • Dispatcher:负责接收客户端提交的执行程序,并传递给 JobManager 。除此之外,它还提供了一个 WEB UI 界面,用于监控作业的执行情况。
  • ResourceManager :负责管理 slots 并协调集群资源。ResourceManager 接收来自 JobManager 的资源请求,并将存在空闲 slots 的 TaskManagers 分配给 JobManager 执行任务。Flink 基于不同的部署平台,如 YARN , Mesos,K8s 等提供了不同的资源管理器,当 TaskManagers 没有足够的 slots 来执行任务时,它会向第三方平台发起会话来请求额外的资源。

img

Task & SubTask

上面我们提到:TaskManagers 实际执行的是 SubTask,而不是 Task,这里解释一下两者的区别:

在执行分布式计算时,Flink 将可以链接的操作 (operators) 链接到一起,这就是 Task。之所以这样做, 是为了减少线程间切换和缓冲而导致的开销,在降低延迟的同时可以提高整体的吞吐量。 但不是所有的 operator 都可以被链接,如下 keyBy 等操作会导致网络 shuffle 和重分区,因此其就不能被链接,只能被单独作为一个 Task。 简单来说,一个 Task 就是一个可以链接的最小的操作链 (Operator Chains) 。如下图,source 和 map 算子被链接到一块,因此整个作业就只有三个 Task:

img

解释完 Task ,我们在解释一下什么是 SubTask,其准确的翻译是: _A subtask is one parallel slice of a task_,即一个 Task 可以按照其并行度拆分为多个 SubTask。如上图,source & map 具有两个并行度,KeyBy 具有两个并行度,Sink 具有一个并行度,因此整个虽然只有 3 个 Task,但是却有 5 个 SubTask。Jobmanager 负责定义和拆分这些 SubTask,并将其交给 Taskmanagers 来执行,每个 SubTask 都是一个单独的线程。

4.3 资源管理

理解了 SubTasks ,我们再来看看其与 Slots 的对应情况。一种可能的分配情况如下:

img

这时每个 SubTask 线程运行在一个独立的 TaskSlot, 它们共享所属的 TaskManager 进程的 TCP 连接(通过多路复用技术)和心跳信息 (heartbeat messages),从而可以降低整体的性能开销。此时看似是最好的情况,但是每个操作需要的资源都是不尽相同的,这里假设该作业 keyBy 操作所需资源的数量比 Sink 多很多 ,那么此时 Sink 所在 Slot 的资源就没有得到有效的利用。

基于这个原因,Flink 允许多个 subtasks 共享 slots,即使它们是不同 tasks 的 subtasks,但只要它们来自同一个 Job 就可以。假设上面 souce & map 和 keyBy 的并行度调整为 6,而 Slot 的数量不变,此时情况如下:

img

可以看到一个 Task Slot 中运行了多个 SubTask 子任务,此时每个子任务仍然在一个独立的线程中执行,只不过共享一组 Sot 资源而已。那么 Flink 到底如何确定一个 Job 至少需要多少个 Slot 呢?Flink 对于这个问题的处理很简单,默认情况一个 Job 所需要的 Slot 的数量就等于其 Operation 操作的最高并行度。如下, A,B,D 操作的并行度为 4,而 C,E 操作的并行度为 2,那么此时整个 Job 就需要至少四个 Slots 来完成。通过这个机制,Flink 就可以不必去关心一个 Job 到底会被拆分为多少个 Tasks 和 SubTasks。

img

4.4 组件通讯

Flink 的所有组件都基于 Actor System 来进行通讯。Actor system 是多种角色的 actor 的容器,它提供调度,配置,日志记录等多种服务,并包含一个可以启动所有 actor 的线程池,如果 actor 是本地的,则消息通过共享内存进行共享,但如果 actor 是远程的,则通过 RPC 的调用来传递消息。

按照统计维度的不同,Flink 中的窗口可以分为 时间窗口 (Time Windows) 和 计数窗口 (Count Windows) 。

时间窗口

时间窗口 (Time Windows) 用于以时间为维度来进行数据聚合,具体分为以下四类:

Tumbling Windows

滚动窗口 (Tumbling Windows) 是指彼此之间没有重叠的窗口。例如:每隔 1 小时统计过去 1 小时内的商品点击量,那么 1 天就只能分为 24 个窗口,每个窗口彼此之间是不存在重叠的,具体如下:

img

Sliding Windows

滑动窗口(Sliding Windows)用于滚动进行聚合分析,例如:每隔 6 分钟统计一次过去一小时内所有商品的点击量,那么统计窗口彼此之间就是存在重叠的,即 1 天可以分为 240 个窗口。图示如下:

img

可以看到 window 1 - 4 这四个窗口彼此之间都存在着时间相等的重叠部分。想要实现滑动窗口,只需要在使用 timeWindow 方法时额外传递第二个参数作为滚动时间即可,具体如下:

1
2
// 每隔3秒统计一次过去1分钟内的数据
timeWindow(Time.minutes(1),Time.seconds(3))

Session Windows

当用户在进行持续浏览时,可能每时每刻都会有点击数据,例如在活动区间内,用户可能频繁的将某类商品加入和移除购物车,而你只想知道用户本次浏览最终的购物车情况,此时就可以在用户持有的会话结束后再进行统计。想要实现这类统计,可以通过会话窗口(Session Windows) 来进行实现。

img

具体的实现代码如下:

1
2
3
4
// 以处理时间为衡量标准,如果10秒内没有任何数据输入,就认为会话已经关闭,此时触发统计
window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
// 以事件时间为衡量标准
window(EventTimeSessionWindows.withGap(Time.seconds(10)))

Global Windows

最后一个窗口是全局窗口(Global Windows), 全局窗口会将所有 key 相同的元素分配到同一个窗口中,其通常配合触发器 (trigger) 进行使用。如果没有相应触发器,则计算将不会被执行。

img

这里继续以上面词频统计的案例为例,示例代码如下:

1
2
// 当单词累计出现的次数每达到10次时,则触发计算,计算整个窗口内该单词出现的总数
window(GlobalWindows.create()).trigger(CountTrigger.of(10)).sum(1).print();

统计窗口

统计窗口(Count Windows)用于以数量为维度来进行数据聚合,同样也分为滚动窗口和滑动窗口,实现方式也和时间窗口完全一致,只是调用的 API 不同,具体如下:

1
2
3
4
// 滚动计数窗口,每1000次点击则计算一次
countWindow(1000)
// 滑动计数窗口,每10次点击发生后,则计算过去1000次点击的情况
countWindow(1000,10)

实际上计数窗口内部就是调用的我们上一部分介绍的全局窗口来实现的,其源码如下:

1
2
3
4
5
6
7
8
9
10
public WindowedStream<T, KEY, GlobalWindow> countWindow(long size) {
return window(GlobalWindows.create()).trigger(PurgingTrigger.of(CountTrigger.of(size)));
}


public WindowedStream<T, KEY, GlobalWindow> countWindow(long size, long slide) {
return window(GlobalWindows.create())
.evictor(CountEvictor.of(size))
.trigger(CountTrigger.of(slide));
}

Flink 一个比较重要的特性就是其支持有状态计算。即你可以将中间的计算结果进行保存,并提供给后续的计算使用:


img

具体而言,Flink 又将状态 (State) 分为 Keyed State 与 Operator State:

2.1 算子状态

算子状态 (Operator State):顾名思义,状态是和算子进行绑定的,一个算子的状态不能被其他算子所访问到。官方文档上对 Operator State 的解释是:_each operator state is bound to one parallel operator instance_,所以更为确切的说一个算子状态是与一个并发的算子实例所绑定的,即假设算子的并行度是 2,那么其应有两个对应的算子状态:

img

2.2 键控状态

键控状态 (Keyed State) :是一种特殊的算子状态,即状态是根据 key 值进行区分的,Flink 会为每类键值维护一个状态实例。如下图所示,每个颜色代表不同 key 值,对应四个不同的状态实例。需要注意的是键控状态只能在 KeyedStream 上进行使用,我们可以通过 stream.keyBy(...) 来得到 KeyedStream

img

事件驱动型应用

什么是事件驱动型应用?

事件驱动型应用是一类具有状态的应用,它从一个或多个事件流提取数据,并根据到来的事件触发计算、状态更新或其他外部动作。

事件驱动型应用是在计算存储分离的传统应用基础上进化而来。在传统架构中,应用需要读写远程事务型数据库。

相反,事件驱动型应用是基于状态化流处理来完成。在该设计中,数据和计算不会分离,应用只需访问本地(内存或磁盘)即可获取数据。系统容错性的实现依赖于定期向远程持久化存储写入 checkpoint。下图描述了传统应用和事件驱动型应用架构的区别。

img

事件驱动型应用的优势?

事件驱动型应用无须查询远程数据库,本地数据访问使得它具有更高的吞吐和更低的延迟。而由于定期向远程持久化存储的 checkpoint 工作可以异步、增量式完成,因此对于正常事件处理的影响甚微。事件驱动型应用的优势不仅限于本地数据访问。传统分层架构下,通常多个应用会共享同一个数据库,因而任何对数据库自身的更改(例如:由应用更新或服务扩容导致数据布局发生改变)都需要谨慎协调。反观事件驱动型应用,由于只需考虑自身数据,因此在更改数据表示或服务扩容时所需的协调工作将大大减少。

事件驱动型应用会受制于底层流处理系统对时间和状态的把控能力,Flink 诸多优秀特质都是围绕这些方面来设计的。它提供了一系列丰富的状态操作原语,允许以精确一次的一致性语义合并海量规模(TB 级别)的状态数据。此外,Flink 还支持事件时间和自由度极高的定制化窗口逻辑,而且它内置的 ProcessFunction 支持细粒度时间控制,方便实现一些高级业务逻辑。同时,Flink 还拥有一个复杂事件处理(CEP)类库,可以用来检测数据流中的模式。

Flink 中针对事件驱动应用的明星特性当属 savepoint。Savepoint 是一个一致性的状态映像,它可以用来初始化任意状态兼容的应用。在完成一次 savepoint 后,即可放心对应用升级或扩容,还可以启动多个版本的应用来完成 A/B 测试。

典型的事件驱动型应用实例

数据分析应用

什么是数据分析应用?

数据分析任务需要从原始数据中提取有价值的信息和指标。传统的分析方式通常是利用批查询,或将事件记录下来并基于此有限数据集构建应用来完成。为了得到最新数据的分析结果,必须先将它们加入分析数据集并重新执行查询或运行应用,随后将结果写入存储系统或生成报告。

借助一些先进的流处理引擎,还可以实时地进行数据分析。和传统模式下读取有限数据集不同,流式查询或应用会接入实时事件流,并随着事件消费持续产生和更新结果。这些结果数据可能会写入外部数据库系统或以内部状态的形式维护。仪表展示应用可以相应地从外部数据库读取数据或直接查询应用的内部状态。

如下图所示,Apache Flink 同时支持流式及批量分析应用。

img

流式分析应用的优势?

和批量分析相比,由于流式分析省掉了周期性的数据导入和查询过程,因此从事件中获取指标的延迟更低。不仅如此,批量查询必须处理那些由定期导入和输入有界性导致的人工数据边界,而流式查询则无须考虑该问题。

另一方面,流式分析会简化应用抽象。批量查询的流水线通常由多个独立部件组成,需要周期性地调度提取数据和执行查询。如此复杂的流水线操作起来并不容易,一旦某个组件出错将会影响流水线的后续步骤。而流式分析应用整体运行在 Flink 之类的高端流处理系统之上,涵盖了从数据接入到连续结果计算的所有步骤,因此可以依赖底层引擎提供的故障恢复机制。

Flink 为持续流式分析和批量分析都提供了良好的支持。具体而言,它内置了一个符合 ANSI 标准的 SQL 接口,将批、流查询的语义统一起来。无论是在记录事件的静态数据集上还是实时事件流上,相同 SQL 查询都会得到一致的结果。同时 Flink 还支持丰富的用户自定义函数,允许在 SQL 中执行定制化代码。如果还需进一步定制逻辑,可以利用 Flink DataStream API 和 DataSet API 进行更低层次的控制。此外,Flink 的 Gelly 库为基于批量数据集的大规模高性能图分析提供了算法和构建模块支持。

典型的数据分析应用实例

数据管道应用

什么是数据管道?

提取-转换-加载(ETL)是一种在存储系统之间进行数据转换和迁移的常用方法。ETL 作业通常会周期性地触发,将数据从事务型数据库拷贝到分析型数据库或数据仓库。

数据管道和 ETL 作业的用途相似,都可以转换、丰富数据,并将其从某个存储系统移动到另一个。但数据管道是以持续流模式运行,而非周期性触发。因此它支持从一个不断生成数据的源头读取记录,并将它们以低延迟移动到终点。例如:数据管道可以用来监控文件系统目录中的新文件,并将其数据写入事件日志;另一个应用可能会将事件流物化到数据库或增量构建和优化查询索引。

下图描述了周期性 ETL 作业和持续数据管道的差异。

img

数据管道的优势?

和周期性 ETL 作业相比,持续数据管道可以明显降低将数据移动到目的端的延迟。此外,由于它能够持续消费和发送数据,因此用途更广,支持用例更多。

很多常见的数据转换和增强操作可以利用 Flink 的 SQL 接口(或 Table API)及用户自定义函数解决。如果数据管道有更高级的需求,可以选择更通用的 DataStream API 来实现。Flink 为多种数据存储系统(如:Kafka、Kinesis、Elasticsearch、JDBC 数据库系统等)内置了连接器。同时它还提供了文件系统的连续型数据源及数据汇,可用来监控目录变化和以时间分区的方式写入文件。

典型的数据管道应用实例

参考资料

MapReduce

MapReduce 简介

MapReduce 是 Hadoop 项目中的分布式计算框架。它降低了分布式计算的门槛,可以让用户轻松编写程序,让其以可靠、容错的方式运行在大型集群上并行处理海量数据(TB 级)。

MapReduce 的设计思路是:

  • 分而治之,并行计算
  • 移动计算,而非移动数据

MapReduce 作业通过将输入的数据集拆分为独立的块,这些块由 map 任务以并行的方式处理。框架对 map 的输出进行排序,然后将其输入到 reduce 任务中。作业的输入和输出都存储在文件系统中。该框架负责调度任务、监控任务并重新执行失败的任务。

通常,计算节点和存储节点是相同的,即 MapReduce 框架和 HDFS 在同一组节点上运行。此配置允许框架在已存在数据的节点上有效地调度任务,从而在整个集群中实现非常高的聚合带宽。

MapReduce 框架由一个主 ResourceManager、每个集群节点一个工作程序 NodeManager 和每个应用程序的 MRAppMaster (YARN 组件) 组成。

MapReduce 框架仅对 <key、value> 对进行作,也就是说,框架将作业的输入视为一组 <key、value> 对,并生成一组 <key、value> 对作为作业的输出,可以想象是不同的类型。类必须可由框架序列化,因此需要实现 Writable 接口。此外,关键类必须实现 WritableComparable 接口,以便于按框架进行排序。

MapReduce 作业的 Input 和 Output 类型:

1
(input) <k1, v1> -> map -> <k2, v2> -> combine -> <k2, v2> -> reduce -> <k3, v3> (output)

MapReduce 的特点

  • 计算跟着数据走
  • 良好的扩展性:计算能力随着节点数增加,近似线性递增
  • 高容错
  • 状态监控
  • 适合海量数据的离线批处理
  • 降低了分布式编程的门槛

MapReduce 应用场景

适用场景:

  • 数据统计,如:网站的 PV、UV 统计
  • 搜索引擎构建索引
  • 海量数据查询

不适用场景:

  • OLAP - 要求毫秒或秒级返回结果
  • 流计算 - 流计算的输入数据集是动态的,而 MapReduce 是静态的
  • DAG 计算
    • 多个作业存在依赖关系,后一个的输入是前一个的输出,构成有向无环图 DAG
    • 每个 MapReduce 作业的输出结果都会落盘,造成大量磁盘 IO,导致性能非常低下

MapReduce 工作流

MapReduce 编程模型:MapReduce 程序被分为 Map(映射)阶段和 Reduce(化简)阶段。

img

  1. input : 读取文本文件;
  2. splitting : 将文件按照行进行拆分,此时得到的 K1 行数,V1 表示对应行的文本内容;
  3. mapping : 并行将每一行按照空格进行拆分,拆分得到的 List(K2,V2),其中 K2 代表每一个单词,由于是做词频统计,所以 V2 的值为 1,代表出现 1 次;
  4. shuffling:由于 Mapping 操作可能是在不同的机器上并行处理的,所以需要通过 shuffling 将相同 key 值的数据分发到同一个节点上去合并,这样才能统计出最终的结果,此时得到 K2 为每一个单词,List(V2) 为可迭代集合,V2 就是 Mapping 中的 V2;
  5. Reducing : 这里的案例是统计单词出现的总次数,所以 ReducingList(V2) 进行归约求和操作,最终输出。

MapReduce 编程模型中 splittingshuffing 操作都是由框架实现的,需要我们自己编程实现的只有 mappingreducing,这也就是 MapReduce 这个称呼的来源。

MapReduce 组件

MapReduce 有以下核心组件:

  • Job - Job 表示 MapReduce 作业配置。Job 通常用于指定 Mapper、combiner(如果有)、PartitionerReducerInputFormatOutputFormat 实现。
  • Mapper - Mapper 负责将输入键值对映射到一组中间键值对。转换的中间记录不需要与输入记录具有相同的类型。一个给定的输入键值对可能映射到零个或多个输出键值对。
  • Combiner - combinermap 运算后的可选操作,它实际上是一个本地化的 reduce 操作。它执行中间输出的本地聚合,这有助于减少从 Mapper 传输到 Reducer 的数据量。
  • Reducer - Reducer 将共享一个 key 的一组中间值归并为一个小的数值集。Reducer 有 3 个主要子阶段:shuffle,sort 和 reduce。
    • shuffle - Reducer 的输入就是 mapper 的排序输出。在这个阶段,框架通过 HTTP 获取所有 mapper 输出的相关分区。
    • sort - 在这个阶段中,框架将按照 key (因为不同 mapper 的输出中可能会有相同的 key) 对 Reducer 的输入进行分组。shuffle 和 sort 两个阶段是同时发生的。
    • reduce - 对按键分组的数据进行聚合统计。
  • Partitioner - Partitioner 负责控制 map 中间输出结果的键的分区。
    • 键(或者键的子集)用于产生分区,通常通过一个散列函数。
    • 分区总数与作业的 reduce 任务数是一样的。因此,它控制中间输出结果(也就是这条记录)的键发送给 m 个 reduce 任务中的哪一个来进行 reduce 操作。
  • InputFormat - InputFormat 描述 MapReduce 作业的输入规范。MapReduce 框架依赖作业的 InputFormat 来完成以下工作:
    • 确认作业的输入规范。
    • 把输入文件分割成多个逻辑的 InputSplit 实例,然后将每个实例分配给一个单独的 Mapper。InputSplit 表示要由单个 Mapper 处理的数据。
    • 提供 RecordReader 的实现。RecordReaderInputSplit 中读取 <key, value> 对,并提供给 Mapper 实现进行处理。
  • OutputFormat - OutputFormat 描述 MapReduce 作业的输出规范。MapReduce 框架依赖作业的 OutputFormat 来完成以下工作:
    • 确认作业的输出规范,例如检查输出路径是否已经存在。
    • 提供 RecordWriter 实现。RecordWriter 将输出 <key, value> 对到文件系统。

img

参考资料

大数据学习路线

大数据简介

移动计算

传统的软件计算处理模型,都是“输入 -> 计算 -> 输出”模型。

如何解决 PB 级数据进行计算的问题呢?

采用分布式集群的解决方案,用数千台甚至上万台计算机构建一个大数据计算处理集群,利用更多的网络带宽、内存空间、磁盘容量、CPU 核心数去进行计算处理。

大数据计算处理通常针对的是网站的存量数据,网站大数据系统要做的就是将这些统计规律和关联关系计算出来,并由此进一步改善网站的用户体验和运营决策。

将程序分发到数据所在的地方进行计算,也就是所谓的移动计算比移动数据更划算。

大数据存储

大规模数据存储的核心问题:

  • 数据存储容量
  • 数据读写速度
  • 数据可靠性

解决方案:水平伸缩

大数据处理流程

img

1.1 数据采集

大数据处理的第一步是数据的收集。现在的中大型项目通常采用微服务架构进行分布式部署,所以数据的采集需要在多台服务器上进行,且采集过程不能影响正常业务的开展。基于这种需求,就衍生了多种日志收集工具,如 Flume 、Logstash、Kibana 等,它们都能通过简单的配置完成复杂的数据收集和数据聚合。

1.2 数据存储

收集到数据后,下一个问题就是:数据该如何进行存储?通常大家最为熟知是 MySQL、Oracle 等传统的关系型数据库,它们的优点是能够快速存储结构化的数据,并支持随机访问。但大数据的数据结构通常是半结构化(如日志数据)、甚至是非结构化的(如视频、音频数据),为了解决海量半结构化和非结构化数据的存储,衍生了 Hadoop HDFS 、KFS、GFS 等分布式文件系统,它们都能够支持结构化、半结构和非结构化数据的存储,并可以通过增加机器进行横向扩展。

分布式文件系统完美地解决了海量数据存储的问题,但是一个优秀的数据存储系统需要同时考虑数据存储和访问两方面的问题,比如你希望能够对数据进行随机访问,这是传统的关系型数据库所擅长的,但却不是分布式文件系统所擅长的,那么有没有一种存储方案能够同时兼具分布式文件系统和关系型数据库的优点,基于这种需求,就产生了 HBase、MongoDB。

1.3 数据分析

大数据处理最重要的环节就是数据分析,数据分析通常分为两种:批处理和流处理。

  • 批处理:对一段时间内海量的离线数据进行统一的处理,对应的处理框架有 Hadoop MapReduce、Spark、Flink 等;
  • 流处理:对运动中的数据进行处理,即在接收数据的同时就对其进行处理,对应的处理框架有 Storm、Spark Streaming、Flink Streaming 等。

批处理和流处理各有其适用的场景,时间不敏感或者硬件资源有限,可以采用批处理;时间敏感和实时性要求高就可以采用流处理。随着服务器硬件的价格越来越低和大家对及时性的要求越来越高,流处理越来越普遍,如股票价格预测和电商运营数据分析等。

上面的框架都是需要通过编程来进行数据分析,那么如果你不是一个后台工程师,是不是就不能进行数据的分析了?当然不是,大数据是一个非常完善的生态圈,有需求就有解决方案。为了能够让熟悉 SQL 的人员也能够进行数据的分析,查询分析框架应运而生,常用的有 Hive 、Spark SQL 、Flink SQL、 Pig、Phoenix 等。这些框架都能够使用标准的 SQL 或者 类 SQL 语法灵活地进行数据的查询分析。这些 SQL 经过解析优化后转换为对应的作业程序来运行,如 Hive 本质上就是将 SQL 转换为 MapReduce 作业,Spark SQL 将 SQL 转换为一系列的 RDDs 和转换关系(transformations),Phoenix 将 SQL 查询转换为一个或多个 HBase Scan。

1.4 数据应用

数据分析完成后,接下来就是数据应用的范畴,这取决于你实际的业务需求。比如你可以将数据进行可视化展现,或者将数据用于优化你的推荐算法,这种运用现在很普遍,比如短视频个性化推荐、电商商品推荐、头条新闻推荐等。当然你也可以将数据用于训练你的机器学习模型,这些都属于其他领域的范畴,都有着对应的框架和技术栈进行处理,这里就不一一赘述。

1.5 其他框架

上面是一个标准的大数据处理流程所用到的技术框架。但是实际的大数据处理流程比上面复杂很多,针对大数据处理中的各种复杂问题分别衍生了各类框架:

  • 单机的处理能力都是存在瓶颈的,所以大数据框架都是采用集群模式进行部署,为了更方便的进行集群的部署、监控和管理,衍生了 Ambari、Cloudera Manager 等集群管理工具;
  • 想要保证集群高可用,需要用到 ZooKeeper ,ZooKeeper 是最常用的分布式协调服务,它能够解决大多数集群问题,包括首领选举、失败恢复、元数据存储及其一致性保证。同时针对集群资源管理的需求,又衍生了 Hadoop YARN ;
  • 复杂大数据处理的另外一个显著的问题是,如何调度多个复杂的并且彼此之间存在依赖关系的作业?基于这种需求,产生了 Azkaban 和 Oozie 等工作流调度框架;
  • 大数据流处理中使用的比较多的另外一个框架是 Kafka,它可以用于消峰,避免在秒杀等场景下并发数据对流处理程序造成冲击;
  • 另一个常用的框架是 Sqoop ,主要是解决了数据迁移的问题,它能够通过简单的命令将关系型数据库中的数据导入到 HDFS 、Hive 或 HBase 中,或者从 HDFS 、Hive 导出到关系型数据库上。

大数据学习路线

框架分类

日志收集框架:Flume 、Logstash、Kibana

分布式文件存储系统:Hadoop HDFS

数据库系统:Mongodb、HBase

分布式计算框架

  • 批处理框架:Hadoop MapReduce
  • 流处理框架:Storm
  • 混合处理框架:Spark、Flink

查询分析框架:Hive 、Spark SQL 、Flink SQL、 Pig、Phoenix

集群资源管理器:Hadoop YARN

分布式协调服务:Zookeeper

数据迁移工具:Sqoop

任务调度框架:Azkaban、Oozie

集群部署和监控:Ambari、Cloudera Manager

上面列出的都是比较主流的大数据框架,社区都很活跃,学习资源也比较丰富。建议从 Hadoop 开始入门学习,因为它是整个大数据生态圈的基石,其它框架都直接或者间接依赖于 Hadoop 。接着就可以学习计算框架,Spark 和 Flink 都是比较主流的混合处理框架,Spark 出现得较早,所以其应用也比较广泛。 Flink 是当下最火热的新一代的混合处理框架,其凭借众多优异的特性得到了众多公司的青睐。两者可以按照你个人喜好或者实际工作需要进行学习。

img

学习资料

大数据最权威和最全面的学习资料就是官方文档。热门的大数据框架社区都比较活跃、版本更新迭代也比较快,所以其出版物都明显滞后于其实际版本,基于这个原因采用书本学习不是一个最好的方案。比较庆幸的是,大数据框架的官方文档都写的比较好,内容完善,重点突出,同时都采用了大量配图进行辅助讲解。当然也有一些优秀的书籍历经时间的检验,至今依然很经典,这里列出部分个人阅读过的经典书籍:

视频学习资料

上面我推荐的都是书籍学习资料,很少推荐视频学习资料,这里说明一下原因:因为书籍历经时间的考验,能够再版的或者豆瓣等平台评价高的证明都是被大众所认可的,从概率的角度上来说,其必然更加优秀,不容易浪费大家的学习时间和精力,所以我个人更倾向于官方文档或者书本的学习方式,而不是视频。因为视频学习资料,缺少一个公共的评价平台和完善的评价机制,所以其质量良莠不齐。但是视频任然有其不可替代的好处,学习起来更直观、印象也更深刻,所以对于习惯视频学习的小伙伴,这里我各推荐一个免费的和付费的视频学习资源,大家按需选择:

参考资料

Java 虚拟机之类加载

类加载机制

类是在运行期间动态加载的。

类的加载指的是将类的 .class 文件中的二进制数据读入到内存中,将其放在运行时数据区的方法区内,然后在堆区创建一个java.lang.Class对象,用来封装类在方法区内的数据结构。类的加载的最终产品是位于堆区中的Class对象,Class对象封装了类在方法区内的数据结构,并且向 Java 程序员提供了访问方法区内的数据结构的接口。

类加载器并不需要等到某个类被“首次主动使用”时再加载它,JVM 规范允许类加载器在预料某个类将要被使用时就预先加载它,如果在预先加载的过程中遇到了.class 文件缺失或存在错误,类加载器必须在程序首次主动使用该类时才报告错误(LinkageError 错误)如果这个类一直没有被程序主动使用,那么类加载器就不会报告错误

类的生命周期

Java 类的完整生命周期包括以下几个阶段:

  • 加载(Loading)
  • 链接(Linking)
    • 验证(Verification)
    • 准备(Preparation)
    • 解析(Resolution)
  • 初始化(Initialization)
  • 使用(Using)
  • 卸载(Unloading)

加载、验证、准备、初始化和卸载这 5 个阶段的顺序是确定的,类的加载过程必须按照这种顺序按部就班地开始。而解析过程在某些情况下可以在初始化阶段之后再开始,这是为了支持 Java 的动态绑定

类加载过程是指加载、验证、准备、解析和初始化这 5 个阶段。

(一)加载

加载是类加载的一个阶段,注意不要混淆。

加载,是指查找字节流,并且据此创建类的过程

加载过程完成以下三件事:

  • 通过一个类的全限定名来获取定义此类的二进制字节流。
  • 将这个字节流所代表的静态存储结构转化为方法区的运行时存储结构。
  • 在内存中生成一个代表这个类的 Class 对象,作为方法区这个类的各种数据的访问入口。

其中二进制字节流可以从以下方式中获取:

  • 从 ZIP 包读取,这很常见,最终成为日后 JAR、EAR、WAR 格式的基础。
  • 从网络中获取,这种场景最典型的应用是 Applet。
  • 运行时计算生成,这种场景使用得最多得就是动态代理技术,在 java.lang.reflect.Proxy 中,就是用了 ProxyGenerator.generateProxyClass 的代理类的二进制字节流。
  • 由其他文件生成,典型场景是 JSP 应用,即由 JSP 文件生成对应的 Class 类。
  • 从数据库读取,这种场景相对少见,例如有些中间件服务器(如 SAP Netweaver)可以选择把程序安装到数据库中来完成程序代码在集群间的分发。

更详细内容会在 3. ClassLoader 介绍。

(二)验证

验证是链接阶段的第一步。验证的目标是确保 Class 文件的字节流中包含的信息符合当前虚拟机的要求,并且不会危害虚拟机自身的安全。

验证阶段大致会完成 4 个阶段的检验动作:

  • 文件格式验证 - 验证字节流是否符合 Class 文件格式的规范,并且能被当前版本的虚拟机处理。
  • 元数据验证 - 对字节码描述的信息进行语义分析,以保证其描述的信息符合 Java 语言规范的要求。
  • 字节码验证 - 通过数据流和控制流分析,确保程序语义是合法、符合逻辑的。
  • 符号引用验证 - 发生在虚拟机将符号引用转换为直接引用的时候,对类自身以外(常量池中的各种符号引用)的信息进行匹配性校验。

验证阶段是非常重要的,但不是必须的,它对程序运行期没有影响,如果所引用的类经过反复验证,那么可以考虑采用 -Xverifynone 参数来关闭大部分的类验证措施,以缩短虚拟机类加载的时间。

(三)准备

类变量是被 static 修饰的变量,准备阶段为 static 变量在方法区分配内存并初始化为默认值,使用的是方法区的内存。

实例变量不会在这阶段分配内存,它将会在对象实例化时随着对象一起分配在 Java 堆中。(实例化不是类加载的一个过程,类加载发生在所有实例化操作之前,并且类加载只进行一次,实例化可以进行多次)

初始值一般为 0 值,例如下面的类变量 value 被初始化为 0 而不是 123。

1
public static int value = 123;

如果类变量是常量,那么会按照表达式来进行初始化,而不是赋值为 0。

1
public static final int value = 123;

准备阶段有以下几点需要注意:

  • 这时候进行内存分配的仅包括类变量(static),而不包括实例变量,实例变量会在对象实例化时随着对象一块分配在 Java 堆中。
  • 这里所设置的初始值通常情况下是数据类型默认的零值(如 00Lnullfalse 等),而不是被在 Java 代码中被显式地赋予的值。

假设一个类变量的定义为:public static int value = 3

那么变量 value 在准备阶段过后的初始值为 0,而不是 3,因为这时候尚未开始执行任何 Java 方法,而把 value 赋值为 3 的public static指令是在程序编译后,存放于类构造器()方法之中的,所以把 value 赋值为 3 的动作将在初始化阶段才会执行。

这里还需要注意如下几点:

  • 对基本数据类型来说,对于类变量(static)和全局变量,如果不显式地对其赋值而直接使用,则系统会为其赋予默认的零值,而对于局部变量来说,在使用前必须显式地为其赋值,否则编译时不通过。
  • 对于同时被 static 和 final 修饰的常量,必须在声明的时候就为其显式地赋值,否则编译时不通过;而只被 final 修饰的常量则既可以在声明时显式地为其赋值,也可以在类初始化时显式地为其赋值,总之,在使用前必须为其显式地赋值,系统不会为其赋予默认零值。
  • 对于引用数据类型 reference 来说,如数组引用、对象引用等,如果没有对其进行显式地赋值而直接使用,系统都会为其赋予默认的零值,即 null。
  • 如果在数组初始化时没有对数组中的各元素赋值,那么其中的元素将根据对应的数据类型而被赋予默认的零值。
  • 如果类字段的字段属性表中存在ConstantValue属性,即同时被 final 和 static 修饰,那么在准备阶段变量 value 就会被初始化为 ConstValue 属性所指定的值。

假设上面的类变量 value 被定义为: public static final int value = 3

编译时 Javac 将会为 value 生成 ConstantValue 属性,在准备阶段虚拟机就会根据ConstantValue的设置将 value 赋值为 3。我们可以理解为 static final 常量在编译期就将其结果放入了调用它的类的常量池中

(四)解析

在 class 文件被加载至 Java 虚拟机之前,这个类无法知道其他类及其方法、字段所对应的具体地址,甚至不知道自己方法、字段的地址。因此,每当需要引用这些成员时,Java 编译器会生成一个符号引用。在运行阶段,这个符号引用一般都能够无歧义地定位到具体目标上。

举例来说,对于一个方法调用,编译器会生成一个包含目标方法所在类的名字、目标方法的名字、接收参数类型以及返回值类型的符号引用,来指代所要调用的方法。

解析阶段目标是将常量池的符号引用替换为直接引用的过程。解析动作主要针对类或接口、字段、类方法、接口方法、方法类型、方法句柄和调用点限定符 7 类符号引用进行。

  • 符号引用(Symbolic References) - 符号引用以一组符号来描述所引用的目标,符号可以是任何形式的字面量,只要使用时能无歧义地定位到目标即可。
  • 直接引用(Direct Reference) - 直接引用可以是直接指向目标的指针、相对偏移量或是一个能间接定位到目标的句柄。

(五)初始化

在 Java 代码中,如果要初始化一个静态字段,可以在声明时直接赋值,也可以在静态代码块中对其赋值。

如果直接赋值的静态字段被 final 所修饰,并且它的类型是基本类型或字符串时,那么该字段便会被 Java 编译器标记成常量值(ConstantValue),其初始化直接由 Java 虚拟机完成。除此之外的直接赋值操作,以及所有静态代码块中的代码,则会被 Java 编译器置于同一方法中,并把它命名为 < clinit >

初始化阶段才真正开始执行类中的定义的 Java 程序代码。初始化,为类的静态变量赋予正确的初始值,JVM 负责对类进行初始化,主要对类变量进行初始化

类初始化方式

  • 声明类变量时指定初始值
  • 使用静态代码块为类变量指定初始值

在准备阶段,类变量已经赋过一次系统要求的初始值,而在初始化阶段,根据程序员通过程序制定的主观计划去初始化类变量和其它资源。

类初始化步骤

  1. 如果类还没有被加载和链接,开始加载该类。
  2. 如果该类的直接父类还没有被初始化,先初始化其父类。
  3. 如果该类有初始化语句,则依次执行这些初始化语句。

类初始化时机

只有主动引用类的时候才会导致类的初始化。

(1)主动引用

类的主动引用包括以下六种:

  • 创建类的实例 - 也就是 new 对象
  • 访问静态变量 - 访问某个类或接口的静态变量,或者对该静态变量赋值
  • 访问静态方法
  • 反射 - 如Class.forName(“com.shengsiyuan.Test”)
  • 初始化子类 - 初始化某个类的子类,则其父类也会被初始化
  • 启动类 - Java 虚拟机启动时被标明为启动类的类(Java Test),直接使用java.exe命令来运行某个主类

(2)被动引用

以上 5 种场景中的行为称为对一个类进行主动引用。除此之外,所有引用类的方式都不会触发初始化,称为被动引用。被动引用的常见例子包括:

  • 通过子类引用父类的静态字段,不会导致子类初始化
1
System.out.println(SubClass.value); // value 字段在 SuperClass 中定义
  • 通过数组定义来引用类,不会触发此类的初始化。该过程会对数组类进行初始化,数组类是一个由虚拟机自动生成的、直接继承自 Object 的子类,其中包含了数组的属性和方法。
1
SuperClass[] sca = new SuperClass[10];
  • 常量在编译阶段会存入调用类的常量池中,本质上并没有直接引用到定义常量的类,因此不会触发定义常量的类的初始化
1
System.out.println(ConstClass.HELLOWORLD);

类初始化细节

类初始化 <clinit>() 方法的细节:

  • 是由编译器自动收集类中所有类变量的赋值动作和静态语句块(static{} 块)中的语句合并产生的,编译器收集的顺序由语句在源文件中出现的顺序决定。特别注意的是,静态语句块只能访问到定义在它之前的类变量,定义在它之后的类变量只能赋值,不能访问。例如以下代码:
1
2
3
4
5
6
7
public class Test {
static {
i = 0; // 给变量赋值可以正常编译通过
System.out.print(i); // 这句编译器会提示“非法向前引用”
}
static int i = 1;
}
  • 与类的构造函数(或者说实例构造器 <init>())不同,不需要显式的调用父类的构造器。虚拟机会自动保证在子类的 <clinit>() 方法运行之前,父类的 <clinit>() 方法已经执行结束。因此虚拟机中第一个执行 <clinit>() 方法的类肯定为 java.lang.Object
  • 由于父类的 <clinit>() 方法先执行,也就意味着父类中定义的静态语句块要优于子类的变量赋值操作。例如以下代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
static class Parent {
public static int A = 1;
static {
A = 2;
}
}

static class Sub extends Parent {
public static int B = A;
}

public static void main(String[] args) {
System.out.println(Sub.B); // 输出结果是父类中的静态变量 A 的值,也就是 2。
}
  • <clinit>() 方法对于类或接口不是必须的,如果一个类中不包含静态语句块,也没有对类变量的赋值操作,编译器可以不为该类生成 <clinit>() 方法。
  • 接口中不可以使用静态语句块,但仍然有类变量初始化的赋值操作,因此接口与类一样都会生成 <clinit>() 方法。但接口与类不同的是,执行接口的 <clinit>() 方法不需要先执行父接口的 <clinit>() 方法。只有当父接口中定义的变量使用时,父接口才会初始化。另外,接口的实现类在初始化时也一样不会执行接口的 <clinit>() 方法。
  • 虚拟机会保证一个类的 <clinit>() 方法在多线程环境下被正确的加锁和同步,如果多个线程同时初始化一个类,只会有一个线程执行这个类的 <clinit>() 方法,其它线程都会阻塞等待,直到活动线程执行 <clinit>() 方法完毕。如果在一个类的 <clinit>() 方法中有耗时的操作,就可能造成多个线程阻塞,在实际过程中此种阻塞很隐蔽。

ClassLoader

ClassLoader 即类加载器,负责将类加载到 JVM。在 Java 虚拟机外部实现,以便让应用程序自己决定如何去获取所需要的类。

JVM 加载 class 文件到内存有两种方式:

  • 隐式加载 - JVM 自动加载需要的类到内存中。
  • 显示加载 - 通过使用 ClassLoader 来加载一个类到内存中。

类与类加载器

如何判断两个类是否相等:类本身相等,并且使用同一个类加载器进行加载。这是因为每一个 ClassLoader 都拥有一个独立的类名称空间

这里的相等,包括类的 Class 对象的 equals() 方法、isAssignableFrom() 方法、isInstance() 方法的返回结果为 true,也包括使用 instanceof 关键字做对象所属关系判定结果为 true。

类加载器分类

img

Bootstrap ClassLoader

Bootstrap ClassLoader ,即启动类加载器 ,负责加载 JVM 自身工作所需要的类

Bootstrap ClassLoader 会将存放在 <JAVA_HOME>\lib 目录中的,或者被 -Xbootclasspath 参数所指定的路径中的,并且是虚拟机识别的(仅按照文件名识别,如 rt.jar,名字不符合的类库即使放在 lib 目录中也不会被加载)类库加载到虚拟机内存中

Bootstrap ClassLoader 是由 C++ 实现的,它完全由 JVM 自己控制的,启动类加载器无法被 Java 程序直接引用,用户在编写自定义类加载器时,如果需要把加载请求委派给启动类加载器,直接使用 null 代替即可。

ExtClassLoader

ExtClassLoader,即扩展类加载器,这个类加载器是由 ExtClassLoader(sun.misc.Launcher\$ExtClassLoader)实现的。

ExtClassLoader 负责将 <JAVA_HOME>\lib\ext 或者被 java.ext.dir 系统变量所指定路径中的所有类库加载到内存中,开发者可以直接使用扩展类加载器

AppClassLoader

AppClassLoader,即应用程序类加载器,这个类加载器是由 AppClassLoader(sun.misc.Launcher\$AppClassLoader) 实现的。由于这个类加载器是 ClassLoader 中的 getSystemClassLoader() 方法的返回值,因此一般称为系统类加载器。

AppClassLoader 负责加载用户类路径(即 classpath)上所指定的类库,开发者可以直接使用这个类加载器,如果应用程序中没有自定义过自己的类加载器,一般情况下这个就是程序中默认的类加载器。

自定义类加载器

自定义类加载器可以做到如下几点:

  • 在执行非置信代码之前,自动验证数字签名。
  • 动态地创建符合用户特定需要的定制化构建类。
  • 从特定的场所取得 java class,例如数据库中和网络中。

假设,我们需要自定义一个名为 FileSystemClassLoader 的类加载器,继承自 java.lang.ClassLoader,用于加载文件系统上的类。它首先根据类的全名在文件系统上查找类的字节代码文件(.class 文件),然后读取该文件内容,最后通过 defineClass() 方法来把这些字节代码转换成 java.lang.Class 类的实例。

java.lang.ClassLoader 类的方法 loadClass() 实现了双亲委派模型的逻辑,因此自定义类加载器一般不去覆写它,而是通过覆写 findClass() 方法。

ClassLoader 常用的场景:

  • 容器 - 典型应用:Servlet 容器(如:Tomcat、Jetty)、udf (Mysql、Hive)等。加载解压 jar 包或 war 包后,加载其 Class 到指定的类加载器中运行(通常需要考虑空间隔离)。
  • 热部署、热插拔 - 应用启动后,动态获得某个类信息,然后加载到 JVM 中工作。很多著名的容器软件、框架(如:Spring 等),都使用 ClassLoader 来实现自身的热部署。

【示例】自定义一个类加载器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class FileSystemClassLoader extends ClassLoader {

private String rootDir;

public FileSystemClassLoader(String rootDir) {
this.rootDir = rootDir;
}

protected Class<?> findClass(String name) throws ClassNotFoundException {
byte[] classData = getClassData(name);
if (classData == null) {
throw new ClassNotFoundException();
} else {
return defineClass(name, classData, 0, classData.length);
}
}

private byte[] getClassData(String className) {
String path = classNameToPath(className);
try {
InputStream ins = new FileInputStream(path);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
int bufferSize = 4096;
byte[] buffer = new byte[bufferSize];
int bytesNumRead;
while ((bytesNumRead = ins.read(buffer)) != -1) {
baos.write(buffer, 0, bytesNumRead);
}
return baos.toByteArray();
} catch (IOException e) {
e.printStackTrace();
}
return null;
}

private String classNameToPath(String className) {
return rootDir + File.separatorChar
+ className.replace('.', File.separatorChar) + ".class";
}
}

双亲委派

理解双亲委派之前,先让我们看一个示例。

【示例】寻找类加载示例

1
2
3
4
5
6
public static void main(String[] args) {
ClassLoader loader = Thread.currentThread().getContextClassLoader();
System.out.println(loader);
System.out.println(loader.getParent());
System.out.println(loader.getParent().getParent());
}

输出:

1
2
3
sun.misc.Launcher$AppClassLoader@18b4aac2
sun.misc.Launcher$ExtClassLoader@19e1023e
null

从上面的结果可以看出,并没有获取到 ExtClassLoader 的父 Loader,原因是 Bootstrap Loader(引导类加载器)是用 C 语言实现的,找不到一个确定的返回父 Loader 的方式,于是就返回 null。

下图展示的类加载器之间的层次关系,称为类加载器的双亲委派模型(Parents Delegation Model)该模型要求除了顶层的 Bootstrap ClassLoader 外,其余的类加载器都应有自己的父类加载器这里类加载器之间的父子关系一般通过组合(Composition)关系来实现,而不是通过继承(Inheritance)的关系实现

(1)工作过程

一个类加载器首先将类加载请求传送到父类加载器,只有当父类加载器无法完成类加载请求时才尝试加载

(2)好处

使得 Java 类随着它的类加载器一起具有一种带有优先级的层次关系,从而使得基础类得到统一:

  • 系统类防止内存中出现多份同样的字节码
  • 保证 Java 程序安全稳定运行

例如: java.lang.Object 存放在 rt.jar 中,如果编写另外一个 java.lang.Object 的类并放到 classpath 中,程序可以编译通过。因为双亲委派模型的存在,所以在 rt.jar 中的 Object 比在 classpath 中的 Object 优先级更高,因为 rt.jar 中的 Object 使用的是启动类加载器,而 classpath 中的 Object 使用的是应用程序类加载器。正因为 rt.jar 中的 Object 优先级更高,因为程序中所有的 Object 都是这个 Object

(3)实现

以下是抽象类 java.lang.ClassLoader 的代码片段,其中的 loadClass() 方法运行过程如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public abstract class ClassLoader {
// The parent class loader for delegation
private final ClassLoader parent;

public Class<?> loadClass(String name) throws ClassNotFoundException {
return loadClass(name, false);
}

protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
synchronized (getClassLoadingLock(name)) {
// 首先判断该类型是否已经被加载
Class<?> c = findLoadedClass(name);
if (c == null) {
// 如果没有被加载,就委托给父类加载或者委派给启动类加载器加载
try {
if (parent != null) {
// 如果存在父类加载器,就委派给父类加载器加载
c = parent.loadClass(name, false);
} else {
// 如果不存在父类加载器,就检查是否是由启动类加载器加载的类,通过调用本地方法native Class findBootstrapClass(String name)
c = findBootstrapClassOrNull(name);
}
} catch (ClassNotFoundException e) {
// 如果父类加载器加载失败,会抛出 ClassNotFoundException
}

if (c == null) {
// 如果父类加载器和启动类加载器都不能完成加载任务,才调用自身的加载功能
c = findClass(name);
}
}
if (resolve) {
resolveClass(c);
}
return c;
}
}

protected Class<?> findClass(String name) throws ClassNotFoundException {
throw new ClassNotFoundException(name);
}
}

【说明】

  • 先检查类是否已经加载过,如果没有则让父类加载器去加载。
  • 当父类加载器加载失败时抛出 ClassNotFoundException,此时尝试自己去加载。

ClassLoader 参数

在生产环境上启动 java 应用时,通常会指定一些 ClassLoader 参数,以加载应用所需要的 lib:

1
java -jar xxx.jar -classpath lib/*

ClassLoader 相关参数选项:

参数选项 ClassLoader 类型 说明
-Xbootclasspath Bootstrap ClassLoader 设置 Bootstrap ClassLoader 搜索路径。【不常用】
-Xbootclasspath/a Bootstrap ClassLoader 把路径添加到已存在的 Bootstrap ClassLoader 搜索路径后面。【常用】
-Xbootclasspath/p Bootstrap ClassLoader 把路径添加到已存在的 Bootstrap ClassLoader 搜索路径前面。【不常用】
-Djava.ext.dirs ExtClassLoader 设置 ExtClassLoader 搜索路径。
-Djava.class.path-cp-classpath AppClassLoader 设置 AppClassLoader 搜索路径。

类的加载

类加载方式

类加载有三种方式:

  • 命令行启动应用时候由 JVM 初始化加载
  • 通过 Class.forName() 方法动态加载
  • 通过 ClassLoader.loadClass() 方法动态加载

Class.forName()ClassLoader.loadClass() 区别

  • Class.forName() 将类的 .class 文件加载到 jvm 中之外,还会对类进行解释,执行类中的 static 块;
  • ClassLoader.loadClass() 只干一件事情,就是将 .class 文件加载到 jvm 中,不会执行 static 中的内容,只有在 newInstance 才会去执行 static 块。
  • Class.forName(name, initialize, loader) 带参函数也可控制是否加载 static 块。并且只有调用了 newInstance() 方法采用调用构造函数,创建类的对象 。

加载类错误

ClassNotFoundException

ClassNotFoundException 异常出镜率极高。**ClassNotFoundException 表示当前 classpath 下找不到指定类**。

常见问题原因:

  • 调用 ClassforName() 方法,未找到类。
  • 调用 ClassLoader 中的 loadClass() 方法,未找到类。
  • 调用 ClassLoader 中的 findSystemClass() 方法,未找到类。

【示例】执行以下代码,会抛出 ClassNotFoundException 异常:

1
2
3
4
5
6
7
8
9
public class ClassNotFoundExceptionDemo {
public static void main(String[] args) {
try {
Class.forName("NotFound");
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
}
}

解决方法:检查 classpath 下有没有相应的 class 文件。

NoClassDefFoundError

常见问题原因:

  • 类依赖的 Class 或者 jar 不存在。
  • 类文件存在,但是存在不同的域中。

解决方法:现代 Java 项目,一般使用 mavengradle 等构建工具管理项目,仔细检查找不到的类所在的 jar 包是否已添加为依赖。

UnsatisfiedLinkError

这个异常倒不是很常见,但是出错的话,通常是在 JVM 启动的时候如果一不小心将在 JVM 中的某个 lib 删除了,可能就会报这个错误了。

【示例】执行以下代码,会抛出 UnsatisfiedLinkError 错误。

1
2
3
4
5
6
7
8
9
10
11
12
13
public class UnsatisfiedLinkErrorDemo {

public native void nativeMethod();

static {
System.loadLibrary("NoLib");
}

public static void main(String[] args) {
new UnsatisfiedLinkErrorDemo().nativeMethod();
}

}

【输出】

1
2
3
4
5
java.lang.UnsatisfiedLinkError: no NoLib in java.library.path
at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1867)
at java.lang.Runtime.loadLibrary0(Runtime.java:870)
at java.lang.System.loadLibrary(System.java:1122)
at io.github.dunwu.javacore.jvm.classloader.exception.UnsatisfiedLinkErrorDemo.<clinit>(UnsatisfiedLinkErrorDemo.java:12)

ClassCastException

ClassCastException 异常通常是在程序中强制类型转换失败时出现。

【示例】执行以下代码,会抛出 ClassCastException 异常。

1
2
3
4
5
6
7
8
9
10
public class ClassCastExceptionDemo {

public static void main(String[] args) {
Object obj = new Object();
EmptyClass newObj = (EmptyClass) obj;
}

static class EmptyClass {}

}

【输出】

1
2
Exception in thread "main" java.lang.ClassCastException: java.lang.Object cannot be cast to io.github.dunwu.javacore.jvm.classloader.exception.ClassCastExceptionDemo$EmptyClass
at io.github.dunwu.javacore.jvm.classloader.exception.ClassCastExceptionDemo.main(ClassCastExceptionDemo.java:11)

参考资料

Elastic 快速入门

开源协议:Apache 2.0

简介

Elastic Stack 是什么

Elastic StackELK Stack

ELK 是指 Elastic 公司旗下三款产品 ElasticSearchLogstashKibana 的首字母组合。

  • Elasticsearch 是一个搜索和分析引擎。
  • Logstash 是服务器端数据处理管道,能够同时从多个来源采集数据,转换数据,然后将数据发送到诸如 Elasticsearch 等“存储库”中。
  • Kibana 则可以让用户在 Elasticsearch 中使用图形和图表对数据进行可视化。

而 Elastic Stack 是 ELK Stack 的更新换代产品,最新产品引入了轻量型的单一功能数据采集器,并把它们叫做 Beats

为什么使用 Elastic Stack

对于有一定规模的公司来说,通常会很多个应用,并部署在大量的服务器上。运维和开发人员常常需要通过查看日志来定位问题。如果应用是集群化部署,试想如果登录一台台服务器去查看日志,是多么费时费力。

而通过 ELK 这套解决方案,可以同时实现日志收集、日志搜索和日志分析的功能。

Elastic Stack 架构

img

说明

以上是 Elastic Stack 的一个架构图。从图中可以清楚的看到数据流向。

  • Beats 是单一用途的数据传输平台,它可以将多台机器的数据发送到 Logstash 或 ElasticSearch。但 Beats 并不是不可或缺的一环,所以本文中暂不介绍。
  • Logstash 是一个动态数据收集管道。支持以 TCP/UDP/HTTP 多种方式收集数据(也可以接受 Beats 传输来的数据),并对数据做进一步丰富或提取字段处理。
  • ElasticSearch 是一个基于 JSON 的分布式的搜索和分析引擎。作为 ELK 的核心,它集中存储数据。
  • Kibana 是 ELK 的用户界面。它将收集的数据进行可视化展示(各种报表、图形化数据),并提供配置、管理 ELK 的界面。

ElasticSearch

Elasticsearch 是一个分布式、RESTful 风格的搜索和数据分析引擎,能够解决不断涌现出的各种用例。 作为 Elastic Stack 的核心,它集中存储您的数据,帮助您发现意料之中以及意料之外的情况。

ElasticSearch 简介

Elasticsearch 基于搜索库 Lucene 开发。ElasticSearch 隐藏了 Lucene 的复杂性,提供了简单易用的 REST API / Java API 接口(另外还有其他语言的 API 接口)。

ElasticSearch 可以视为一个文档存储,它将复杂数据结构序列化为 JSON 存储

ElasticSearch 是近乎于实时的全文搜素,这是指:

  • 从写入数据到数据可以被搜索,存在较小的延迟(大概是 1s)
  • 基于 ES 执行搜索和分析可以达到秒级

2.1.1. 核心概念

  • 索引(Index) 可以认为是文档(document)的优化集合。
  • 每个 文档(document) 都是字段(field)的集合。
  • 字段(field) 是包含数据的键值对。
  • 默认情况下,Elasticsearch 对每个字段中的所有数据建立索引,并且每个索引字段都具有专用的优化数据结构。
  • 每个索引里可以有一个或者多个类型(type)。类型(type) 是 index 的一个逻辑分类,
  • 当单台机器不足以存储大量数据时,Elasticsearch 可以将一个索引中的数据切分为多个 分片(shard)分片(shard) 分布在多台服务器上存储。有了 shard 就可以横向扩展,存储更多数据,让搜索和分析等操作分布到多台服务器上去执行,提升吞吐量和性能。每个 shard 都是一个 lucene index。
  • 任何一个服务器随时可能故障或宕机,此时 shard 可能就会丢失,因此可以为每个 shard 创建多个 **副本(replica)**。replica 可以在 shard 故障时提供备用服务,保证数据不丢失,多个 replica 还可以提升搜索操作的吞吐量和性能。primary shard(建立索引时一次设置,不能修改,默认 5 个),replica shard(随时修改数量,默认 1 个),默认每个索引 10 个 shard,5 个 primary shard,5 个 replica shard,最小的高可用配置,是 2 台服务器。

ElasticSearch 原理

2.2.1. ES 写数据过程

  • 客户端选择一个 node 发送请求过去,这个 node 就是 coordinating node(协调节点)。
  • coordinating node 对 document 进行路由,将请求转发给对应的 node(有 primary shard)。
  • 实际的 node 上的 primary shard 处理请求,然后将数据同步到 replica node
  • coordinating node 如果发现 primary node 和所有 replica node 都搞定之后,就返回响应结果给客户端。

es-write

2.2.2. es 读数据过程

可以通过 doc id 来查询,会根据 doc id 进行 hash,判断出来当时把 doc id 分配到了哪个 shard 上面去,从那个 shard 去查询。

  • 客户端发送请求到任意一个 node,成为 coordinate node
  • coordinate nodedoc id 进行哈希路由,将请求转发到对应的 node,此时会使用 round-robin 随机轮询算法,在 primary shard 以及其所有 replica 中随机选择一个,让读请求负载均衡。
  • 接收请求的 node 返回 document 给 coordinate node
  • coordinate node 返回 document 给客户端。

2.2.3. 写数据底层原理

es-write-detail

先写入内存 buffer,在 buffer 里的时候数据是搜索不到的;同时将数据写入 translog 日志文件。

如果 buffer 快满了,或者到一定时间,就会将内存 buffer 数据 refresh 到一个新的 segment file 中,但是此时数据不是直接进入 segment file 磁盘文件,而是先进入 os cache 。这个过程就是 refresh

每隔 1 秒钟,es 将 buffer 中的数据写入一个新的 segment file,每秒钟会产生一个新的磁盘文件 segment file,这个 segment file 中就存储最近 1 秒内 buffer 中写入的数据。

但是如果 buffer 里面此时没有数据,那当然不会执行 refresh 操作,如果 buffer 里面有数据,默认 1 秒钟执行一次 refresh 操作,刷入一个新的 segment file 中。

操作系统里面,磁盘文件其实都有一个东西,叫做 os cache,即操作系统缓存,就是说数据写入磁盘文件之前,会先进入 os cache,先进入操作系统级别的一个内存缓存中去。只要 buffer 中的数据被 refresh 操作刷入 os cache中,这个数据就可以被搜索到了。

为什么叫 es 是准实时的? NRT,全称 near real-time。默认是每隔 1 秒 refresh 一次的,所以 es 是准实时的,因为写入的数据 1 秒之后才能被看到。可以通过 es 的 restful api 或者 java api手动执行一次 refresh 操作,就是手动将 buffer 中的数据刷入 os cache中,让数据立马就可以被搜索到。只要数据被输入 os cache 中,buffer 就会被清空了,因为不需要保留 buffer 了,数据在 translog 里面已经持久化到磁盘去一份了。

重复上面的步骤,新的数据不断进入 buffer 和 translog,不断将 buffer 数据写入一个又一个新的 segment file 中去,每次 refresh 完 buffer 清空,translog 保留。随着这个过程推进,translog 会变得越来越大。当 translog 达到一定长度的时候,就会触发 commit 操作。

commit 操作发生第一步,就是将 buffer 中现有数据 refreshos cache 中去,清空 buffer。然后,将一个 commit point 写入磁盘文件,里面标识着这个 commit point 对应的所有 segment file,同时强行将 os cache 中目前所有的数据都 fsync 到磁盘文件中去。最后清空 现有 translog 日志文件,重启一个 translog,此时 commit 操作完成。

这个 commit 操作叫做 flush。默认 30 分钟自动执行一次 flush,但如果 translog 过大,也会触发 flush。flush 操作就对应着 commit 的全过程,我们可以通过 es api,手动执行 flush 操作,手动将 os cache 中的数据 fsync 强刷到磁盘上去。

translog 日志文件的作用是什么?你执行 commit 操作之前,数据要么是停留在 buffer 中,要么是停留在 os cache 中,无论是 buffer 还是 os cache 都是内存,一旦这台机器死了,内存中的数据就全丢了。所以需要将数据对应的操作写入一个专门的日志文件 translog 中,一旦此时机器宕机,再次重启的时候,es 会自动读取 translog 日志文件中的数据,恢复到内存 buffer 和 os cache 中去。

translog 其实也是先写入 os cache 的,默认每隔 5 秒刷一次到磁盘中去,所以默认情况下,可能有 5 秒的数据会仅仅停留在 buffer 或者 translog 文件的 os cache 中,如果此时机器挂了,会丢失 5 秒钟的数据。但是这样性能比较好,最多丢 5 秒的数据。也可以将 translog 设置成每次写操作必须是直接 fsync 到磁盘,但是性能会差很多。

实际上你在这里,如果面试官没有问你 es 丢数据的问题,你可以在这里给面试官炫一把,你说,其实 es 第一是准实时的,数据写入 1 秒后可以搜索到;可能会丢失数据的。有 5 秒的数据,停留在 buffer、translog os cache、segment file os cache 中,而不在磁盘上,此时如果宕机,会导致 5 秒的数据丢失

总结一下,数据先写入内存 buffer,然后每隔 1s,将数据 refresh 到 os cache,到了 os cache 数据就能被搜索到(所以我们才说 es 从写入到能被搜索到,中间有 1s 的延迟)。每隔 5s,将数据写入 translog 文件(这样如果机器宕机,内存数据全没,最多会有 5s 的数据丢失),translog 大到一定程度,或者默认每隔 30mins,会触发 commit 操作,将缓冲区的数据都 flush 到 segment file 磁盘文件中。

数据写入 segment file 之后,同时就建立好了倒排索引。

2.2.4. 删除/更新数据底层原理

如果是删除操作,commit 的时候会生成一个 .del 文件,里面将某个 doc 标识为 deleted 状态,那么搜索的时候根据 .del 文件就知道这个 doc 是否被删除了。

如果是更新操作,就是将原来的 doc 标识为 deleted 状态,然后新写入一条数据。

buffer 每 refresh 一次,就会产生一个 segment file,所以默认情况下是 1 秒钟一个 segment file,这样下来 segment file 会越来越多,此时会定期执行 merge。每次 merge 的时候,会将多个 segment file 合并成一个,同时这里会将标识为 deleted 的 doc 给物理删除掉,然后将新的 segment file 写入磁盘,这里会写一个 commit point,标识所有新的 segment file,然后打开 segment file 供搜索使用,同时删除旧的 segment file

2.2.5. 底层 lucene

简单来说,lucene 就是一个 jar 包,里面包含了封装好的各种建立倒排索引的算法代码。我们用 Java 开发的时候,引入 lucene jar,然后基于 lucene 的 api 去开发就可以了。

通过 lucene,我们可以将已有的数据建立索引,lucene 会在本地磁盘上面,给我们组织索引的数据结构。

2.2.6. 倒排索引

在搜索引擎中,每个文档都有一个对应的文档 ID,文档内容被表示为一系列关键词的集合。例如,文档 1 经过分词,提取了 20 个关键词,每个关键词都会记录它在文档中出现的次数和出现位置。

那么,倒排索引就是关键词到文档 ID 的映射,每个关键词都对应着一系列的文件,这些文件中都出现了关键词。

举个栗子。

有以下文档:

DocId Doc
1 谷歌地图之父跳槽 Facebook
2 谷歌地图之父加盟 Facebook
3 谷歌地图创始人拉斯离开谷歌加盟 Facebook
4 谷歌地图之父跳槽 Facebook 与 Wave 项目取消有关
5 谷歌地图之父拉斯加盟社交网站 Facebook

对文档进行分词之后,得到以下倒排索引

WordId Word DocIds
1 谷歌 1,2,3,4,5
2 地图 1,2,3,4,5
3 之父 1,2,4,5
4 跳槽 1,4
5 Facebook 1,2,3,4,5
6 加盟 2,3,5
7 创始人 3
8 拉斯 3,5
9 离开 3
10 4
.. .. ..

另外,实用的倒排索引还可以记录更多的信息,比如文档频率信息,表示在文档集合中有多少个文档包含某个单词。

那么,有了倒排索引,搜索引擎可以很方便地响应用户的查询。比如用户输入查询 Facebook,搜索系统查找倒排索引,从中读出包含这个单词的文档,这些文档就是提供给用户的搜索结果。

要注意倒排索引的两个重要细节:

  • 倒排索引中的所有词项对应一个或多个文档;
  • 倒排索引中的词项根据字典顺序升序排列

上面只是一个简单的栗子,并没有严格按照字典顺序升序排列。

Logstash

Logstash 是开源的服务器端数据处理管道,能够同时从多个来源采集数据,转换数据,然后将数据发送到您最喜欢的“存储库”中。

Logstash 简介

Logstash 可以传输和处理你的日志、事务或其他数据。

Logstash 是 Elasticsearch 的最佳数据管道。

Logstash 是插件式管理模式,在输入、过滤、输出以及编码过程中都可以使用插件进行定制。Logstash 社区有超过 200 种可用插件。

Logstash 原理

Logstash 有两个必要元素:inputoutput ,一个可选元素:filter

这三个元素,分别代表 Logstash 事件处理的三个阶段:输入 > 过滤器 > 输出。

img

  • input - 负责从数据源采集数据。
  • filter - 将数据修改为你指定的格式或内容。
  • output - 将数据传输到目的地。

在实际应用场景中,通常输入、输出、过滤器不止一个。Logstash 的这三个元素都使用插件式管理方式,用户可以根据应用需要,灵活的选用各阶段需要的插件,并组合使用。

Beats

Beats 是安装在服务器上的数据中转代理

Beats 可以将数据直接传输到 Elasticsearch 或传输到 Logstash 。

img

Beats 有多种类型,可以根据实际应用需要选择合适的类型。

常用的类型有:

  • Packetbeat:网络数据包分析器,提供有关您的应用程序服务器之间交换的事务的信息。
  • Filebeat:从您的服务器发送日志文件。
  • Metricbeat:是一个服务器监视代理程序,它定期从服务器上运行的操作系统和服务收集指标。
  • Winlogbeat:提供 Windows 事件日志。

Filebeat 简介

_由于本人仅接触过 Filebeat,所以本文只介绍 Beats 组件中的 Filebeat_。

相比 Logstash,FileBeat 更加轻量化。

在任何环境下,应用程序都有停机的可能性。 Filebeat 读取并转发日志行,如果中断,则会记住所有事件恢复联机状态时所在位置。

Filebeat 带有内部模块(auditd,Apache,Nginx,System 和 MySQL),可通过一个指定命令来简化通用日志格式的收集,解析和可视化。

FileBeat 不会让你的管道超负荷。FileBeat 如果是向 Logstash 传输数据,当 Logstash 忙于处理数据,会通知 FileBeat 放慢读取速度。一旦拥塞得到解决,FileBeat 将恢复到原来的速度并继续传播。

img

Filebeat 原理

Filebeat 有两个主要组件:

  • harvester:负责读取一个文件的内容。它会逐行读取文件内容,并将内容发送到输出目的地。
  • prospector:负责管理 harvester 并找到所有需要读取的文件源。比如类型是日志,prospector 就会遍历制定路径下的所有匹配要求的文件。
1
2
3
4
5
filebeat.prospectors:
- type: log
paths:
- /var/log/*.log
- /var/path2/*.log

Filebeat 保持每个文件的状态,并经常刷新注册表文件中的磁盘状态。状态用于记住 harvester 正在读取的最后偏移量,并确保发送所有日志行。

Filebeat 将每个事件的传递状态存储在注册表文件中。所以它能保证事件至少传递一次到配置的输出,没有数据丢失。

参考资料

Filebeat

简介

Beats 是安装在服务器上的数据中转代理。

Beats 可以将数据直接传输到 Elasticsearch 或传输到 Logstash 。

img

Beats 有多种类型,可以根据实际应用需要选择合适的类型。

常用的类型有:

  • Packetbeat:网络数据包分析器,提供有关您的应用程序服务器之间交换的事务的信息。
  • Filebeat:从您的服务器发送日志文件。
  • Metricbeat:是一个服务器监视代理程序,它定期从服务器上运行的操作系统和服务收集指标。
  • Winlogbeat:提供 Windows 事件日志。

参考

更多 Beats 类型可以参考:community-beats

说明

由于本人工作中只应用了 FileBeat,所以后面内容仅介绍 FileBeat 。

FileBeat 的作用

相比 Logstash,FileBeat 更加轻量化。

在任何环境下,应用程序都有停机的可能性。 Filebeat 读取并转发日志行,如果中断,则会记住所有事件恢复联机状态时所在位置。

Filebeat 带有内部模块(auditd,Apache,Nginx,System 和 MySQL),可通过一个指定命令来简化通用日志格式的收集,解析和可视化。

FileBeat 不会让你的管道超负荷。FileBeat 如果是向 Logstash 传输数据,当 Logstash 忙于处理数据,会通知 FileBeat 放慢读取速度。一旦拥塞得到解决,FileBeat 将恢复到原来的速度并继续传播。

img

安装

Unix / Linux 系统建议使用下面方式安装,因为比较通用。

1
2
wget https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-6.1.1-linux-x86_64.tar.gz
tar -zxf filebeat-6.1.1-linux-x86_64.tar.gz

更多内容可以参考:filebeat-installation

配置

配置文件

首先,必须要知道的是:filebeat.yml 是 filebeat 的配置文件。其路径会因为你安装方式而有所不同。

Beat 所有系列产品的配置文件都基于 YAML 格式,FileBeat 当然也不例外。

更多 filebeat 配置内容可以参考:配置 filebeat

更多 filebeat.yml 文件格式内容可以参考:filebeat.yml 文件格式

filebeat.yml 部分配置示例:

1
2
3
4
5
6
7
8
filebeat:
prospectors:
- type: log
paths:
- /var/log/*.log
multiline:
pattern: '^['
match: after

参考

更多 filebeat 配置内容可以参考:配置 filebeat

更多 filebeat.yml 文件格式内容可以参考:filebeat.yml 文件格式

重要配置项

下面我将列举 Filebeat 的较为重要的配置项。

如果想了解更多配置信息,可以参考:

更多 filebeat 配置内容可以参考:配置 filebeat

更多 filebeat.yml 文件格式内容可以参考:filebeat.yml 文件格式

filebeat.prospectors

(文件监视器)用于指定需要关注的文件。

示例

1
2
3
4
5
filebeat.prospectors:
- type: log
enabled: true
paths:
- /var/log/*.log

output.elasticsearch

如果你希望使用 filebeat 直接向 elasticsearch 输出数据,需要配置 output.elasticsearch 。

示例

1
2
output.elasticsearch:
hosts: ['192.168.1.42:9200']

output.logstash

如果你希望使用 filebeat 向 logstash 输出数据,然后由 logstash 再向 elasticsearch 输出数据,需要配置 output.logstash。

注意

相比于向 elasticsearch 输出数据,个人更推荐向 logstash 输出数据。

因为 logstash 和 filebeat 一起工作时,如果 logstash 忙于处理数据,会通知 FileBeat 放慢读取速度。一旦拥塞得到解决,FileBeat 将恢复到原来的速度并继续传播。这样,可以减少管道超负荷的情况。

示例

1
2
output.logstash:
hosts: ['127.0.0.1:5044']

此外,还需要在 logstash 的配置文件(如 logstash.conf)中指定 beats input 插件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
input {
beats {
port => 5044 # 此端口需要与 filebeat.yml 中的端口相同
}
}

# The filter part of this file is commented out to indicate that it is
# optional.
# filter {
#
# }

output {
elasticsearch {
hosts => "localhost:9200"
manage_template => false
index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
document_type => "%{[@metadata][type]}"
}
}

setup.kibana

如果打算使用 Filebeat 提供的 Kibana 仪表板,需要配置 setup.kibana 。

示例

1
2
setup.kibana:
host: 'localhost:5601'

setup.template.settings

在 Elasticsearch 中,索引模板用于定义设置和映射,以确定如何分析字段。

在 Filebeat 中,setup.template.settings 用于配置索引模板。

Filebeat 推荐的索引模板文件由 Filebeat 软件包安装。如果您接受 filebeat.yml 配置文件中的默认配置,Filebeat 在成功连接到 Elasticsearch 后自动加载模板。

您可以通过在 Filebeat 配置文件中配置模板加载选项来禁用自动模板加载,或加载自己的模板。您还可以设置选项来更改索引和索引模板的名称。

参考

更多内容可以参考:filebeat-template

说明

如无必要,使用 Filebeat 配置文件中的默认索引模板即可。

setup.dashboards

Filebeat 附带了示例 Kibana 仪表板。在使用仪表板之前,您需要创建索引模式 filebeat- *,并将仪表板加载到 Kibana 中。为此,您可以运行 setup 命令或在 filebeat.yml 配置文件中配置仪表板加载。

为了在 Kibana 中加载 Filebeat 的仪表盘,需要在 filebeat.yml 配置中启动开关:

1
setup.dashboards.enabled: true

参考

更多内容可以参考:configuration-dashboards

命令

filebeat 提供了一系列命令来完成各种功能。

执行命令方式:

1
./filebeat COMMAND

参考

更多内容可以参考:command-line-options

说明

个人认为命令行没有必要一一掌握,因为绝大部分功能都可以通过配置来完成。且通过命令行指定功能这种方式要求每次输入同样参数,不利于固化启动方式。

最重要的当然是启动命令 run 了。

示例 指定配置文件启动

1
2
./filebeat run -e -c filebeat.yml -d "publish"
./filebeat -e -c filebeat.yml -d "publish" # run 可以省略

模块

FilebeatMetricbeat 内部集成了一系列模块,用以简化常见日志格式(例如 NGINX、Apache 或诸如 Redis 或 Docker 等系统指标)的收集、解析和可视化过程。

  • 配置 elasticsearch 和 kibana
1
2
3
4
5
6
7
8
output.elasticsearch:
hosts: ["myEShost:9200"]
username: "elastic"
password: "elastic"
setup.kibana:
host: "mykibanahost:5601"
username: "elastic"
password: "elastic

username 和 password 是可选的,如果不需要认证则不填。

  • 初始化环境

执行下面命令,filebeat 会加载推荐索引模板。

1
./filebeat setup -e
  • 指定模块

执行下面命令,指定希望加载的模块。

1
./filebeat -e --modules system,nginx,mysql

更多内容可以参考:

原理

Filebeat 有两个主要组件:

harvester:负责读取一个文件的内容。它会逐行读取文件内容,并将内容发送到输出目的地。

prospector:负责管理 harvester 并找到所有需要读取的文件源。比如类型是日志,prospector 就会遍历制定路径下的所有匹配要求的文件。

1
2
3
4
5
filebeat.prospectors:
- type: log
paths:
- /var/log/*.log
- /var/path2/*.log

Filebeat 保持每个文件的状态,并经常刷新注册表文件中的磁盘状态。状态用于记住 harvester 正在读取的最后偏移量,并确保发送所有日志行。

Filebeat 将每个事件的传递状态存储在注册表文件中。所以它能保证事件至少传递一次到配置的输出,没有数据丢失。

参考资料

Kibana

通过 Kibana,您可以对自己的 Elasticsearch 进行可视化,还可以在 Elastic Stack 中进行导航,这样您便可以进行各种操作了,从跟踪查询负载,到理解请求如何流经您的整个应用,都能轻松完成。

安装

环境要求

版本:Elastic Stack 7.4

安装步骤

安装步骤如下:

  1. kibana 官方下载地址 下载所需版本包并解压到本地。
  2. 修改 config/kibana.yml 配置文件,设置 elasticsearch.url 指向 Elasticsearch 实例。
  3. 运行 bin/kibana (Windows 上运行 bin\kibana.bat
  4. 在浏览器上访问 http://localhost:5601

使用

检索

单击侧面导航栏中的 检索(Discover) ,可以显示 Kibana 的数据查询功能功能。

img

在搜索栏中,您可以输入 Elasticsearch 查询条件来搜索您的数据。您可以在 Discover 页面中浏览结果并在 Visualize 页面中创建已保存搜索条件的可视化。

当前索引模式显示在查询栏下方。索引模式确定提交查询时搜索哪些索引。要搜索一组不同的索引,请从下拉菜单中选择不同的模式。要添加索引模式(index pattern),请转至 Management/Kibana/Index Patterns 并单击 Add New

您可以使用字段名称和您感兴趣的值构建搜索。对于数字字段,可以使用比较运算符,如大于(>),小于(<)或等于(=)。您可以将元素与逻辑运算符 ANDORNOT 链接,全部使用大写。

默认情况下,每个匹配文档都显示所有字段。要选择要显示的文档字段,请将鼠标悬停在“可用字段”列表上,然后单击要包含的每个字段旁边的添加按钮。例如,如果只添加 account_number,则显示将更改为包含五个帐号的简单列表:

img

kibana 的搜索栏遵循 query-string-syntax 文档中所说明的查询语义。

这里说明一些最基本的查询语义。

查询字符串会被解析为一系列的术语和运算符。一个术语可以是一个单词(如:quick、brown)或用双引号包围的短语(如”quick brown”)。

查询操作允许您自定义搜索 - 下面介绍了可用的选项。

2.1.1. 字段名称

正如查询字符串查询中所述,将在搜索条件中搜索 default_field,但可以在查询语法中指定其他字段:

例如:

  • 查询 status 字段中包含 active 关键字
1
status:active
  • title 字段包含 quickbrown 关键字。如果您省略 OR 运算符,则将使用默认运算符
1
2
title:(quick OR brown)
title:(quick brown)
  • author 字段查找精确的短语 “john smith”,即精确查找。
1
author:"John Smith"
  • 任意字段 book.titlebook.contentbook.date 都包含 quickbrown(注意我们需要如何使用 \* 表示通配符)
1
book.\*:(quick brown)
  • title 字段包含任意非 null 值
1
_exists_:title

2.1.2. 通配符

ELK 提供了 ? 和 * 两个通配符。

  • ? 表示任意单个字符;
  • * 表示任意零个或多个字符。
1
qu?ck bro*

注意:通配符查询会使用大量的内存并且执行性能较为糟糕,所以请慎用。 > 提示:纯通配符 * 被写入 exsits 查询,从而提高了查询效率。因此,通配符 field:* 将匹配包含空值的文档,如:{“field”:“”},但是如果字段丢失或显示将值置为 null 则不匹配,如:“field”:null} > 提示:在一个单词的开头(例如:*ing)使用通配符这种方式的查询量特别大,因为索引中的所有术语都需要检查,以防万一匹配。通过将 allow_leading_wildcard 设置为 false,可以禁用。

2.1.3. 正则表达式

可以通过 / 将正则表达式包裹在查询字符串中进行查询

例:

1
name:/joh?n(ath[oa]n)/

支持的正则表达式语义可以参考:Regular expression syntax

2.1.4. 模糊查询

我们可以使用 ~ 运算符来进行模糊查询。

例:

假设我们实际想查询

1
quick brown forks

但是,由于拼写错误,我们的查询关键字变成如下情况,依然可以查到想要的结果。

1
quikc\~ brwn\~ foks\~

这种模糊查询使用 Damerau-Levenshtein 距离来查找所有匹配最多两个更改的项。所谓的更改是指单个字符的插入,删除或替换,或者两个相邻字符的换位。

默认编辑距离为 2,但编辑距离为 1 应足以捕捉所有人类拼写错误的 80%。它可以被指定为:

1
quikc\~1

2.1.5. 近似检索

尽管短语查询(例如,john smith)期望所有的词条都是完全相同的顺序,但是近似查询允许指定的单词进一步分开或以不同的顺序排列。与模糊查询可以为单词中的字符指定最大编辑距离一样,近似搜索也允许我们指定短语中单词的最大编辑距离:

1
"fox quick"\~5

字段中的文本越接近查询字符串中指定的原始顺序,该文档就越被认为是相关的。当与上面的示例查询相比时,短语 "quick fox" 将被认为比 "quick brown fox" 更近似查询条件。

2.1.6. 范围

可以为日期,数字或字符串字段指定范围。闭区间范围用方括号 [min TO max] 和开区间范围用花括号 {min TO max} 来指定。

我们不妨来看一些示例。

  • 2012 年的所有日子
1
date:[2012-01-01 TO 2012-12-31]
  • 数字 1 到 5
1
count:[1 TO 5]
  • alphaomega 之间的标签,不包括 alphaomega
1
tags:{alpha TO omega}
  • 10 以上的数字
1
count:[10 TO *]
  • 2012 年以前的所有日期
1
date:{* TO 2012-01-01}

此外,开区间和闭区间也可以组合使用

  • 数组 1 到 5,但不包括 5
1
count:[1 TO 5}

一边无界的范围也可以使用以下语法:

1
2
3
4
age:>10
age:>=10
age:<10
age:<=10

当然,你也可以使用 AND 运算符来得到连个查询结果的交集

1
2
age:(>=10 AND <20)
age:(+>=10 +<20)

2.1.7. Boosting

使用操作符 ^ 使一个术语比另一个术语更相关。例如,如果我们想查找所有有关狐狸的文档,但我们对狐狸特别感兴趣:

1
quick^2 fox

默认提升值是 1,但可以是任何正浮点数。 0 到 1 之间的提升减少了相关性。

增强也可以应用于短语或组:

1
"john smith"^2   (foo bar)^4

2.1.8. 布尔操作

默认情况下,只要一个词匹配,所有词都是可选的。搜索 foo bar baz 将查找包含 foobarbaz 中的一个或多个的任何文档。我们已经讨论了上面的default_operator,它允许你强制要求所有的项,但也有布尔运算符可以在查询字符串本身中使用,以提供更多的控制。

首选的操作符是 +(此术语必须存在)和 - (此术语不得存在)。所有其他条款是可选的。例如,这个查询:

1
quick brown +fox -news

这条查询意味着:

  • fox 必须存在
  • news 必须不存在
  • quick 和 brown 是可有可无的

熟悉的运算符 ANDORNOT(也写成 &&||!)也被支持。然而,这些操作符有一定的优先级:NOT 优先于 ANDAND 优先于 OR。虽然 +- 仅影响运算符右侧的术语,但 ANDOR 会影响左侧和右侧的术语。

2.1.9. 分组

多个术语或子句可以用圆括号组合在一起,形成子查询

1
(quick OR brown) AND fox

可以使用组来定位特定的字段,或者增强子查询的结果:

1
status:(active OR pending) title:(full text search)^2

2.1.10. 保留字

如果你需要使用任何在你的查询本身中作为操作符的字符(而不是作为操作符),那么你应该用一个反斜杠来转义它们。例如,要搜索(1 + 1)= 2,您需要将查询写为 \(1\+1\)\=2

保留字符是:+ - = && || > < ! ( ) { } [ ] ^ " ~ * ? : \ /

无法正确地转义这些特殊字符可能会导致语法错误,从而阻止您的查询运行。

2.1.11. 空查询

如果查询字符串为空或仅包含空格,则查询将生成一个空的结果集。

可视化

要想使用可视化的方式展示您的数据,请单击侧面导航栏中的 可视化(Visualize)

Visualize 工具使您能够以多种方式(如饼图、柱状图、曲线图、分布图等)查看数据。要开始使用,请点击蓝色的 Create a visualization+ 按钮。

https://www.elastic.co/guide/en/kibana/6.1/images/tutorial-visualize-landing.png

有许多可视化类型可供选择。

https://www.elastic.co/guide/en/kibana/6.1/images/tutorial-visualize-wizard-step-1.png

下面,我们来看创建几个图标示例:

2.2.1. Pie

您可以从保存的搜索中构建可视化文件,也可以输入新的搜索条件。要输入新的搜索条件,首先需要选择一个索引模式来指定要搜索的索引。

默认搜索匹配所有文档。最初,一个“切片”包含整个饼图:

https://www.elastic.co/guide/en/kibana/6.1/images/tutorial-visualize-pie-1.png

要指定在图表中展示哪些数据,请使用 Elasticsearch 存储桶聚合。分组汇总只是将与您的搜索条件相匹配的文档分类到不同的分类中,也称为分组。

为每个范围定义一个存储桶:

  1. 单击 Split Slices
  2. Aggregation 列表中选择 Terms。_注意:这里的 Terms 是 Elk 采集数据时定义好的字段或标签_。
  3. Field 列表中选择 level.keyword
  4. 点击 images/apply-changes-button.png 按钮来更新图表。

image.png

完成后,如果想要保存这个图表,可以点击页面最上方一栏中的 Save 按钮。

2.2.2. Vertical Bar

我们在展示一下如何创建柱状图。

  1. 点击蓝色的 Create a visualization+ 按钮。选择 Vertical Bar
  2. 选择索引模式。由于您尚未定义任何 bucket ,因此您会看到一个大栏,显示与默认通配符查询匹配的文档总数。
  3. 指定 Y 轴所代表的字段
  4. 指定 X 轴所代表的字段
  5. 点击 images/apply-changes-button.png 按钮来更新图表。

image.png

完成后,如果想要保存这个图表,可以点击页面最上方一栏中的 Save 按钮。

报表

报表(Dashboard) 可以整合和共享 Visualize 集合。

  1. 点击侧面导航栏中的 Dashboard。
  2. 点击添加显示保存的可视化列表。
  3. 点击之前保存的 Visualize,然后点击列表底部的小向上箭头关闭可视化列表。
  4. 将鼠标悬停在可视化对象上会显示允许您编辑,移动,删除和调整可视化对象大小的容器控件。

FAQ

Kibana No Default Index Pattern Warning

问题:安装 ELK 后,访问 kibana 页面时,提示以下错误信息:

1
2
3
Warning No default index pattern. You must select or create one to continue.
...
Unable to fetch mapping. Do you have indices matching the pattern?

这就说明 logstash 没有把日志写入到 elasticsearch。

解决方法:

检查 logstash 与 elasticsearch 之间的通讯是否有问题,一般问题就出在这。

参考资料

Logstash

简介

Logstash 是开源的服务器端数据处理管道,能够同时从多个来源采集数据,转换数据,然后将数据发送到您最喜欢的“存储库”中。

功能

Logstash 是 Elasticsearch 的最佳数据管道。

Logstash 是插件式管理模式,在输入、过滤、输出以及编码过程中都可以使用插件进行定制。Logstash 社区有超过 200 种可用插件。

工作原理

Logstash 有两个必要元素:inputoutput ,一个可选元素:filter

这三个元素,分别代表 Logstash 事件处理的三个阶段:输入 > 过滤器 > 输出。

img

  • input 负责从数据源采集数据。
  • filter 将数据修改为你指定的格式或内容。
  • output 将数据传输到目的地。

在实际应用场景中,通常输入、输出、过滤器不止一个。Logstash 的这三个元素都使用插件式管理方式,用户可以根据应用需要,灵活的选用各阶段需要的插件,并组合使用。

后面将对插件展开讲解,暂且不表。

安装

安装步骤

安装步骤如下:

(1)在 logstash 官方下载地址 下载所需版本包并解压到本地。

(2)添加一个 logstash.conf 文件,指定要使用的插件以及每个插件的设置。举个简单的例子:

1
2
3
4
5
input { stdin { } }
output {
elasticsearch { hosts => ["localhost:9200"] }
stdout { codec => rubydebug }
}

(3)运行 bin/logstash -f logstash.conf (Windows 上运行bin/logstash.bat -f logstash.conf

配置

设置文件

  • **logstash.yml**:logstash 的默认启动配置文件
  • **jvm.options**:logstash 的 JVM 配置文件。
  • startup.options (Linux):包含系统安装脚本在 /usr/share/logstash/bin 中使用的选项为您的系统构建适当的启动脚本。安装 Logstash 软件包时,系统安装脚本将在安装过程结束时执行,并使用 startup.options 中指定的设置来设置用户,组,服务名称和服务描述等选项。

logstash.yml 设置项

节选部分设置项,更多项请参考:https://www.elastic.co/guide/en/logstash/current/logstash-settings-file.html

参数 描述 默认值
node.name 节点名 机器的主机名
path.data Logstash 及其插件用于任何持久性需求的目录。 LOGSTASH_HOME/data
pipeline.workers 同时执行管道的过滤器和输出阶段的工作任务数量。如果发现事件正在备份,或 CPU 未饱和,请考虑增加此数字以更好地利用机器处理能力。 Number of the host’s CPU cores
pipeline.batch.size 尝试执行过滤器和输出之前,单个工作线程从输入收集的最大事件数量。较大的批量处理大小一般来说效率更高,但是以增加的内存开销为代价。您可能必须通过设置 LS_HEAP_SIZE 变量来有效使用该选项来增加 JVM 堆大小。 125
pipeline.batch.delay 创建管道事件批处理时,在将一个尺寸过小的批次发送给管道工作任务之前,等待每个事件需要多长时间(毫秒)。 5
pipeline.unsafe_shutdown 如果设置为 true,则即使在内存中仍存在 inflight 事件时,也会强制 Logstash 在关闭期间退出。默认情况下,Logstash 将拒绝退出,直到所有接收到的事件都被推送到输出。启用此选项可能会导致关机期间数据丢失。 false
path.config 主管道的 Logstash 配置路径。如果您指定一个目录或通配符,配置文件将按字母顺序从目录中读取。 Platform-specific. See [dir-layout].
config.string 包含用于主管道的管道配置的字符串。使用与配置文件相同的语法。 None
config.test_and_exit 设置为 true 时,检查配置是否有效,然后退出。请注意,使用此设置不会检查 grok 模式的正确性。 Logstash 可以从目录中读取多个配置文件。如果将此设置与 log.level:debug 结合使用,则 Logstash 将记录组合的配置文件,并注掉其源文件的配置块。 false
config.reload.automatic 设置为 true 时,定期检查配置是否已更改,并在配置更改时重新加载配置。这也可以通过 SIGHUP 信号手动触发。 false
config.reload.interval Logstash 检查配置文件更改的时间间隔。 3s
config.debug 设置为 true 时,将完全编译的配置显示为调试日志消息。您还必须设置log.level:debug。警告:日志消息将包括任何传递给插件配置作为明文的“密码”选项,并可能导致明文密码出现在您的日志! false
config.support_escapes 当设置为 true 时,带引号的字符串将处理转义字符。 false
modules 配置时,模块必须处于上表所述的嵌套 YAML 结构中。 None
http.host 绑定地址 "127.0.0.1"
http.port 绑定端口 9600
log.level 日志级别。有效选项:fatal > error > warn > info > debug > trace info
log.format 日志格式。json (JSON 格式)或 plain (原对象) plain
path.logs Logstash 自身日志的存储路径 LOGSTASH_HOME/logs
path.plugins 在哪里可以找到自定义的插件。您可以多次指定此设置以包含多个路径。

启动

命令行

通过命令行启动 logstash 的方式如下:

1
bin/logstash [options]

其中 options 是您可以指定用于控制 Logstash 执行的命令行标志。

在命令行上设置的任何标志都会覆盖 Logstash 设置文件(logstash.yml)中的相应设置,但设置文件本身不会更改。

虽然可以通过指定命令行参数的方式,来控制 logstash 的运行方式,但显然这么做很麻烦。

建议通过指定配置文件的方式,来控制 logstash 运行,启动命令如下:

1
bin/logstash -f logstash.conf

若想了解更多的命令行参数细节,请参考:https://www.elastic.co/guide/en/logstash/current/running-logstash-command-line.html

配置文件

上节,我们了解到,logstash 可以执行 bin/logstash -f logstash.conf ,按照配置文件中的参数去覆盖默认设置文件(logstash.yml)中的设置。

这节,我们就来学习一下这个配置文件如何配置参数。

配置文件结构

在工作原理一节中,我们已经知道了 Logstash 主要有三个工作阶段 input 、filter、output。而 logstash 配置文件文件结构也与之相对应:

1
2
3
4
5
input {}

filter {}

output {}

每个部分都包含一个或多个插件的配置选项。如果指定了多个过滤器,则会按照它们在配置文件中的显示顺序应用它们。

插件配置

插件的配置由插件名称和插件的一个设置块组成。

下面的例子中配置了两个输入文件配置:

1
2
3
4
5
6
7
8
9
10
11
input {
file {
path => "/var/log/messages"
type => "syslog"
}

file {
path => "/var/log/apache/access.log"
type => "apache"
}
}

您可以配置的设置因插件类型而异。你可以参考: Input Plugins, Output Plugins, Filter Plugins, 和 Codec Plugins

值类型

一个插件可以要求设置的值是一个特定的类型,比如布尔值,列表或哈希值。以下值类型受支持。

  • Array
1
users => [ {id => 1, name => bob}, {id => 2, name => jane} ]
  • Lists
1
2
;(path) => ['/var/log/messages', '/var/log/*.log']
;(uris) => ['http://elastic.co', 'http://example.net']
  • Boolean
1
;(ssl_enable) => true
  • Bytes
1
2
3
4
my_bytes => "1113"   # 1113 bytes
my_bytes => "10MiB" # 10485760 bytes
my_bytes => "100kib" # 102400 bytes
my_bytes => "180 mb" # 180000000 bytes
  • Codec
1
;(codec) => 'json'
  • Hash
1
2
3
4
5
match => {
"field1" => "value1"
"field2" => "value2"
...
}
  • Number
1
;(port) => 33
  • Password
1
;(my_password) => 'password'
  • URI
1
;(my_uri) => 'http://foo:bar@example.net'
  • Path
1
;(my_path) => '/tmp/logstash'
  • String

  • 转义字符

插件

input

Logstash 支持各种输入选择 ,可以在同一时间从众多常用来源捕捉事件。能够以连续的流式传输方式,轻松地从您的日志、指标、Web 应用、数据存储以及各种 AWS 服务采集数据。

常用 input 插件

  • file:从文件系统上的文件读取,就像 UNIX 命令 tail -0F 一样
  • syslog:在众所周知的端口 514 上侦听系统日志消息,并根据 RFC3164 格式进行解析
  • redis:从 redis 服务器读取,使用 redis 通道和 redis 列表。 Redis 经常用作集中式 Logstash 安装中的“代理”,它将来自远程 Logstash“托运人”的 Logstash 事件排队。
  • beats:处理由 Filebeat 发送的事件。

更多详情请见:Input Plugins

filter

过滤器是 Logstash 管道中的中间处理设备。如果符合特定条件,您可以将条件过滤器组合在一起,对事件执行操作。

常用 filter 插件

  • grok:解析和结构任意文本。 Grok 目前是 Logstash 中将非结构化日志数据解析为结构化和可查询的最佳方法。
  • mutate:对事件字段执行一般转换。您可以重命名,删除,替换和修改事件中的字段。
  • drop:完全放弃一个事件,例如调试事件。
  • clone:制作一个事件的副本,可能会添加或删除字段。
  • geoip:添加有关 IP 地址的地理位置的信息(也可以在 Kibana 中显示惊人的图表!)

更多详情请见:Filter Plugins

output

输出是 Logstash 管道的最后阶段。一个事件可以通过多个输出,但是一旦所有输出处理完成,事件就完成了执行。

常用 output 插件

  • elasticsearch:将事件数据发送给 Elasticsearch(推荐模式)。
  • file:将事件数据写入文件或磁盘。
  • graphite:将事件数据发送给 graphite(一个流行的开源工具,存储和绘制指标。 http://graphite.readthedocs.io/en/latest/)。
  • statsd:将事件数据发送到 statsd (这是一种侦听统计数据的服务,如计数器和定时器,通过 UDP 发送并将聚合发送到一个或多个可插入的后端服务)。

更多详情请见:Output Plugins

codec

用于格式化对应的内容。

常用 codec 插件

  • json:以 JSON 格式对数据进行编码或解码。
  • multiline:将多行文本事件(如 java 异常和堆栈跟踪消息)合并为单个事件。

更多插件请见:Codec Plugins

实战

前面的内容都是对 Logstash 的介绍和原理说明。接下来,我们来实战一些常见的应用场景。

传输控制台数据

stdin input 插件从标准输入读取事件。这是最简单的 input 插件,一般用于测试场景。

应用

(1)创建 logstash-input-stdin.conf

1
2
3
4
5
input { stdin { } }
output {
elasticsearch { hosts => ["localhost:9200"] }
stdout { codec => rubydebug }
}

更多配置项可以参考:https://www.elastic.co/guide/en/logstash/current/plugins-inputs-stdin.html

(2)执行 logstash,使用 -f 来指定你的配置文件:

1
bin/logstash -f logstash-input-stdin.conf

传输 logback 日志

elk 默认使用的 Java 日志工具是 log4j2 ,并不支持 logback 和 log4j。

想使用 logback + logstash ,可以使用 logstash-logback-encoderlogstash-logback-encoder 提供了 UDP / TCP / 异步方式来传输日志数据到 logstash。

如果你使用的是 log4j ,也不是不可以用这种方式,只要引入桥接 jar 包即可。如果你对 log4j 、logback ,或是桥接 jar 包不太了解,可以参考我的这篇博文:细说 Java 主流日志工具库

TCP 应用

logstash 配置

(1)创建 logstash-input-tcp.conf

1
2
3
4
5
6
7
8
9
10
input {
tcp {
port => 9251
codec => json_lines
mode => server
}
output {
elasticsearch { hosts => ["localhost:9200"] }
stdout { codec => rubydebug }
}

更多配置项可以参考:https://www.elastic.co/guide/en/logstash/current/plugins-inputs-tcp.html

(2)执行 logstash,使用 -f 来指定你的配置文件:bin/logstash -f logstash-input-udp.conf

java 应用配置

(1)在 Java 应用的 pom.xml 中引入 jar 包:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
<dependency>
<groupId>net.logstash.logback</groupId>
<artifactId>logstash-logback-encoder</artifactId>
<version>4.11</version>
</dependency>

<!-- logback 依赖包 -->
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<version>1.2.3</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.3</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-access</artifactId>
<version>1.2.3</version>
</dependency>

(2)接着,在 logback.xml 中添加 appender

1
2
3
4
5
6
7
8
9
10
11
<appender name="ELK-TCP" class="net.logstash.logback.appender.LogstashTcpSocketAppender">
<!--
destination 是 logstash 服务的 host:port,
相当于和 logstash 建立了管道,将日志数据定向传输到 logstash
-->
<destination>192.168.28.32:9251</destination>
<encoder charset="UTF-8" class="net.logstash.logback.encoder.LogstashEncoder"/>
</appender>
<logger name="io.github.dunwu.spring" level="TRACE" additivity="false">
<appender-ref ref="ELK-TCP" />
</logger>

(3)接下来,就是 logback 的具体使用 ,如果对此不了解,不妨参考一下我的这篇博文:细说 Java 主流日志工具库

实例:我的 logback.xml

UDP 应用

UDP 和 TCP 的使用方式大同小异。

logstash 配置

(1)创建 logstash-input-udp.conf

1
2
3
4
5
6
7
8
9
input {
udp {
port => 9250
codec => json
}
output {
elasticsearch { hosts => ["localhost:9200"] }
stdout { codec => rubydebug }
}

更多配置项可以参考:https://www.elastic.co/guide/en/logstash/current/plugins-inputs-udp.html

(2)执行 logstash,使用 -f 来指定你的配置文件:bin/logstash -f logstash-input-udp.conf

java 应用配置

(1)在 Java 应用的 pom.xml 中引入 jar 包:

TCP 应用 一节中的引入依赖包完全相同。

(2)接着,在 logback.xml 中添加 appender

1
2
3
4
5
6
7
<appender name="ELK-UDP" class="net.logstash.logback.appender.LogstashSocketAppender">
<host>192.168.28.32</host>
<port>9250</port>
</appender>
<logger name="io.github.dunwu.spring" level="TRACE" additivity="false">
<appender-ref ref="ELK-UDP" />
</logger>

(3)接下来,就是 logback 的具体使用 ,如果对此不了解,不妨参考一下我的这篇博文:细说 Java 主流日志工具库

实例:我的 logback.xml

传输文件

在 Java Web 领域,需要用到一些重要的工具,例如 Tomcat 、Nginx 、Mysql 等。这些不属于业务应用,但是它们的日志数据对于定位问题、分析统计同样很重要。这时无法使用 logback 方式将它们的日志传输到 logstash。

如何采集这些日志文件呢?别急,你可以使用 logstash 的 file input 插件。

需要注意的是,传输文件这种方式,必须在日志所在的机器上部署 logstash 。

应用

logstash 配置

(1)创建 logstash-input-file.conf

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
input {
file {
path => ["/var/log/nginx/access.log"]
type => "nginx-access-log"
start_position => "beginning"
}
}

output {
if [type] == "nginx-access-log" {
elasticsearch {
hosts => ["localhost:9200"]
index => "nginx-access-log"
}
}
}

(2)执行 logstash,使用 -f 来指定你的配置文件:bin/logstash -f logstash-input-file.conf

更多配置项可以参考:https://www.elastic.co/guide/en/logstash/current/plugins-inputs-file.html

小技巧

启动、终止应用

如果你的 logstash 每次都是通过指定配置文件方式启动。不妨建立一个启动脚本。

1
2
# cd xxx 进入 logstash 安装目录下的 bin 目录
logstash -f logstash.conf

如果你的 logstash 运行在 linux 系统下,不妨使用 nohup 来启动一个守护进程。这样做的好处在于,即使关闭终端,应用仍会运行。

创建 startup.sh

1
nohup ./logstash -f logstash.conf >> nohup.out 2>&1 &

终止应用没有什么好方法,你只能使用 ps -ef | grep logstash ,查出进程,将其 kill 。不过,我们可以写一个脚本来干这件事:

创建 shutdown.sh

脚本不多解释,请自行领会作用。

1
2
PID=`ps -ef | grep logstash | awk '{ print $2}' | head -n 1`
kill -9 ${PID}

参考资料

ElasticSearch API

Elasticsearch 是一个分布式、RESTful 风格的搜索和数据分析引擎,能够解决不断涌现出的各种用例。 作为 Elastic Stack 的核心,它集中存储您的数据,帮助您发现意料之中以及意料之外的情况。

Elasticsearch 基于搜索库 Lucene 开发。ElasticSearch 隐藏了 Lucene 的复杂性,提供了简单易用的 REST API / Java API 接口(另外还有其他语言的 API 接口)。

_以下简称 ES_。

REST API 最详尽的文档应该参考:ES 官方 REST API

ElasticSearch API 简介

Elasticsearch 官方提供了很多版本的 Java 客户端,包含但不限于:

如果当前是:8.X 版本,推荐 Elasticsearch Java API客户端。

如果当前是:7.X 版本且不考虑升级,推荐 High Level REST客户端。

如果当前是:5.X、6.X 版本,推荐尽早升级集群版本。

Elasticsearch Java API Client 快速入门

:::detail 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
//创建一个低级的客户端
final RestClient restClient = RestClient.builder(new HttpHost("localhost", 9200)).build();
//创建 JSON 对象映射器
final RestClientTransport transport = new RestClientTransport(restClient, new JacksonJsonpMapper());
//创建 API 客户端
final ElasticsearchClient client = new ElasticsearchClient(transport);
//查询所有索引-------------------------------------------------------------------------------------
final GetIndexResponse response = client.indices().get(query -> query.index("_all"));
final IndexState products = response.result().get("products");
System.out.println(products.toString());
//关闭
client.shutdown();
transport.close();
restClient.close();

:::

Transport Client 快速入门

TransportClient 使用 transport 模块远程连接到 Elasticsearch 集群。它不会加入集群,而只是获取一个或多个初始传输地址,并以轮询方式与它们通信。

扩展:https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/transport-client.html

:::detail 示例

启动客户端:

1
2
3
4
5
6
7
// 启动
TransportClient client = new PreBuiltTransportClient(Settings.EMPTY)
.addTransportAddress(new TransportAddress(InetAddress.getByName("host1"), 9300))
.addTransportAddress(new TransportAddress(InetAddress.getByName("host2"), 9300));

// 关闭
client.close();

配置集群名称

注意,如果使用的集群名称与 “elasticsearch” 不同,则必须设置集群名称。

1
2
3
4
Settings settings = Settings.builder()
.put("cluster.name", "myClusterName").build();
TransportClient client = new PreBuiltTransportClient(settings);
// Add transport addresses and do something with the client...

启用 sniffing

1
2
3
Settings settings = Settings.builder()
.put("client.transport.sniff", true).build();
TransportClient client = new PreBuiltTransportClient(settings);

:::

ElasticSearch Rest

ElasticSearch Rest API 语法格式

向 Elasticsearch 发出的请求的组成部分与其它普通的 HTTP 请求是一样的:

1
curl -X<VERB> '<PROTOCOL>://<HOST>:<PORT>/<PATH>?<QUERY_STRING>' -d '<BODY>'
  • VERB:HTTP 方法,支持:GET, POST, PUT, HEAD, DELETE
  • PROTOCOL:http 或者 https 协议(只有在 Elasticsearch 前面有 https 代理的时候可用)
  • HOST:Elasticsearch 集群中的任何一个节点的主机名,如果是在本地的节点,那么就叫 localhost
  • PORT:Elasticsearch HTTP 服务所在的端口,默认为 9200 PATH API 路径(例如、_count 将返回集群中文档的数量),
  • PATH:可以包含多个组件,例如 _cluster/stats 或者 _nodes/stats/jvm
  • QUERY_STRING:一些可选的查询请求参数,例如?pretty 参数将使请求返回更加美观易读的 JSON 数据
  • BODY:一个 JSON 格式的请求主体(如果请求需要的话)

ElasticSearch Rest API 分为两种:

  • URI Search:在 URL 中使用查询参数
  • Request Body Search:基于 JSON 格式的、更加完备的 DSL

URI Search 示例:

Request Body Search 示例:

索引 API

参考资料:Elasticsearch 官方之 cat 索引 API

创建索引

新建 Index,可以直接向 ES 服务器发出 PUT 请求。

语法格式:

1
2
3
4
5
6
7
8
9
PUT /my_index
{
"settings": { ... any settings ... },
"mappings": {
"type_one": { ... any mappings ... },
"type_two": { ... any mappings ... },
...
}
}

示例:

1
2
3
4
5
6
7
8
9
PUT /user
{
"settings": {
"index": {
"number_of_shards": 3,
"number_of_replicas": 2
}
}
}

服务器返回一个 JSON 对象,里面的 acknowledged 字段表示操作成功。

1
{"acknowledged":true,"shards_acknowledged":true,"index":"user"}

如果你想禁止自动创建索引,可以通过在 config/elasticsearch.yml 的每个节点下添加下面的配置:

1
action.auto_create_index: false

删除索引

然后,我们可以通过发送 DELETE 请求,删除这个 Index。

1
DELETE /user

删除多个索引

1
2
DELETE /index_one,index_two
DELETE /index_*

查看索引

可以通过 GET 请求查看索引信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# 查看索引相关信息
GET kibana_sample_data_ecommerce

# 查看索引的文档总数
GET kibana_sample_data_ecommerce/_count

# 查看前 10 条文档,了解文档格式
GET kibana_sample_data_ecommerce/_search

# _cat indices API
# 查看 indices
GET /_cat/indices/kibana*?v&s=index

# 查看状态为绿的索引
GET /_cat/indices?v&health=green

# 按照文档个数排序
GET /_cat/indices?v&s=docs.count:desc

# 查看具体的字段
GET /_cat/indices/kibana*?pri&v&h=health,index,pri,rep,docs.count,mt

# 查看索引占用的内存
GET /_cat/indices?v&h=i,tm&s=tm:desc

索引别名

ES 的索引别名就是给一个索引或者多个索引起的另一个名字,典型的应用场景是针对索引使用的平滑切换。

首先,创建索引 my_index,然后将别名 my_alias 指向它,示例如下:

1
2
PUT /my_index
PUT /my_index/_alias/my_alias

也可以通过如下形式:

1
2
3
4
5
6
POST /_aliases
{
"actions": [
{ "add": { "index": "my_index", "alias": "my_alias" }}
]
}

也可以在一次请求中增加别名和移除别名混合使用:

1
2
3
4
5
6
7
POST /_aliases
{
"actions": [
{ "remove": { "index": "my_index", "alias": "my_alias" }}
{ "add": { "index": "my_index_v2", "alias": "my_alias" }}
]
}

需要注意的是,如果别名与索引是一对一的,使用别名索引文档或者查询文档是可以的,但是如果别名和索引是一对多的,使用别名会发生错误,因为 ES 不知道把文档写入哪个索引中去或者从哪个索引中读取文档。

ES 索引别名有个典型的应用场景是平滑切换,更多细节可以查看 Elasticsearch(ES)索引零停机(无需重启)无缝平滑切换的方法

打开/关闭索引

通过在 POST 中添加 _close_open 可以打开、关闭索引。

打开索引

1
2
3
4
# 打开索引
POST kibana_sample_data_ecommerce/_open
# 关闭索引
POST kibana_sample_data_ecommerce/_close

文档

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
############Create Document############
#create document. 自动生成 _id
POST users/_doc
{
"user" : "Mike",
"post_date" : "2019-04-15T14:12:12",
"message" : "trying out Kibana"
}

#create document. 指定 Id。如果 id 已经存在,报错
PUT users/_doc/1?op_type=create
{
"user" : "Jack",
"post_date" : "2019-05-15T14:12:12",
"message" : "trying out Elasticsearch"
}

#create document. 指定 ID 如果已经存在,就报错
PUT users/_create/1
{
"user" : "Jack",
"post_date" : "2019-05-15T14:12:12",
"message" : "trying out Elasticsearch"
}

### Get Document by ID
#Get the document by ID
GET users/_doc/1

### Index & Update
#Update 指定 ID (先删除,在写入)
GET users/_doc/1

PUT users/_doc/1
{
"user" : "Mike"

}

#GET users/_doc/1
#在原文档上增加字段
POST users/_update/1/
{
"doc":{
"post_date" : "2019-05-15T14:12:12",
"message" : "trying out Elasticsearch"
}
}

### Delete by Id
# 删除文档
DELETE users/_doc/1

### Bulk 操作
#执行两次,查看每次的结果

#执行第 1 次
POST _bulk
{ "index" : { "_index" : "test", "_id" : "1" } }
{ "field1" : "value1" }
{ "delete" : { "_index" : "test", "_id" : "2" } }
{ "create" : { "_index" : "test2", "_id" : "3" } }
{ "field1" : "value3" }
{ "update" : {"_id" : "1", "_index" : "test"} }
{ "doc" : {"field2" : "value2"} }

#执行第 2 次
POST _bulk
{ "index" : { "_index" : "test", "_id" : "1" } }
{ "field1" : "value1" }
{ "delete" : { "_index" : "test", "_id" : "2" } }
{ "create" : { "_index" : "test2", "_id" : "3" } }
{ "field1" : "value3" }
{ "update" : {"_id" : "1", "_index" : "test"} }
{ "doc" : {"field2" : "value2"} }

### mget 操作
GET /_mget
{
"docs" : [
{
"_index" : "test",
"_id" : "1"
},
{
"_index" : "test",
"_id" : "2"
}
]
}

#URI 中指定 index
GET /test/_mget
{
"docs" : [
{

"_id" : "1"
},
{

"_id" : "2"
}
]
}

GET /_mget
{
"docs" : [
{
"_index" : "test",
"_id" : "1",
"_source" : false
},
{
"_index" : "test",
"_id" : "2",
"_source" : ["field3", "field4"]
},
{
"_index" : "test",
"_id" : "3",
"_source" : {
"include": ["user"],
"exclude": ["user.location"]
}
}
]
}

### msearch 操作
POST kibana_sample_data_ecommerce/_msearch
{}
{"query" : {"match_all" : {}},"size":1}
{"index" : "kibana_sample_data_flights"}
{"query" : {"match_all" : {}},"size":2}

### 清除测试数据
#清除数据
DELETE users
DELETE test
DELETE test2

创建文档

指定 ID

语法格式:

1
PUT /_index/_type/_create/_id

示例:

1
2
3
4
5
6
PUT /user/_doc/_create/1
{
"user": "张三",
"title": "工程师",
"desc": "数据库管理"
}

注意:指定 Id,如果 id 已经存在,则报错

自动生成 ID

新增记录的时候,也可以不指定 Id,这时要改成 POST 请求。

语法格式:

1
POST /_index/_type

示例:

1
2
3
4
5
6
POST /user/_doc
{
"user": "张三",
"title": "工程师",
"desc": "超级管理员"
}

删除文档

语法格式:

1
DELETE /_index/_doc/_id

示例:

1
DELETE /user/_doc/1

更新文档

先删除,再写入

语法格式:

1
PUT /_index/_type/_id

示例:

1
2
3
4
5
6
PUT /user/_doc/1
{
"user": "李四",
"title": "工程师",
"desc": "超级管理员"
}

在原文档上增加字段

语法格式:

1
POST /_index/_update/_id

示例:

1
2
3
4
5
6
POST /user/_update/1
{
"doc":{
"age" : "30"
}
}

查询文档

指定 ID 查询

语法格式:

1
GET /_index/_type/_id

示例:

1
GET /user/_doc/1

结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
{
"_index": "user",
"_type": "_doc",
"_id": "1",
"_version": 1,
"_seq_no": 536248,
"_primary_term": 2,
"found": true,
"_source": {
"user": "张三",
"title": "工程师",
"desc": "数据库管理"
}
}

返回的数据中,found 字段表示查询成功,_source 字段返回原始记录。

如果 id 不正确,就查不到数据,found 字段就是 false

查询所有记录

使用 GET 方法,直接请求 /index/type/_search,就会返回所有记录。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
$ curl 'localhost:9200/user/admin/_search?pretty'
{
"took" : 1,
"timed_out" : false,
"_shards" : {
"total" : 3,
"successful" : 3,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : 2,
"max_score" : 1.0,
"hits" : [
{
"_index" : "user",
"_type" : "admin",
"_id" : "WWuoDG8BHwECs7SiYn93",
"_score" : 1.0,
"_source" : {
"user" : "李四",
"title" : "工程师",
"desc" : "系统管理"
}
},
{
"_index" : "user",
"_type" : "admin",
"_id" : "1",
"_score" : 1.0,
"_source" : {
"user" : "张三",
"title" : "工程师",
"desc" : "超级管理员"
}
}
]
}
}

上面代码中,返回结果的 took字段表示该操作的耗时(单位为毫秒),timed_out字段表示是否超时,hits字段表示命中的记录,里面子字段的含义如下。

  • total:返回记录数,本例是 2 条。
  • max_score:最高的匹配程度,本例是1.0
  • hits:返回的记录组成的数组。

返回的记录中,每条记录都有一个_score字段,表示匹配的程序,默认是按照这个字段降序排列。

全文搜索

ES 的查询非常特别,使用自己的 查询语法,要求 GET 请求带有数据体。

1
2
3
4
$ curl -H 'Content-Type: application/json' 'localhost:9200/user/admin/_search?pretty'  -d '
{
"query" : { "match" : { "desc" : "管理" }}
}'

上面代码使用 Match 查询,指定的匹配条件是desc字段里面包含”软件”这个词。返回结果如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
{
"took" : 2,
"timed_out" : false,
"_shards" : {
"total" : 3,
"successful" : 3,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : 2,
"max_score" : 0.38200712,
"hits" : [
{
"_index" : "user",
"_type" : "admin",
"_id" : "WWuoDG8BHwECs7SiYn93",
"_score" : 0.38200712,
"_source" : {
"user" : "李四",
"title" : "工程师",
"desc" : "系统管理"
}
},
{
"_index" : "user",
"_type" : "admin",
"_id" : "1",
"_score" : 0.3487891,
"_source" : {
"user" : "张三",
"title" : "工程师",
"desc" : "超级管理员"
}
}
]
}
}

Elastic 默认一次返回 10 条结果,可以通过size字段改变这个设置,还可以通过from字段,指定位移。

1
2
3
4
5
6
$ curl 'localhost:9200/user/admin/_search'  -d '
{
"query" : { "match" : { "desc" : "管理" }},
"from": 1,
"size": 1
}'

上面代码指定,从位置 1 开始(默认是从位置 0 开始),只返回一条结果。

逻辑运算

如果有多个搜索关键字, Elastic 认为它们是or关系。

1
2
3
4
$ curl 'localhost:9200/user/admin/_search'  -d '
{
"query" : { "match" : { "desc" : "软件 系统" }}
}'

上面代码搜索的是软件 or 系统

如果要执行多个关键词的and搜索,必须使用 布尔查询

1
2
3
4
5
6
7
8
9
10
11
$ curl -H 'Content-Type: application/json' 'localhost:9200/user/admin/_search?pretty'  -d '
{
"query": {
"bool": {
"must": [
{ "match": { "desc": "管理" } },
{ "match": { "desc": "超级" } }
]
}
}
}'

批量执行

支持在一次 API 调用中,对不同的索引进行操作

支持四种类型操作

  • index
  • create
  • update
  • delete

操作中单条操作失败,并不会影响其他操作。

返回结果包括了每一条操作执行的结果。

1
2
3
4
5
6
7
8
POST _bulk
{ "index" : { "_index" : "test", "_id" : "1" } }
{ "field1" : "value1" }
{ "delete" : { "_index" : "test", "_id" : "2" } }
{ "create" : { "_index" : "test2", "_id" : "3" } }
{ "field1" : "value3" }
{ "update" : {"_id" : "1", "_index" : "test"} }
{ "doc" : {"field2" : "value2"} }

说明:上面的示例如果执行多次,执行结果都不一样。

批量读取

读多个索引

1
2
3
4
5
6
7
8
9
10
11
12
13
GET /_mget
{
"docs" : [
{
"_index" : "test",
"_id" : "1"
},
{
"_index" : "test",
"_id" : "2"
}
]
}

读一个索引

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
GET /test/_mget
{
"docs" : [
{

"_id" : "1"
},
{

"_id" : "2"
}
]
}

GET /_mget
{
"docs" : [
{
"_index" : "test",
"_id" : "1",
"_source" : false
},
{
"_index" : "test",
"_id" : "2",
"_source" : ["field3", "field4"]
},
{
"_index" : "test",
"_id" : "3",
"_source" : {
"include": ["user"],
"exclude": ["user.location"]
}
}
]
}

批量查询

1
2
3
4
5
POST kibana_sample_data_ecommerce/_msearch
{}
{"query" : {"match_all" : {}},"size":1}
{"index" : "kibana_sample_data_flights"}
{"query" : {"match_all" : {}},"size":2}

URI Search 查询语义

Elasticsearch URI Search 遵循 QueryString 查询语义,其形式如下:

1
2
3
4
GET /movies/_search?q=2012&df=title&sort=year:desc&from=0&size=10&timeout=1s
{
"profile": true
}
  • q 指定查询语句,使用 QueryString 语义
  • df 默认字段,不指定时
  • sort 排序:from 和 size 用于分页
  • profile 可以查看查询时如何被执行的
1
2
3
4
GET /movies/_search?q=title:2012&sort=year:desc&from=0&size=10&timeout=1s
{
"profile":"true"
}

Term 和 Phrase

Beautiful Mind 等效于 Beautiful OR Mind

“Beautiful Mind” 等效于 Beautiful AND Mind

1
2
3
4
5
6
7
8
9
10
11
# Term 查询
GET /movies/_search?q=title:Beautiful Mind
{
"profile":"true"
}

# 使用引号,Phrase 查询
GET /movies/_search?q=title:"Beautiful Mind"
{
"profile":"true"
}

分组与引号

title:(Beautiful AND Mind)

title=”Beautiful Mind”

AND、OR、NOT 或者 &&、||、!

注意:AND、OR、NOT 必须大写

1
2
3
4
5
6
7
8
9
10
# 布尔操作符
GET /movies/_search?q=title:(Beautiful AND Mind)
{
"profile":"true"
}

GET /movies/_search?q=title:(Beautiful NOT Mind)
{
"profile":"true"
}

范围查询

  • [] 表示闭区间
  • {} 表示开区间

示例:

1
2
3
4
5
6
7
8
9
10
# 范围查询 , 区间写法
GET /movies/_search?q=title:beautiful AND year:{2010 TO 2018%7D
{
"profile":"true"
}

GET /movies/_search?q=title:beautiful AND year:[* TO 2018]
{
"profile":"true"
}

算数符号

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 2010 年以后的记录
GET /movies/_search?q=year:>2010
{
"profile":"true"
}

# 2010 年到 2018 年的记录
GET /movies/_search?q=year:(>2010 && <=2018)
{
"profile":"true"
}

# 2010 年到 2018 年的记录
GET /movies/_search?q=year:(+>2010 +<=2018)
{
"profile":"true"
}

通配符查询

  • ? 代表 1 个字符
  • * 代表 0 或多个字符

示例:

1
2
3
4
5
6
7
8
9
GET /movies/_search?q=title:mi?d
{
"profile":"true"
}

GET /movies/_search?q=title:b*
{
"profile":"true"
}

正则表达式

title:[bt]oy

模糊匹配与近似查询

示例:

1
2
3
4
5
6
7
8
9
10
11
# 相似度在 1 个字符以内
GET /movies/_search?q=title:beautifl~1
{
"profile":"true"
}

# 相似度在 2 个字符以内
GET /movies/_search?q=title:"Lord Rings"~2
{
"profile":"true"
}

Request Body & DSL

Elasticsearch 除了 URI Search 查询方式,还支持将查询语句通过 Http Request Body 发起查询。

1
2
3
4
5
6
7
GET /kibana_sample_data_ecommerce/_search?ignore_unavailable=true
{
"profile":"true",
"query": {
"match_all": {}
}
}

分页

1
2
3
4
5
6
7
8
9
GET /kibana_sample_data_ecommerce/_search?ignore_unavailable=true
{
"profile": "true",
"from": 0,
"size": 10,
"query": {
"match_all": {}
}
}

排序

最好在数字型或日期型字段上排序

因为对于多值类型或分析过的字段排序,系统会选一个值,无法得知该值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
GET /kibana_sample_data_ecommerce/_search?ignore_unavailable=true
{
"profile": "true",
"sort": [
{
"order_date": "desc"
}
],
"from": 1,
"size": 10,
"query": {
"match_all": {}
}
}

_source 过滤

如果 _source 没有存储,那就只返回匹配的文档的元数据

_source 支持使用通配符,如:_source["name*", "desc*"]

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
GET /kibana_sample_data_ecommerce/_search?ignore_unavailable=true
{
"profile": "true",
"_source": [
"order_date",
"category.keyword"
],
"from": 1,
"size": 10,
"query": {
"match_all": {}
}
}

脚本字段

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
GET /kibana_sample_data_ecommerce/_search?ignore_unavailable=true
{
"profile": "true",
"script_fields": {
"new_field": {
"script": {
"lang": "painless",
"source":"doc['order_date'].value+' hello'"
}
}
},
"from": 1,
"size": 10,
"query": {
"match_all": {}
}
}

使用查询表达式 - Match

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
POST movies/_search
{
"query": {
"match": {
"title": "last christmas"
}
}
}

POST movies/_search
{
"query": {
"match": {
"title": {
"query": "last christmas",
"operator": "and"
}
}
}
}

短语搜索 - Match Phrase

1
2
3
4
5
6
7
8
9
10
11
POST movies/_search
{
"query": {
"match_phrase": {
"title":{
"query": "last christmas"

}
}
}
}

集群 API

Elasticsearch 官方之 Cluster API

一些集群级别的 API 可能会在节点的子集上运行,这些节点可以用节点过滤器指定。例如,任务管理、节点统计和节点信息 API 都可以报告来自一组过滤节点而不是所有节点的结果。

节点过滤器以逗号分隔的单个过滤器列表的形式编写,每个过滤器从所选子集中添加或删除节点。每个过滤器可以是以下之一:

  • _all:将所有节点添加到子集
  • _local:将本地节点添加到子集
  • _master:将当前主节点添加到子集
  • 根据节点 ID 或节点名将匹配节点添加到子集
  • 根据 IP 地址或主机名将匹配节点添加到子集
  • 使用通配符,将节点名、地址名或主机名匹配的节点添加到子集
  • master:true, data:true, ingest:true, voting_only:true, ml:truecoordinating_only:true, 分别意味着将所有主节点、所有数据节点、所有摄取节点、所有仅投票节点、所有机器学习节点和所有协调节点添加到子集中。
  • master:false, data:false, ingest:false, voting_only:true, ml:falsecoordinating_only:false, 分别意味着将所有主节点、所有数据节点、所有摄取节点、所有仅投票节点、所有机器学习节点和所有协调节点排除在子集外。
  • 配对模式,使用 * 通配符,格式为 attrname:attrvalue,将所有具有自定义节点属性的节点添加到子集中,其名称和值与相应的模式匹配。自定义节点属性是通过 node.attr.attrname: attrvalue 形式在配置文件中设置的。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# 如果没有给出过滤器,默认是查询所有节点
GET /_nodes
# 查询所有节点
GET /_nodes/_all
# 查询本地节点
GET /_nodes/_local
# 查询主节点
GET /_nodes/_master
# 根据名称查询节点(支持通配符)
GET /_nodes/node_name_goes_here
GET /_nodes/node_name_goes_*
# 根据地址查询节点(支持通配符)
GET /_nodes/10.0.0.3,10.0.0.4
GET /_nodes/10.0.0.*
# 根据规则查询节点
GET /_nodes/_all,master:false
GET /_nodes/data:true,ingest:true
GET /_nodes/coordinating_only:true
GET /_nodes/master:true,voting_only:false
# 根据自定义属性查询节点(如:查询配置文件中含 node.attr.rack:2 属性的节点)
GET /_nodes/rack:2
GET /_nodes/ra*:2
GET /_nodes/ra*:2*

集群健康 API

1
2
3
4
GET /_cluster/health
GET /_cluster/health?level=shards
GET /_cluster/health/kibana_sample_data_ecommerce,kibana_sample_data_flights
GET /_cluster/health/kibana_sample_data_flights?level=shards

集群状态 API

集群状态 API 返回表示整个集群状态的元数据。

1
GET /_cluster/state

节点 API

Elasticsearch 官方之 cat Nodes API——返回有关集群节点的信息。

1
2
3
4
# 查看默认的字段
GET /_cat/nodes?v=true
# 查看指定的字段
GET /_cat/nodes?v=true&h=id,ip,port,v,m

分片 API

Elasticsearch 官方之 cat Shards API——shards 命令是哪些节点包含哪些分片的详细视图。它会告诉你它是主还是副本、文档数量、它在磁盘上占用的字节数以及它所在的节点。

1
2
3
4
5
6
# 查看默认的字段
GET /_cat/shards
# 根据名称查询分片(支持通配符)
GET /_cat/shards/my-index-*
# 查看指定的字段
GET /_cat/shards?h=index,shard,prirep,state,unassigned.reason

监控 API

Elasticsearch 中集群相关的健康、统计等相关的信息都是围绕着 cat API 进行的。

通过 GET 请求发送 cat,下面列出了所有可用的 API:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
GET /_cat

=^.^=
/_cat/allocation
/_cat/shards
/_cat/shards/{index}
/_cat/master
/_cat/nodes
/_cat/tasks
/_cat/indices
/_cat/indices/{index}
/_cat/segments
/_cat/segments/{index}
/_cat/count
/_cat/count/{index}
/_cat/recovery
/_cat/recovery/{index}
/_cat/health
/_cat/pending_tasks
/_cat/aliases
/_cat/aliases/{alias}
/_cat/thread_pool
/_cat/thread_pool/{thread_pools}
/_cat/plugins
/_cat/fielddata
/_cat/fielddata/{fields}
/_cat/nodeattrs
/_cat/repositories
/_cat/snapshots/{repository}
/_cat/templates

参考资料

Elasticsearch CRUD

::: info 概述

CRUD 由英文单词 Create, Read, Update, Delete 的首字母组成,即增删改查

本文通过介绍基本的 Elasticsearch CRUD 方法,向读者呈现如何访问 Elasticsearch 数据。
:::

阅读全文 »