Dunwu Blog

大道至简,知易行难

ActiveMQ 快速入门

JMS 基本概念

JMSJava 消息服务(Java Message Service)API,是一个 Java 平台中关于面向消息中间件的 API。它用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java 消息服务是一个与具体平台无关的 API,绝大多数 MOM 提供商都对 JMS 提供支持。

消息模型

JMS 有两种消息模型:

  • Point-to-Point(P2P)
  • Publish/Subscribe(Pub/Sub)

P2P 的特点

img

在点对点的消息系统中,消息分发给一个单独的使用者。点对点消息往往与队列 javax.jms.Queue 相关联。

每个消息只有一个消费者(Consumer)(即一旦被消费,消息就不再在消息队列中)。

发送者和接收者之间在时间上没有依赖性,也就是说当发送者发送了消息之后,不管接收者有没有正在运行,它不会影响到消息被发送到队列。

接收者在成功接收消息之后需向队列应答成功。

如果你希望发送的每个消息都应该被成功处理的话,那么你需要 P2P 模式。

Pub/Sub 的特点

img

发布/订阅消息系统支持一个事件驱动模型,消息生产者和消费者都参与消息的传递。生产者发布事件,而使用者订阅感兴趣的事件,并使用事件。该类型消息一般与特定的主题 javax.jms.Topic 关联。

每个消息可以有多个消费者。

发布者和订阅者之间有时间上的依赖性。针对某个主题(Topic)的订阅者,它必须创建一个订阅者之后,才能消费发布者的消息,而且为了消费消息,订阅者必须保持运行的状态。

为了缓和这样严格的时间相关性,JMS 允许订阅者创建一个可持久化的订阅。这样,即使订阅者没有被激活(运行),它也能接收到发布者的消息。

如果你希望发送的消息可以不被做任何处理、或者被一个消息者处理、或者可以被多个消费者处理的话,那么可以采用 Pub/Sub 模型。

JMS 编程模型

img

ConnectionFactory

创建 Connection 对象的工厂,针对两种不同的 jms 消息模型,分别有 QueueConnectionFactoryTopicConnectionFactory 两种。可以通过 JNDI 来查找 ConnectionFactory 对象。

Connection

Connection 表示在客户端和 JMS 系统之间建立的链接(对 TCP/IP socket 的包装)。Connection 可以产生一个或多个Session。跟 ConnectionFactory 一样,Connection 也有两种类型:QueueConnectionTopicConnection

Destination

Destination 是一个包装了消息目标标识符的被管对象。消息目标是指消息发布和接收的地点,或者是队列 Queue ,或者是主题 Topic 。JMS 管理员创建这些对象,然后用户通过 JNDI 发现它们。和连接工厂一样,管理员可以创建两种类型的目标,点对点模型的 Queue,以及发布者/订阅者模型的 Topic

Session

Session 表示一个单线程的上下文,用于发送和接收消息。由于会话是单线程的,所以消息是连续的,就是说消息是按照发送的顺序一个一个接收的。会话的好处是它支持事务。如果用户选择了事务支持,会话上下文将保存一组消息,直到事务被提交才发送这些消息。在提交事务之前,用户可以使用回滚操作取消这些消息。一个会话允许用户创建消息,生产者来发送消息,消费者来接收消息。同样,Session 也分 QueueSessionTopicSession

MessageConsumer

MessageConsumerSession 创建,并用于将消息发送到 Destination。消费者可以同步地(阻塞模式),或(非阻塞)接收 QueueTopic 类型的消息。同样,消息生产者分两种类型:QueueSenderTopicPublisher

MessageProducer

MessageProducerSession 创建,用于接收被发送到 Destination 的消息。MessageProducer 有两种类型:QueueReceiverTopicSubscriber。可分别通过 sessioncreateReceiver(Queue)createSubscriber(Topic) 来创建。当然,也可以 sessioncreatDurableSubscriber 方法来创建持久化的订阅者。

Message

是在消费者和生产者之间传送的对象,也就是说从一个应用程序传送到另一个应用程序。一个消息有三个主要部分:

  • 消息头(必须):包含用于识别和为消息寻找路由的操作设置。
  • 一组消息属性(可选):包含额外的属性,支持其他提供者和用户的兼容。可以创建定制的字段和过滤器(消息选择器)。
  • 一个消息体(可选):允许用户创建五种类型的消息(文本消息,映射消息,字节消息,流消息和对象消息)。

消息接口非常灵活,并提供了许多方式来定制消息的内容。

Common Point-to-Point Publish-Subscribe
ConnectionFactory QueueConnectionFactory TopicConnectionFactory
Connection QueueConnection TopicConnection
Destination Queue Topic
Session QueueSession TopicSession
MessageProducer QueueSender TopicPublisher
MessageSender QueueReceiver, QueueBrowser TopicSubscriber

安装

安装条件

JDK1.7 及以上版本

本地配置了 JAVA_HOME 环境变量。

下载

支持 Windows/Unix/Linux/Cygwin

ActiveMQ 官方下载地址

Windows 下运行

(1)解压压缩包到本地;

(2)打开控制台,进入安装目录的 bin 目录下;

1
cd [activemq_install_dir]

(3)执行 activemq start 来启动 ActiveMQ

1
bin\activemq start

测试安装是否成功

(1)ActiveMQ 默认监听端口为 61616

1
netstat -an|find “61616”

(2)访问 http://127.0.0.1:8161/admin/

(3)输入用户名、密码

1
2
Login: admin
Passwort: admin

(4)点击导航栏的 Queues 菜单

(5)添加一个队列(queue)

项目中的应用

引入依赖

1
2
3
4
5
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.14.1</version>
</dependency>

Sender.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
public class Sender {
private static final int SEND_NUMBER = 4;

public static void main(String[] args) {
// ConnectionFactory :连接工厂,JMS 用它创建连接
ConnectionFactory connectionFactory;
// Connection :JMS 客户端到JMS Provider 的连接
Connection connection = null;
// Session: 一个发送或接收消息的线程
Session session;
// Destination :消息的目的地;消息发送给谁.
Destination destination;
// MessageProducer:消息发送者
MessageProducer producer;
// TextMessage message;
// 构造ConnectionFactory实例对象,此处采用ActiveMq的实现jar
connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD,
"tcp://localhost:61616");
try {
// 构造从工厂得到连接对象
connection = connectionFactory.createConnection();
// 启动
connection.start();
// 获取操作连接
session = connection.createSession(Boolean.TRUE,
Session.AUTO_ACKNOWLEDGE);
// 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置
destination = session.createQueue("FirstQueue");
// 得到消息生成者【发送者】
producer = session.createProducer(destination);
// 设置不持久化,此处学习,实际根据项目决定
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
// 构造消息,此处写死,项目就是参数,或者方法获取
sendMessage(session, producer);
session.commit();
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (null != connection)
connection.close();
} catch (Throwable ignore) {
}
}
}

public static void sendMessage(Session session, MessageProducer producer)
throws Exception {
for (int i = 1; i <= SEND_NUMBER; i++) {
TextMessage message = session
.createTextMessage("ActiveMq 发送的消息" + i);
// 发送消息到目的地方
System.out.println("发送消息:" + "ActiveMq 发送的消息" + i);
producer.send(message);
}
}
}

Receiver.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public class Receiver {
public static void main(String[] args) {
// ConnectionFactory :连接工厂,JMS 用它创建连接
ConnectionFactory connectionFactory;
// Connection :JMS 客户端到JMS Provider 的连接
Connection connection = null;
// Session: 一个发送或接收消息的线程
Session session;
// Destination :消息的目的地;消息发送给谁.
Destination destination;
// 消费者,消息接收者
MessageConsumer consumer;
connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD,
"tcp://localhost:61616");
try {
// 构造从工厂得到连接对象
connection = connectionFactory.createConnection();
// 启动
connection.start();
// 获取操作连接
session = connection.createSession(Boolean.FALSE,
Session.AUTO_ACKNOWLEDGE);
// 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置
destination = session.createQueue("FirstQueue");
consumer = session.createConsumer(destination);
while (true) {
//设置接收者接收消息的时间,为了便于测试,这里谁定为100s
TextMessage message = (TextMessage) consumer.receive(100000);
if (null != message) {
System.out.println("收到消息" + message.getText());
} else {
break;
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (null != connection)
connection.close();
} catch (Throwable ignore) {
}
}
}
}

运行

先运行 Receiver.java 进行消息监听,再运行 Send.java 发送消息。

输出

Send 的输出内容

1
2
3
4
发送消息:Activemq 发送消息0
发送消息:Activemq 发送消息1
发送消息:Activemq 发送消息2
发送消息:Activemq 发送消息3

Receiver 的输出内容

1
2
3
4
收到消息ActiveMQ 发送消息0
收到消息ActiveMQ 发送消息1
收到消息ActiveMQ 发送消息2
收到消息ActiveMQ 发送消息3

资源

Flink API

Flink 为流式/批式处理应用程序的开发提供了不同级别的抽象。

Programming levels of abstraction

  • Flink API 最底层的抽象为有状态实时流处理。其抽象实现是 Process Function,并且 Process Function 被 Flink 框架集成到了 DataStream API 中来为我们使用。它允许用户在应用程序中自由地处理来自单流或多流的事件(数据),并提供具有全局一致性和容错保障的状态。此外,用户可以在此层抽象中注册事件时间(event time)和处理时间(processing time)回调方法,从而允许程序可以实现复杂计算。

  • Flink API 第二层抽象是 Core APIs。实际上,许多应用程序不需要使用到上述最底层抽象的 API,而是可以使用 Core APIs 进行编程:其中包含 DataStream API(应用于有界/无界数据流场景)和 DataSet API(应用于有界数据集场景)两部分。Core APIs 提供的流式 API(Fluent API)为数据处理提供了通用的模块组件,例如各种形式的用户自定义转换(transformations)、联接(joins)、聚合(aggregations)、窗口(windows)和状态(state)操作等。此层 API 中处理的数据类型在每种编程语言中都有其对应的类。

    Process Function 这类底层抽象和 DataStream API 的相互集成使得用户可以选择使用更底层的抽象 API 来实现自己的需求。DataSet API 还额外提供了一些原语,比如循环/迭代(loop/iteration)操作。

  • Flink API 第三层抽象是 Table APITable API 是以表(Table)为中心的声明式编程(DSL)API,例如在流式数据场景下,它可以表示一张正在动态改变的表。Table API 遵循(扩展)关系模型:即表拥有 schema(类似于关系型数据库中的 schema),并且 Table API 也提供了类似于关系模型中的操作,比如 select、project、join、group-by 和 aggregate 等。Table API 程序是以声明的方式定义应执行的逻辑操作,而不是确切地指定程序应该执行的代码。尽管 Table API 使用起来很简洁并且可以由各种类型的用户自定义函数扩展功能,但还是比 Core API 的表达能力差。此外,Table API 程序在执行之前还会使用优化器中的优化规则对用户编写的表达式进行优化。

    表和 DataStream/DataSet 可以进行无缝切换,Flink 允许用户在编写应用程序时将 Table APIDataStream/DataSet API 混合使用。

  • Flink API 最顶层抽象是 SQL。这层抽象在语义和程序表达式上都类似于 _Table API_,但是其程序实现都是 SQL 查询表达式。SQL 抽象与 Table API 抽象之间的关联是非常紧密的,并且 SQL 查询语句可以在 Table API 中定义的表上执行。

ProcessFunction

ProcessFunction 是 Flink 所提供的最具表达力的接口。ProcessFunction 可以处理一或两条输入数据流中的单个事件或者归入一个特定窗口内的多个事件。它提供了对于时间和状态的细粒度控制。开发者可以在其中任意地修改状态,也能够注册定时器用以在未来的某一时刻触发回调函数。因此,你可以利用 ProcessFunction 实现许多有状态的事件驱动应用所需要的基于单个事件的复杂业务逻辑。

下面的代码示例展示了如何在 KeyedStream 上利用 KeyedProcessFunction 对标记为 STARTEND 的事件进行处理。当收到 START 事件时,处理函数会记录其时间戳,并且注册一个时长 4 小时的计时器。如果在计时器结束之前收到 END 事件,处理函数会计算其与上一个 START 事件的时间间隔,清空状态并将计算结果返回。否则,计时器结束,并清空状态。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
/**

* 将相邻的 keyed START 和 END 事件相匹配并计算两者的时间间隔
* 输入数据为 Tuple2<String, String> 类型,第一个字段为 key 值,
* 第二个字段标记 START 和 END 事件。
*/
public static class StartEndDuration
extends KeyedProcessFunction<String, Tuple2<String, String>, Tuple2<String, Long>> {

private ValueState<Long> startTime;

@Override
public void open(Configuration conf) {
// obtain state handle
startTime = getRuntimeContext()
.getState(new ValueStateDescriptor<Long>("startTime", Long.class));
}

/** Called for each processed event. */
@Override
public void processElement(
Tuple2<String, String> in,
Context ctx,
Collector<Tuple2<String, Long>> out) throws Exception {

switch (in.f1) {
case "START":
// set the start time if we receive a start event.
startTime.update(ctx.timestamp());
// register a timer in four hours from the start event.
ctx.timerService()
.registerEventTimeTimer(ctx.timestamp() + 4 * 60 * 60 * 1000);
break;
case "END":
// emit the duration between start and end event
Long sTime = startTime.value();
if (sTime != null) {
out.collect(Tuple2.of(in.f0, ctx.timestamp() - sTime));
// clear the state
startTime.clear();
}
default:
// do nothing
}
}

/** Called when a timer fires. */
@Override
public void onTimer(
long timestamp,
OnTimerContext ctx,
Collector<Tuple2<String, Long>> out) {

// Timeout interval exceeded. Cleaning up the state.
startTime.clear();
}
}

这个例子充分展现了 KeyedProcessFunction 强大的表达力,也因此是一个实现相当复杂的接口。

DataStream API

DataStream API 为许多通用的流处理操作提供了处理原语。这些操作包括窗口、逐条记录的转换操作,在处理事件时进行外部数据库查询等。DataStream API 支持 Java 和 Scala 语言,预先定义了例如map()reduce()aggregate() 等函数。你可以通过扩展实现预定义接口或使用 Java、Scala 的 lambda 表达式实现自定义的函数。

下面的代码示例展示了如何捕获会话时间范围内所有的点击流事件,并对每一次会话的点击量进行计数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 网站点击 Click 的数据流
DataStream<Click> clicks = ...

DataStream<Tuple2<String, Long>> result = clicks
// 将网站点击映射为 (userId, 1) 以便计数
.map(
// 实现 MapFunction 接口定义函数
new MapFunction<Click, Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> map(Click click) {
return Tuple2.of(click.userId, 1L);
}
})
// 以 userId (field 0) 作为 key
.keyBy(0)
// 定义 30 分钟超时的会话窗口
.window(EventTimeSessionWindows.withGap(Time.minutes(30L)))
// 对每个会话窗口的点击进行计数,使用 lambda 表达式定义 reduce 函数
.reduce((a, b) -> Tuple2.of(a.f0, a.f1 + b.f1));

SQL & Table API

Flink 支持两种关系型的 API,Table API 和 SQL。这两个 API 都是批处理和流处理统一的 API,这意味着在无边界的实时数据流和有边界的历史记录数据流上,关系型 API 会以相同的语义执行查询,并产生相同的结果。Table API 和 SQL 借助了 Apache Calcite 来进行查询的解析,校验以及优化。它们可以与 DataStream 和 DataSet API 无缝集成,并支持用户自定义的标量函数,聚合函数以及表值函数。

Flink 的关系型 API 旨在简化数据分析数据流水线和 ETL 应用的定义。

下面的代码示例展示了如何使用 SQL 语句查询捕获会话时间范围内所有的点击流事件,并对每一次会话的点击量进行计数。此示例与上述 DataStream API 中的示例有着相同的逻辑。

1
2
3
SELECT userId, COUNT(*)
FROM clicks
GROUP BY SESSION(clicktime, INTERVAL '30' MINUTE), userId

Flink 具有数个适用于常见数据处理应用场景的扩展库。这些库通常嵌入在 API 中,且并不完全独立于其它 API。它们也因此可以受益于 API 的所有特性,并与其他库集成。

  • **复杂事件处理(CEP)**:模式检测是事件流处理中的一个非常常见的用例。Flink 的 CEP 库提供了 API,使用户能够以例如正则表达式或状态机的方式指定事件模式。CEP 库与 Flink 的 DataStream API 集成,以便在 DataStream 上评估模式。CEP 库的应用包括网络入侵检测,业务流程监控和欺诈检测。
  • **DataSet API*:DataSet API 是 Flink 用于批处理应用程序的核心 API。DataSet API 所提供的基础算子包括mapreduce_、(outer) join_、_co-group_、iterate*等。所有算子都有相应的算法和数据结构支持,对内存中的序列化数据进行操作。如果数据大小超过预留内存,则过量数据将存储到磁盘。Flink 的 DataSet API 的数据处理算法借鉴了传统数据库算法的实现,例如混合散列连接(hybrid hash-join)和外部归并排序(external merge-sort)。
  • Gelly: Gelly 是一个可扩展的图形处理和分析库。Gelly 是在 DataSet API 之上实现的,并与 DataSet API 集成。因此,它能够受益于其可扩展且健壮的操作符。Gelly 提供了内置算法,如 label propagation、triangle enumeration 和 page rank 算法,也提供了一个简化自定义图算法实现的 Graph API

参考资料

Flink ETL

Apache Flink 的一种常见应用场景是 ETL(抽取、转换、加载)管道任务。从一个或多个数据源获取数据,进行一些转换操作和信息补充,将结果存储起来。在这个教程中,我们将介绍如何使用 Flink 的 DataStream API 实现这类应用。

这里注意,Flink 的 Table 和 SQL API 完全可以满足很多 ETL 使用场景。但无论你最终是否直接使用 DataStream API,对这里介绍的基本知识有扎实的理解都是有价值的。

无状态的转换

本节涵盖了 map()flatmap(),这两种算子可以用来实现无状态转换的基本操作。

map()

在第一个练习中,你将过滤出租车行程数据中的事件。在同一代码仓库中,有一个 GeoUtils 类,提供了一个静态方法 GeoUtils.mapToGridCell(float lon, float lat),它可以将位置坐标(经度,维度)映射到 100x100 米的对应不同区域的网格单元。

现在让我们为每个出租车行程时间的数据对象增加 startCellendCell 字段。你可以创建一个继承 TaxiRideEnrichedRide 类,添加这些字段:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public static class EnrichedRide extends TaxiRide {
public int startCell;
public int endCell;

public EnrichedRide() {}

public EnrichedRide(TaxiRide ride) {
this.rideId = ride.rideId;
this.isStart = ride.isStart;
...
this.startCell = GeoUtils.mapToGridCell(ride.startLon, ride.startLat);
this.endCell = GeoUtils.mapToGridCell(ride.endLon, ride.endLat);
}

public String toString() {
return super.toString() + "," +
Integer.toString(this.startCell) + "," +
Integer.toString(this.endCell);
}
}

然后你可以创建一个应用来转换这个流

1
2
3
4
5
6
7
DataStream<TaxiRide> rides = env.addSource(new TaxiRideSource(...));

DataStream<EnrichedRide> enrichedNYCRides = rides
.filter(new RideCleansingSolution.NYCFilter())
.map(new Enrichment());

enrichedNYCRides.print();

使用这个 MapFunction:

1
2
3
4
5
6
7
public static class Enrichment implements MapFunction<TaxiRide, EnrichedRide> {

@Override
public EnrichedRide map(TaxiRide taxiRide) throws Exception {
return new EnrichedRide(taxiRide);
}
}

flatmap()

MapFunction 只适用于一对一的转换:对每个进入算子的流元素,map() 将仅输出一个转换后的元素。对于除此以外的场景,你将要使用 flatmap()

1
2
3
4
5
6
DataStream<TaxiRide> rides = env.addSource(new TaxiRideSource(...));

DataStream<EnrichedRide> enrichedNYCRides = rides
.flatMap(new NYCEnrichment());

enrichedNYCRides.print();

其中用到的 FlatMapFunction :

1
2
3
4
5
6
7
8
9
10
public static class NYCEnrichment implements FlatMapFunction<TaxiRide, EnrichedRide> {

@Override
public void flatMap(TaxiRide taxiRide, Collector<EnrichedRide> out) throws Exception {
FilterFunction<TaxiRide> valid = new RideCleansing.NYCFilter();
if (valid.filter(taxiRide)) {
out.collect(new EnrichedRide(taxiRide));
}
}
}

使用接口中提供的 Collectorflatmap() 可以输出你想要的任意数量的元素,也可以一个都不发。

Keyed Streams

keyBy()

将一个流根据其中的一些属性来进行分区是十分有用的,这样我们可以使所有具有相同属性的事件分到相同的组里。例如,如果你想找到从每个网格单元出发的最远的出租车行程。按 SQL 查询的方式来考虑,这意味着要对 startCell 进行 GROUP BY 再排序,在 Flink 中这部分可以用 keyBy(KeySelector) 实现。

1
2
3
rides
.flatMap(new NYCEnrichment())
.keyBy(enrichedRide -> enrichedRide.startCell)

每个 keyBy 会通过 shuffle 来为数据流进行重新分区。总体来说这个开销是很大的,它涉及网络通信、序列化和反序列化。

keyBy and network shuffle

通过计算得到键

KeySelector 不仅限于从事件中抽取键。你也可以按想要的方式计算得到键值,只要最终结果是确定的,并且实现了 hashCode()equals()。这些限制条件不包括产生随机数或者返回 Arrays 或 Enums 的 KeySelector,但你可以用元组和 POJO 来组成键,只要他们的元素遵循上述条件。

键必须按确定的方式产生,因为它们会在需要的时候被重新计算,而不是一直被带在流记录中。

例如,比起创建一个新的带有 startCell 字段的 EnrichedRide 类,用这个字段作为 key:

1
keyBy(enrichedRide -> enrichedRide.startCell)

我们更倾向于这样做:

1
keyBy(ride -> GeoUtils.mapToGridCell(ride.startLon, ride.startLat))

Keyed Stream 的聚合

以下代码为每个行程结束事件创建了一个新的包含 startCell 和时长(分钟)的元组流:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import org.joda.time.Interval;

DataStream<Tuple2<Integer, Minutes>> minutesByStartCell = enrichedNYCRides
.flatMap(new FlatMapFunction<EnrichedRide, Tuple2<Integer, Minutes>>() {

@Override
public void flatMap(EnrichedRide ride,
Collector<Tuple2<Integer, Minutes>> out) throws Exception {
if (!ride.isStart) {
Interval rideInterval = new Interval(ride.startTime, ride.endTime);
Minutes duration = rideInterval.toDuration().toStandardMinutes();
out.collect(new Tuple2<>(ride.startCell, duration));
}
}
});

现在就可以产生一个流,对每个 startCell 仅包含那些最长行程的数据。

有很多种方法表示使用哪个字段作为键。前面使用 EnrichedRide POJO 的例子,用字段名来指定键。而这个使用 Tuple2 对象的例子中,用字段在元组中的序号(从 0 开始)来指定键。

1
2
3
4
minutesByStartCell
.keyBy(value -> value.f0) // .keyBy(value -> value.startCell)
.maxBy(1) // duration
.print();

现在每次行程时长达到新的最大值,都会输出一条新记录,例如下面这个对应 50797 网格单元的数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
...
4> (64549,5M)
4> (46298,18M)
1> (51549,14M)
1> (53043,13M)
1> (56031,22M)
1> (50797,6M)
...
1> (50797,8M)
...
1> (50797,11M)
...
1> (50797,12M)

(隐式的)状态

这是培训中第一个涉及到有状态流的例子。尽管状态的处理是透明的,Flink 必须跟踪每个不同的键的最大时长。

只要应用中有状态,你就应该考虑状态的大小。如果键值的数量是无限的,那 Flink 的状态需要的空间也同样是无限的。

在流处理场景中,考虑有限窗口的聚合往往比整个流聚合更有意义。

reduce() 和其他聚合算子

上面用到的 maxBy() 只是 Flink 中 KeyedStream 上众多聚合函数中的一个。还有一个更通用的 reduce() 函数可以用来实现你的自定义聚合。

有状态的转换

在 Flink 不参与管理状态的情况下,你的应用也可以使用状态,但 Flink 为其管理状态提供了一些引人注目的特性:

  • 本地性: Flink 状态是存储在使用它的机器本地的,并且可以以内存访问速度来获取
  • 持久性: Flink 状态是容错的,例如,它可以自动按一定的时间间隔产生 checkpoint,并且在任务失败后进行恢复
  • 纵向可扩展性: Flink 状态可以存储在集成的 RocksDB 实例中,这种方式下可以通过增加本地磁盘来扩展空间
  • 横向可扩展性: Flink 状态可以随着集群的扩缩容重新分布
  • 可查询性: Flink 状态可以通过使用 状态查询 API 从外部进行查询。

在本节中你将学习如何使用 Flink 的 API 来管理 keyed state。

Rich Functions

至此,你已经看到了 Flink 的几种函数接口,包括 FilterFunctionMapFunction,和 FlatMapFunction。这些都是单一抽象方法模式。

对其中的每一个接口,Flink 同样提供了一个所谓 “rich” 的变体,如 RichFlatMapFunction,其中增加了以下方法,包括:

  • open(Configuration c)
  • close()
  • getRuntimeContext()

open() 仅在算子初始化时调用一次。可以用来加载一些静态数据,或者建立外部服务的链接等。

getRuntimeContext() 为整套潜在有趣的东西提供了一个访问途径,最明显的,它是你创建和访问 Flink 状态的途径。

一个使用 Keyed State 的例子

在这个例子里,想象你有一个要去重的事件数据流,对每个键只保留第一个事件。下面是完成这个功能的应用,使用一个名为 DeduplicatorRichFlatMapFunction

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private static class Event {
public final String key;
public final long timestamp;
...
}

public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.addSource(new EventSource())
.keyBy(e -> e.key)
.flatMap(new Deduplicator())
.print();

env.execute();
}

为了实现这个功能,Deduplicator 需要记录每个键是否已经有了相应的记录。它将通过使用 Flink 的 keyed state 接口来做这件事。

当你使用像这样的 keyed stream 的时候,Flink 会为每个状态中管理的条目维护一个键值存储。

Flink 支持几种不同方式的 keyed state,这个例子使用的是最简单的一个,叫做 ValueState。意思是对于 每个键 ,Flink 将存储一个单一的对象 —— 在这个例子中,存储的是一个 Boolean 类型的对象。

我们的 Deduplicator 类有两个方法:open()flatMap()open() 方法通过定义 ValueStateDescriptor<Boolean> 建立了管理状态的使用。构造器的参数定义了这个状态的名字(“keyHasBeenSeen”),并且为如何序列化这些对象提供了信息(在这个例子中的 Types.BOOLEAN)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public static class Deduplicator extends RichFlatMapFunction<Event, Event> {
ValueState<Boolean> keyHasBeenSeen;

@Override
public void open(Configuration conf) {
ValueStateDescriptor<Boolean> desc = new ValueStateDescriptor<>("keyHasBeenSeen", Types.BOOLEAN);
keyHasBeenSeen = getRuntimeContext().getState(desc);
}

@Override
public void flatMap(Event event, Collector<Event> out) throws Exception {
if (keyHasBeenSeen.value() == null) {
out.collect(event);
keyHasBeenSeen.update(true);
}
}
}

当 flatMap 方法调用 keyHasBeenSeen.value() 时,Flink 会在 当前键的上下文 中检索状态值,只有当状态为 null 时,才会输出当前事件。这种情况下,它同时也将更新 keyHasBeenSeentrue

这种访问和更新按键分区的状态的机制也许看上去很神奇,因为在 Deduplicator 的实现中,键不是明确可见的。当 Flink 运行时调用 RichFlatMapFunctionopen 方法时, 是没有事件的,所以这个时候上下文中不含有任何键。但当它调用 flatMap 方法,被处理的事件的键在运行时中就是可用的了,并且被用来确定操作哪个 Flink 状态后端的入口。

部署在分布式集群时,将会有很多 Deduplicator 的实例,每一个实例将负责整个键空间的互斥子集中的一个。所以,当你看到一个单独的 ValueState,比如

1
ValueState<Boolean> keyHasBeenSeen;

要理解这个代表的不仅仅是一个单独的布尔类型变量,而是一个分布式的共享键值存储。

清理状态

上面例子有一个潜在的问题:当键空间是无界的时候将发生什么?Flink 会对每个使用过的键都存储一个 Boolean 类型的实例。如果是键是有限的集合还好,但在键无限增长的应用中,清除再也不会使用的状态是很必要的。这通过在状态对象上调用 clear() 来实现,如下:

1
keyHasBeenSeen.clear()

对一个给定的键值,你也许想在它一段时间不使用后来做这件事。当学习 ProcessFunction 的相关章节时,你将看到在事件驱动的应用中怎么用定时器来做这个。

也可以选择使用 状态的过期时间(TTL),为状态描述符配置你想要旧状态自动被清除的时间。

Non-keyed State

在没有键的上下文中我们也可以使用 Flink 管理的状态。这也被称作 算子的状态。它包含的接口是很不一样的,由于对用户定义的函数来说使用 non-keyed state 是不太常见的,所以这里就不多介绍了。这个特性最常用于 source 和 sink 的实现。

Connected Streams

相比于下面这种预先定义的转换:

simple transformation

有时你想要更灵活地调整转换的某些功能,比如数据流的阈值、规则或者其他参数。Flink 支持这种需求的模式称为 connected streams ,一个单独的算子有两个输入流。

connected streams

connected stream 也可以被用来实现流的关联。

示例

在这个例子中,一个控制流是用来指定哪些词需要从 streamOfWords 里过滤掉的。 一个称为 ControlFunctionRichCoFlatMapFunction 作用于连接的流来实现这个功能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> control = env
.fromElements("DROP", "IGNORE")
.keyBy(x -> x);

DataStream<String> streamOfWords = env
.fromElements("Apache", "DROP", "Flink", "IGNORE")
.keyBy(x -> x);

control
.connect(streamOfWords)
.flatMap(new ControlFunction())
.print();

env.execute();
}

这里注意两个流只有键一致的时候才能连接。 keyBy 的作用是将流数据分区,当 keyed stream 被连接时,他们必须按相同的方式分区。这样保证了两个流中所有键相同的事件发到同一个实例上。这样也使按键关联两个流成为可能。

在这个例子中,两个流都是 DataStream<String> 类型的,并且都将字符串作为键。正如你将在下面看到的,RichCoFlatMapFunction 在状态中存了一个布尔类型的变量,这个变量被两个流共享。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public static class ControlFunction extends RichCoFlatMapFunction<String, String, String> {
private ValueState<Boolean> blocked;

@Override
public void open(Configuration config) {
blocked = getRuntimeContext()
.getState(new ValueStateDescriptor<>("blocked", Boolean.class));
}

@Override
public void flatMap1(String control_value, Collector<String> out) throws Exception {
blocked.update(Boolean.TRUE);
}

@Override
public void flatMap2(String data_value, Collector<String> out) throws Exception {
if (blocked.value() == null) {
out.collect(data_value);
}
}
}

RichCoFlatMapFunction 是一种可以被用于一对连接流的 FlatMapFunction,并且它可以调用 rich function 的接口。这意味着它可以是有状态的。

布尔变量 blocked 被用于记录在数据流 control 中出现过的键(在这个例子中是单词),并且这些单词从 streamOfWords 过滤掉。这是 keyed state,并且它是被两个流共享的,这也是为什么两个流必须有相同的键值空间。

在 Flink 运行时中,flatMap1flatMap2 在连接流有新元素到来时被调用 —— 在我们的例子中,control 流中的元素会进入 flatMap1streamOfWords 中的元素会进入 flatMap2。这是由两个流连接的顺序决定的,本例中为 control.connect(streamOfWords)

认识到你没法控制 flatMap1flatMap2 的调用顺序是很重要的。这两个输入流是相互竞争的关系,Flink 运行时将根据从一个流或另一个流中消费的事件做它要做的。对于需要保证时间和/或顺序的场景,你会发现在 Flink 的管理状态中缓存事件一直到它们能够被处理是必须的。(注意:如果你真的感到绝望,可以使用自定义的算子实现 InputSelectable 接口,在两输入算子消费它的输入流时增加一些顺序上的限制。)

参考资料

Flink 事件驱动

处理函数(Process Functions)

简介

ProcessFunction 将事件处理与 Timer,State 结合在一起,使其成为流处理应用的强大构建模块。 这是使用 Flink 创建事件驱动应用程序的基础。它和 RichFlatMapFunction 十分相似, 但是增加了 Timer。

示例

如果你已经体验了 流式分析训练动手实践, 你应该记得,它是采用 TumblingEventTimeWindow 来计算每个小时内每个司机的小费总和, 像下面的示例这样:

1
2
3
4
5
// 计算每个司机每小时的小费总和
DataStream<Tuple3<Long, Long, Float>> hourlyTips = fares
.keyBy((TaxiFare fare) -> fare.driverId)
.window(TumblingEventTimeWindows.of(Time.hours(1)))
.process(new AddTips());

使用 KeyedProcessFunction 去实现相同的操作更加直接且更有学习意义。 让我们开始用以下代码替换上面的代码:

1
2
3
4
// 计算每个司机每小时的小费总和
DataStream<Tuple3<Long, Long, Float>> hourlyTips = fares
.keyBy((TaxiFare fare) -> fare.driverId)
.process(new PseudoWindow(Time.hours(1)));

在这个代码片段中,一个名为 PseudoWindowKeyedProcessFunction 被应用于 KeyedStream, 其结果是一个 DataStream<Tuple3<Long, Long, Float>> (与使用 Flink 内置时间窗口的实现生成的流相同)。

PseudoWindow 的总体轮廓示意如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// 在时长跨度为一小时的窗口中计算每个司机的小费总和。
// 司机ID作为 key。
public static class PseudoWindow extends
KeyedProcessFunction<Long, TaxiFare, Tuple3<Long, Long, Float>> {

private final long durationMsec;

public PseudoWindow(Time duration) {
this.durationMsec = duration.toMilliseconds();
}

@Override
// 在初始化期间调用一次。
public void open(Configuration conf) {
. . .
}

@Override
// 每个票价事件(TaxiFare-Event)输入(到达)时调用,以处理输入的票价事件。
public void processElement(
TaxiFare fare,
Context ctx,
Collector<Tuple3<Long, Long, Float>> out) throws Exception {

. . .
}

@Override
// 当当前水印(watermark)表明窗口现在需要完成的时候调用。
public void onTimer(long timestamp,
OnTimerContext context,
Collector<Tuple3<Long, Long, Float>> out) throws Exception {

. . .
}
}

注意事项:

  • 有几种类型的 ProcessFunctions – 不仅包括 KeyedProcessFunction,还包括 CoProcessFunctionsBroadcastProcessFunctions 等.
  • KeyedProcessFunction 是一种 RichFunction。作为 RichFunction,它可以访问使用 Managed Keyed State 所需的 opengetRuntimeContext 方法。
  • 有两个回调方法须要实现: processElementonTimer。每个输入事件都会调用 processElement 方法; 当计时器触发时调用 onTimer。它们可以是基于事件时间(event time)的 timer,也可以是基于处理时间(processing time)的 timer。 除此之外,processElementonTimer 都提供了一个上下文对象,该对象可用于与 TimerService 交互。 这两个回调还传递了一个可用于发出结果的 Collector

open() 方法

1
2
3
4
5
6
7
8
9
10
11
// 每个窗口都持有托管的 Keyed state 的入口,并且根据窗口的结束时间执行 keyed 策略。
// 每个司机都有一个单独的MapState对象。
private transient MapState<Long, Float> sumOfTips;

@Override
public void open(Configuration conf) {

MapStateDescriptor<Long, Float> sumDesc =
new MapStateDescriptor<>("sumOfTips", Long.class, Float.class);
sumOfTips = getRuntimeContext().getMapState(sumDesc);
}

由于票价事件(fare-event)可能会乱序到达,有时需要在计算输出前一个小时结果前,处理下一个小时的事件。 这样能够保证“乱序造成的延迟数据”得到正确处理(放到前一个小时中)。 实际上,如果 Watermark 延迟比窗口长度长得多,则可能有多个窗口同时打开,而不仅仅是两个。 此实现通过使用 MapState 来支持处理这一点,该 MapState 将每个窗口的结束时间戳映射到该窗口的小费总和。

processElement() 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public void processElement(
TaxiFare fare,
Context ctx,
Collector<Tuple3<Long, Long, Float>> out) throws Exception {

long eventTime = fare.getEventTime();
TimerService timerService = ctx.timerService();

if (eventTime <= timerService.currentWatermark()) {
// 事件延迟;其对应的窗口已经触发。
} else {
// 将 eventTime 向上取值并将结果赋值到包含当前事件的窗口的末尾时间点。
long endOfWindow = (eventTime - (eventTime % durationMsec) + durationMsec - 1);

// 在窗口完成时将启用回调
timerService.registerEventTimeTimer(endOfWindow);

// 将此票价的小费添加到该窗口的总计中。
Float sum = sumOfTips.get(endOfWindow);
if (sum == null) {
sum = 0.0F;
}
sum += fare.tip;
sumOfTips.put(endOfWindow, sum);
}
}

需要考虑的事项:

  • 延迟的事件怎么处理?watermark 后面的事件(即延迟的)正在被删除。 如果你想做一些比这更高级的操作,可以考虑使用旁路输出(Side outputs),这将在下一节中解释。
  • 本例使用一个 MapState,其中 keys 是时间戳(timestamp),并为同一时间戳设置一个 Timer。 这是一种常见的模式;它使得在 Timer 触发时查找相关信息变得简单高效。

onTimer() 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
public void onTimer(
long timestamp,
OnTimerContext context,
Collector<Tuple3<Long, Long, Float>> out) throws Exception {

long driverId = context.getCurrentKey();
// 查找刚结束的一小时结果。
Float sumOfTips = this.sumOfTips.get(timestamp);

Tuple3<Long, Long, Float> result = Tuple3.of(driverId, timestamp, sumOfTips);
out.collect(result);
this.sumOfTips.remove(timestamp);
}

注意:

  • 传递给 onTimerOnTimerContext context 可用于确定当前 key。
  • 我们的 pseudo-windows 在当前 Watermark 到达每小时结束时触发,此时调用 onTimer。 这个 onTimer 方法从 sumOfTips 中删除相关的条目,这样做的效果是不可能容纳延迟的事件。 这相当于在使用 Flink 的时间窗口时将 allowedLateness 设置为零。

性能考虑

Flink 提供了为 RocksDB 优化的 MapStateListState 类型。 相对于 ValueState,更建议使用 MapStateListState,因为使用 RocksDBStateBackend 的情况下, MapStateListStateValueState 性能更好。 RocksDBStateBackend 可以附加到 ListState,而无需进行(反)序列化, 对于 MapState,每个 key/value 都是一个单独的 RocksDB 对象,因此可以有效地访问和更新 MapState

旁路输出(Side Outputs)

简介

有几个很好的理由希望从 Flink 算子获得多个输出流,如下报告条目:

  • 异常情况(exceptions)
  • 格式错误的事件(malformed events)
  • 延迟的事件(late events)
  • operator 告警(operational alerts),如与外部服务的连接超时

旁路输出(Side outputs)是一种方便的方法。除了错误报告之外,旁路输出也是实现流的 n 路分割的好方法。

示例

现在你可以对上一节中忽略的延迟事件执行某些操作。

Side output channel 与 OutputTag<T> 相关联。这些标记拥有自己的名称,并与对应 DataStream 类型一致。

1
private static final OutputTag<TaxiFare> lateFares = new OutputTag<TaxiFare>("lateFares") {};

上面显示的是一个静态 OutputTag<TaxiFare> ,当在 PseudoWindowprocessElement 方法中发出延迟事件时,可以引用它:

1
2
3
4
5
6
if (eventTime <= timerService.currentWatermark()) {
// 事件延迟,其对应的窗口已经触发。
ctx.output(lateFares, fare);
} else {
. . .
}

以及当在作业的 main 中从该旁路输出访问流时:

1
2
3
4
5
6
// 计算每个司机每小时的小费总和
SingleOutputStreamOperator hourlyTips = fares
.keyBy((TaxiFare fare) -> fare.driverId)
.process(new PseudoWindow(Time.hours(1)));

hourlyTips.getSideOutput(lateFares).print();

或者,可以使用两个同名的 OutputTag 来引用同一个旁路输出,但如果这样做,它们必须具有相同的类型。

结语

在本例中,你已经了解了如何使用 ProcessFunction 重新实现一个简单的时间窗口。 当然,如果 Flink 内置的窗口 API 能够满足你的开发需求,那么一定要优先使用它。 但如果你发现自己在考虑用 Flink 的窗口做些错综复杂的事情,不要害怕自己动手。

此外,ProcessFunctions 对于计算分析之外的许多其他用例也很有用。 下面的实践练习提供了一个完全不同的例子。

ProcessFunctions 的另一个常见用例是清理过时 State。如果你回想一下 Rides and Fares Exercise , 其中使用 RichCoFlatMapFunction 来计算简单 Join,那么示例方案假设 TaxiRides 和 TaxiFares 两个事件是严格匹配为一个有效 数据对(必须同时出现)并且每一组这样的有效数据对都和一个唯一的 rideId 严格对应。如果数据对中的某个 TaxiRides 事件(TaxiFares 事件) 丢失,则同一 rideId 对应的另一个出现的 TaxiFares 事件(TaxiRides 事件)对应的 State 则永远不会被清理掉。 所以这里可以使用 KeyedCoProcessFunction 的实现代替它(RichCoFlatMapFunction),并且可以使用计时器来检测和清除任何过时 的 State。

参考资料

Flink 架构

Apache Flink 是一个分布式系统,它需要计算资源来执行应用程序。Flink 集成了所有常见的集群资源管理器,例如 Hadoop YARNApache MesosKubernetes,但同时也可以作为独立集群运行。

Flink 被设计为能够很好地工作在上述每个资源管理器中,这是通过资源管理器特定(resource-manager-specific)的部署模式实现的。Flink 可以采用与当前资源管理器相适应的方式进行交互。

部署 Flink 应用程序时,Flink 会根据应用程序配置的并行性自动标识所需的资源,并从资源管理器请求这些资源。在发生故障的情况下,Flink 通过请求新资源来替换发生故障的容器。提交或控制应用程序的所有通信都是通过 REST 调用进行的,这可以简化 Flink 与各种环境中的集成。

运行任意规模应用

Flink 旨在任意规模上运行有状态流式应用。因此,应用程序被并行化为可能数千个任务,这些任务分布在集群中并发执行。所以应用程序能够充分利用无尽的 CPU、内存、磁盘和网络 IO。而且 Flink 很容易维护非常大的应用程序状态。其异步和增量的检查点算法对处理延迟产生最小的影响,同时保证精确一次状态的一致性。

Flink 用户报告了其生产环境中一些令人印象深刻的扩展性数字

  • 处理每天处理数万亿的事件,
  • 应用维护几 TB 大小的状态
  • 应用在数千个内核上运行

利用内存性能

有状态的 Flink 程序针对本地状态访问进行了优化。任务的状态始终保留在内存中,如果状态大小超过可用内存,则会保存在能高效访问的磁盘数据结构中。任务通过访问本地(通常在内存中)状态来进行所有的计算,从而产生非常低的处理延迟。Flink 通过定期和异步地对本地状态进行持久化存储来保证故障场景下精确一次的状态一致性。

img

Flink 运行时由两种类型的进程组成:一个 JobManager 和一个或者多个 **TaskManager**。

The processes involved in executing a Flink dataflow

Client 不是运行时和程序执行的一部分,而是用于准备数据流并将其发送给 JobManager。之后,客户端可以断开连接(_分离模式_),或保持连接来接收进程报告(_附加模式_)。客户端可以作为触发执行 Java/Scala 程序的一部分运行,也可以在命令行进程 ./bin/flink run ... 中运行。

可以通过多种方式启动 JobManagerTaskManager:直接在机器上作为 standalone 集群启动、在容器中启动、或者通过 YARN 等资源框架管理并启动。TaskManager 连接到 JobManager,宣布自己可用,并被分配工作。

JobManager

JobManager 具有许多与协调 Flink 应用程序的分布式执行有关的职责,它决定

  • 何时调度下一个 task(或一组 task)
  • 对完成的 task 或执行失败做出反应
  • 协调 checkpoint
  • 协调从失败中恢复
  • 等等

JobManager 由三个不同的组件组成:

  • ResourceManager:负责 Flink 集群中的资源提供、回收、分配 - 它管理 task slots,这是 Flink 集群中资源调度的单位(请参考 TaskManager。Flink 为不同的环境和资源提供者(例如 YARN、Kubernetes 和 standalone 部署)实现了对应的 ResourceManager。在 standalone 设置中,ResourceManager 只能分配可用 TaskManager 的 slots,而不能自行启动新的 TaskManager。

  • Dispatcher:提供了一个 REST 接口,用来提交 Flink 应用程序执行,并为每个提交的作业启动一个新的 JobMaster。它还运行 Flink WebUI 用来提供作业执行信息。

  • JobMaster:负责管理单个 JobGraph 的执行。Flink 集群中可以同时运行多个作业,每个作业都有自己的 JobMaster。

始终至少有一个 JobManager。高可用(HA)设置中可能有多个 JobManager,其中一个始终是 _leader_,其他的则是 _standby_(请参考 高可用(HA))。

TaskManager

_TaskManager_(也称为 _worker_)执行作业流的 task,并且缓存和交换数据流。

必须始终至少有一个 TaskManager。在 TaskManager 中资源调度的最小单位是 task _slot_。TaskManager 中 task slot 的数量表示并发处理 task 的数量。请注意一个 task slot 中可以执行多个算子(请参考Tasks 和算子链)。

Tasks 和算子链

对于分布式执行,Flink 将算子的 subtasks 链接成 _tasks_。每个 task 由一个线程执行。将算子链接成 task 是个有用的优化:它减少线程间切换、缓冲的开销,并且减少延迟的同时增加整体吞吐量。链行为是可以配置的;请参考链文档以获取详细信息。

下图中样例数据流用 5 个 subtask 执行,因此有 5 个并行线程。

Operator chaining into Tasks

Task Slots 和资源

每个 worker(TaskManager)都是一个 _JVM 进程_,可以在单独的线程中执行一个或多个 subtask。为了控制一个 TaskManager 中接受多少个 task,就有了所谓的 task slots(至少一个)。

每个 task slot 代表 TaskManager 中资源的固定子集。例如,具有 3 个 slot 的 TaskManager,会将其托管内存 1/3 用于每个 slot。分配资源意味着 subtask 不会与其他作业的 subtask 竞争托管内存,而是具有一定数量的保留托管内存。注意此处没有 CPU 隔离;当前 slot 仅分离 task 的托管内存。

通过调整 task slot 的数量,用户可以定义 subtask 如何互相隔离。每个 TaskManager 有一个 slot,这意味着每个 task 组都在单独的 JVM 中运行(例如,可以在单独的容器中启动)。具有多个 slot 意味着更多 subtask 共享同一 JVM。同一 JVM 中的 task 共享 TCP 连接(通过多路复用)和心跳信息。它们还可以共享数据集和数据结构,从而减少了每个 task 的开销。

A TaskManager with Task Slots and Tasks

默认情况下,Flink 允许 subtask 共享 slot,即便它们是不同的 task 的 subtask,只要是来自于同一作业即可。结果就是一个 slot 可以持有整个作业管道。允许 slot 共享有两个主要优点:

  • Flink 集群所需的 task slot 和作业中使用的最大并行度恰好一样。无需计算程序总共包含多少个 task(具有不同并行度)。
  • 容易获得更好的资源利用。如果没有 slot 共享,非密集 subtask(_source/map()_)将阻塞和密集型 subtask(_window_) 一样多的资源。通过 slot 共享,我们示例中的基本并行度从 2 增加到 6,可以充分利用分配的资源,同时确保繁重的 subtask 在 TaskManager 之间公平分配。

TaskManagers with shared Task Slots

Flink 应用程序 是从其 main() 方法产生的一个或多个 Flink 作业的任何用户程序。这些作业的执行可以在本地 JVM(LocalEnvironment)中进行,或具有多台机器的集群的远程设置(RemoteEnvironment)中进行。对于每个程序,ExecutionEnvironment 提供了一些方法来控制作业执行(例如设置并行度)并与外界交互(请参考 Flink 程序剖析 )。

Flink 应用程序的作业可以被提交到长期运行的 Flink Session 集群、专用的 Flink Job 集群Flink Application 集群。这些选项之间的差异主要与集群的生命周期和资源隔离保证有关。

  • 集群生命周期:在 Flink Session 集群中,客户端连接到一个预先存在的、长期运行的集群,该集群可以接受多个作业提交。即使所有作业完成后,集群(和 JobManager)仍将继续运行直到手动停止 session 为止。因此,Flink Session 集群的寿命不受任何 Flink 作业寿命的约束。
  • 资源隔离:TaskManager slot 由 ResourceManager 在提交作业时分配,并在作业完成时释放。由于所有作业都共享同一集群,因此在集群资源方面存在一些竞争 — 例如提交工作阶段的网络带宽。此共享设置的局限性在于,如果 TaskManager 崩溃,则在此 TaskManager 上运行 task 的所有作业都将失败;类似的,如果 JobManager 上发生一些致命错误,它将影响集群中正在运行的所有作业。
  • 其他注意事项:拥有一个预先存在的集群可以节省大量时间申请资源和启动 TaskManager。有种场景很重要,作业执行时间短并且启动时间长会对端到端的用户体验产生负面的影响 — 就像对简短查询的交互式分析一样,希望作业可以使用现有资源快速执行计算。

以前,Flink Session 集群也被称为 session 模式下的 Flink 集群。

  • 集群生命周期:在 Flink Job 集群中,可用的集群管理器(例如 YARN)用于为每个提交的作业启动一个集群,并且该集群仅可用于该作业。在这里,客户端首先从集群管理器请求资源启动 JobManager,然后将作业提交给在这个进程中运行的 Dispatcher。然后根据作业的资源请求惰性的分配 TaskManager。一旦作业完成,Flink Job 集群将被拆除。
  • 资源隔离:JobManager 中的致命错误仅影响在 Flink Job 集群中运行的一个作业。
  • 其他注意事项:由于 ResourceManager 必须应用并等待外部资源管理组件来启动 TaskManager 进程和分配资源,因此 Flink Job 集群更适合长期运行、具有高稳定性要求且对较长的启动时间不敏感的大型作业。

以前,Flink Job 集群也被称为 job (or per-job) 模式下的 Flink 集群。

Kubernetes 不支持 Flink Job 集群。 请参考 Standalone KubernetesNative Kubernetes

  • 集群生命周期:Flink Application 集群是专用的 Flink 集群,仅从 Flink 应用程序执行作业,并且 main()方法在集群上而不是客户端上运行。提交作业是一个单步骤过程:无需先启动 Flink 集群,然后将作业提交到现有的 session 集群;相反,将应用程序逻辑和依赖打包成一个可执行的作业 JAR 中,并且集群入口(ApplicationClusterEntryPoint)负责调用 main()方法来提取 JobGraph。例如,这允许你像在 Kubernetes 上部署任何其他应用程序一样部署 Flink 应用程序。因此,Flink Application 集群的寿命与 Flink 应用程序的寿命有关。
  • 资源隔离:在 Flink Application 集群中,ResourceManager 和 Dispatcher 作用于单个的 Flink 应用程序,相比于 Flink Session 集群,它提供了更好的隔离。

Flink Job 集群可以看做是 Flink Application 集群”客户端运行“的替代方案。

默认情况下,每个 Flink 集群只有一个 JobManager 实例。这会导致 _单点故障(SPOF)_:如果 JobManager 崩溃,则不能提交任何新程序,运行中的程序也会失败。

使用 JobManager 高可用模式,你可以从 JobManager 失败中恢复,从而消除单点故障。

如何启用集群高可用

JobManager 高可用是指,在任何时候都有一个 JobManager Leader,如果 Leader 出现故障,则有多个备用 JobManager 来接管领导。这解决了单点故障问题,只要有备用 JobManager 担任领导者,程序就可以继续运行。

如下是一个使用三个 JobManager 实例的例子:

img

Flink 的 高可用服务 封装了所需的服务,使一切可以正常工作:

  • 领导者选举:从 n 个候选者中选出一个领导者
  • 服务发现:检索当前领导者的地址
  • 状态持久化:继承程序恢复作业所需的持久化状态(JobGraphs、用户代码 jar、已完成的检查点)

Flink 提供了两种高可用服务实现:

  • ZooKeeper:每个 Flink 集群部署都可以使用 ZooKeeper HA 服务。它们需要一个运行的 ZooKeeper 复制组(quorum)。
  • Kubernetes:Kubernetes HA 服务只能运行在 Kubernetes 上。

高可用数据生命周期

为了恢复提交的作业,Flink 持久化元数据和 job 组件。高可用数据将一直保存,直到相应的作业执行成功、被取消或最终失败。当这些情况发生时,将删除所有高可用数据,包括存储在高可用服务中的元数据。

参考资料

Flink 简介

关键概念:源源不断的流式数据处理、事件时间、有状态流处理和状态快照

流处理

任何类型的数据都可以形成一种事件流。信用卡交易、传感器测量、机器日志、网站或移动应用程序上的用户交互记录,所有这些数据都形成一种流。

数据可以被作为 无界 或者 有界 流来处理。

  1. 无界流 有定义流的开始,但没有定义流的结束。它们会无休止地产生数据。无界流的数据必须持续处理,即数据被摄取后需要立刻处理。我们不能等到所有数据都到达再处理,因为输入是无限的,在任何时候输入都不会完成。处理无界数据通常要求以特定顺序摄取事件,例如事件发生的顺序,以便能够推断结果的完整性。
  2. 有界流 有定义流的开始,也有定义流的结束。有界流可以在摄取所有数据后再进行计算。有界流所有数据可以被排序,所以并不需要有序摄取。有界流处理通常被称为批处理。

Bounded and unbounded streams

Apache Flink 擅长处理无界和有界数据集 精确的时间控制和状态化使得 Flink 的运行时(runtime)能够运行任何处理无界流的应用。有界流则由一些专为固定大小数据集特殊设计的算法和数据结构进行内部处理,产生了出色的性能。

批处理是有界数据流处理的范例。在这种模式下,你可以选择在计算结果输出之前输入整个数据集,这也就意味着你可以对整个数据集的数据进行排序、统计或汇总计算后再输出结果。

流处理正相反,其涉及无界数据流。至少理论上来说,它的数据输入永远不会结束,因此程序必须持续不断地对到达的数据进行处理。

在 Flink 中,应用程序由用户自定义算子转换而来的流式 dataflows 所组成。这些流式 dataflows 形成了有向图,以一个或多个(source)开始,并以一个或多个(sink)结束。

A DataStream program, and its dataflow.

通常,程序代码中的 transformation 和 dataflow 中的算子(operator)之间是一一对应的。但有时也会出现一个 transformation 包含多个算子的情况,如上图所示。

Flink 应用程序可以消费来自消息队列或分布式日志这类流式数据源(例如 Apache Kafka 或 Kinesis)的实时数据,也可以从各种的数据源中消费有界的历史数据。同样,Flink 应用程序生成的结果流也可以发送到各种数据汇中。

Flink application with sources and sinks

并行 Dataflows

Flink 程序本质上是分布式并行程序。在程序执行期间,一个流有一个或多个流分区(Stream Partition),每个算子有一个或多个算子子任务(Operator Subtask)。每个子任务彼此独立,并在不同的线程中运行,或在不同的计算机或容器中运行。

算子子任务数就是其对应算子的并行度。在同一程序中,不同算子也可能具有不同的并行度。

A parallel dataflow

Flink 算子之间可以通过一对一(_直传_)模式或重新分发模式传输数据:

  • 一对一模式(例如上图中的 Sourcemap() 算子之间)可以保留元素的分区和顺序信息。这意味着 map() 算子的 subtask[1] 输入的数据以及其顺序与 Source 算子的 subtask[1] 输出的数据和顺序完全相同,即同一分区的数据只会进入到下游算子的同一分区。
  • 重新分发模式(例如上图中的 map()keyBy/window 之间,以及 keyBy/windowSink 之间)则会更改数据所在的流分区。当你在程序中选择使用不同的 _transformation_,每个算子子任务也会根据不同的 transformation 将数据发送到不同的目标子任务。例如以下这几种 transformation 和其对应分发数据的模式:_keyBy()_(通过散列键重新分区)、_broadcast()_(广播)或 _rebalance()(随机重新分发)。在重新分发数据的过程中,元素只有在每对输出和输入子任务之间才能保留其之间的顺序信息(例如,_keyBy/window 的 subtask[2] 接收到的 map() 的 subtask[1] 中的元素都是有序的)。因此,上图所示的 keyBy/windowSink 算子之间数据的重新分发时,不同键(key)的聚合结果到达 Sink 的顺序是不确定的。

自定义时间流处理

对于大多数流数据处理应用程序而言,能够使用处理实时数据的代码重新处理历史数据并产生确定并一致的结果非常有价值。

在处理流式数据时,我们通常更需要关注事件本身发生的顺序而不是事件被传输以及处理的顺序,因为这能够帮助我们推理出一组事件(事件集合)是何时发生以及结束的。例如电子商务交易或金融交易中涉及到的事件集合。

为了满足上述这类的实时流处理场景,我们通常会使用记录在数据流中的事件时间的时间戳,而不是处理数据的机器时钟的时间戳。

有状态流处理

Flink 中的算子可以是有状态的。这意味着如何处理一个事件可能取决于该事件之前所有事件数据的累积结果。Flink 中的状态不仅可以用于简单的场景(例如统计仪表板上每分钟显示的数据),也可以用于复杂的场景(例如训练作弊检测模型)。

Flink 应用程序可以在分布式集群上并行运行,其中每个算子的各个并行实例会在单独的线程中独立运行,并且通常情况下是会在不同的机器上运行。

有状态算子的并行实例组在存储其对应状态时通常是按照键(key)进行分片存储的。每个并行实例算子负责处理一组特定键的事件数据,并且这组键对应的状态会保存在本地。

如下图的 Flink 作业,其前三个算子的并行度为 2,最后一个 sink 算子的并行度为 1,其中第三个算子是有状态的,并且你可以看到第二个算子和第三个算子之间是全互联的(fully-connected),它们之间通过网络进行数据分发。通常情况下,实现这种类型的 Flink 程序是为了通过某些键对数据流进行分区,以便将需要一起处理的事件进行汇合,然后做统一计算处理。

State is sharded

Flink 应用程序的状态访问都在本地进行,因为这有助于其提高吞吐量和降低延迟。通常情况下 Flink 应用程序都是将状态存储在 JVM 堆上,但如果状态太大,我们也可以选择将其以结构化数据格式存储在高速磁盘中。

State is local

通过状态快照实现的容错

通过状态快照和流重放两种方式的组合,Flink 能够提供可容错的,精确一次计算的语义。这些状态快照在执行时会获取并存储分布式 pipeline 中整体的状态,它会将数据源中消费数据的偏移量记录下来,并将整个 job graph 中算子获取到该数据(记录的偏移量对应的数据)时的状态记录并存储下来。当发生故障时,Flink 作业会恢复上次存储的状态,重置数据源从状态中记录的上次消费的偏移量开始重新进行消费处理。而且状态快照在执行时会异步获取状态并存储,并不会阻塞正在进行的数据处理逻辑。

状态

只有在每一个单独的事件上进行转换操作的应用才不需要状态,换言之,每一个具有一定复杂度的流处理应用都是有状态的。任何运行基本业务逻辑的流处理应用都需要在一定时间内存储所接收的事件或中间结果,以供后续的某个时间点(例如收到下一个事件或者经过一段特定时间)进行访问并进行后续处理。

img

应用状态是 Flink 中的一等公民,Flink 提供了许多状态管理相关的特性支持,其中包括:

  • 多种状态基础类型:Flink 为多种不同的数据结构提供了相对应的状态基础类型,例如原子值(value),列表(list)以及映射(map)。开发者可以基于处理函数对状态的访问方式,选择最高效、最适合的状态基础类型。
  • 插件化的 State Backend:State Backend 负责管理应用程序状态,并在需要的时候进行 checkpoint。Flink 支持多种 state backend,可以将状态存在内存或者 RocksDB。RocksDB 是一种高效的嵌入式、持久化键值存储引擎。Flink 也支持插件式的自定义 state backend 进行状态存储。
  • 精确一次语义:Flink 的 checkpoint 和故障恢复算法保证了故障发生后应用状态的一致性。因此,Flink 能够在应用程序发生故障时,对应用程序透明,不造成正确性的影响。
  • 超大数据量状态:Flink 能够利用其异步以及增量式的 checkpoint 算法,存储数 TB 级别的应用状态。
  • 可弹性伸缩的应用:Flink 能够通过在更多或更少的工作节点上对状态进行重新分布,支持有状态应用的分布式的横向伸缩。

时间

时间语义

Flink 支持以下三种时间语义:

  • **事件时间(event time)**: 事件产生的时间,记录的是设备生产(或者存储)事件的时间
  • **摄取时间(ingestion time)**: Flink 读取事件时记录的时间
  • **处理时间(processing time)**: Flink pipeline 中具体算子处理事件的时间

为了获得可重现的结果,例如在计算过去的特定一天里第一个小时股票的最高价格时,我们应该使用事件时间。这样的话,无论什么时间去计算都不会影响输出结果。然而如果使用处理时间的话,实时应用程序的结果是由程序运行的时间所决定。多次运行基于处理时间的实时程序,可能得到的结果都不相同,也可能会导致再次分析历史数据或者测试新代码变得异常困难。

Event Time

如果想要使用事件时间,需要额外给 Flink 提供一个时间戳提取器和 Watermark 生成器,Flink 将使用它们来跟踪事件时间的进度。

Watermark

让我们通过一个简单的示例来演示为什么需要 watermarks 及其工作方式。

在此示例中,我们将看到带有混乱时间戳的事件流,如下所示。显示的数字表达的是这些事件实际发生时间的时间戳。到达的第一个事件发生在时间 4,随后发生的事件发生在更早的时间 2,依此类推:

··· 23 19 22 24 21 14 17 13 12 15 9 11 7 2 4 →

假设我们要对数据流排序,我们想要达到的目的是:应用程序应该在数据流里的事件到达时就有一个算子(我们暂且称之为排序)开始处理事件,这个算子所输出的流是按照时间戳排序好的。

让我们重新审视这些数据:

(1) 我们的排序器看到的第一个事件的时间戳是 4,但是我们不能立即将其作为已排序的流释放。因为我们并不能确定它是有序的,并且较早的事件有可能并未到达。事实上,如果站在上帝视角,我们知道,必须要等到时间戳为 2 的元素到来时,排序器才可以有事件输出。

需要一些缓冲,需要一些时间,但这都是值得的

(2) 接下来的这一步,如果我们选择的是固执的等待,我们永远不会有结果。首先,我们看到了时间戳为 4 的事件,然后看到了时间戳为 2 的事件。可是,时间戳小于 2 的事件接下来会不会到来呢?可能会,也可能不会。再次站在上帝视角,我们知道,我们永远不会看到时间戳 1。

最终,我们必须勇于承担责任,并发出指令,把带有时间戳 2 的事件作为已排序的事件流的开始

(3) 然后,我们需要一种策略,该策略定义:对于任何给定时间戳的事件,Flink 何时停止等待较早事件的到来。

这正是 watermarks 的作用 — 它们定义何时停止等待较早的事件。

Flink 中事件时间的处理取决于 _watermark 生成器_,后者将带有时间戳的特殊元素插入流中形成 _watermarks_。事件时间 t 的 watermark 代表 t 之前(很可能)都已经到达。

当 watermark 以 2 或更大的时间戳到达时,事件流的排序器应停止等待,并输出 2 作为已经排序好的流。

(4) 我们可能会思考,如何决定 watermarks 的不同生成策略

每个事件都会延迟一段时间后到达,然而这些延迟有所不同,有些事件可能比其他事件延迟得更多。一种简单的方法是假定这些延迟受某个最大延迟的限制。Flink 将此策略称为 最大无序边界 (bounded-out-of-orderness) watermark。当然,我们可以想像出更好的生成 watermark 的方法,但是对于大多数应用而言,固定延迟策略已经足够了。

延迟 VS 正确性

watermarks 给了开发者流处理的一种选择,它们使开发人员在开发应用程序时可以控制延迟和完整性之间的权衡。与批处理不同,批处理中的奢侈之处在于可以在产生任何结果之前完全了解输入,而使用流式传输,我们不被允许等待所有的时间都产生了,才输出排序好的数据,这与流相违背。

我们可以把 watermarks 的边界时间配置的相对较短,从而冒着在输入了解不完全的情况下产生结果的风险-即可能会很快产生错误结果。或者,你可以等待更长的时间,并利用对输入流的更全面的了解来产生结果。

当然也可以实施混合解决方案,先快速产生初步结果,然后在处理其他(最新)数据时向这些结果提供更新。对于有一些对延迟的容忍程度很低,但是又对结果有很严格的要求的场景下,或许是一个福音。

延迟

延迟是相对于 watermarks 定义的。Watermark(t) 表示事件流的时间已经到达了 t; watermark 之后的时间戳 ≤ t 的任何事件都被称之为延迟事件。

使用 Watermarks

如果想要使用基于带有事件时间戳的事件流,Flink 需要知道与每个事件相关的时间戳,而且流必须包含 watermark。

动手练习中使用的出租车数据源已经为我们处理了这些详细信息。但是,在您自己的应用程序中,您将必须自己进行处理,这通常是通过实现一个类来实现的,该类从事件中提取时间戳,并根据需要生成 watermarks。最简单的方法是使用 WatermarkStrategy

1
2
3
4
5
6
7
8
DataStream<Event> stream = ...

WatermarkStrategy<Event> strategy = WatermarkStrategy
.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(20))
.withTimestampAssigner((event, timestamp) -> event.timestamp);

DataStream<Event> withTimestampsAndWatermarks =
stream.assignTimestampsAndWatermarks(strategy);

窗口

我们在操作无界数据流时,经常需要应对以下问题,我们经常把无界数据流分解成有界数据流聚合分析:

  • 每分钟的浏览量
  • 每位用户每周的会话数
  • 每个传感器每分钟的最高温度

用 Flink 计算窗口分析取决于两个主要的抽象操作:_Window Assigners_,将事件分配给窗口(根据需要创建新的窗口对象),以及 _Window Functions_,处理窗口内的数据。

Flink 的窗口 API 还具有 TriggersEvictors 的概念,Triggers 确定何时调用窗口函数,而 Evictors 则可以删除在窗口中收集的元素。

举一个简单的例子,我们一般这样使用键控事件流(基于 key 分组的输入事件流):

1
2
3
4
stream.
.keyBy(<key selector>)
.window(<window assigner>)
.reduce|aggregate|process(<window function>)

您不是必须使用键控事件流(keyed stream),但是值得注意的是,如果不使用键控事件流,我们的程序就不能 并行 处理。

1
2
3
stream.
.windowAll(<window assigner>)
.reduce|aggregate|process(<window function>)

窗口分配器

Flink 有一些内置的窗口分配器,如下所示:

Window assigners

通过一些示例来展示关于这些窗口如何使用,或者如何区分它们:

  • 滚动时间窗口
    • 每分钟页面浏览量
    • TumblingEventTimeWindows.of(Time.minutes(1))
  • 滑动时间窗口
    • 每 10 秒钟计算前 1 分钟的页面浏览量
    • SlidingEventTimeWindows.of(Time.minutes(1), Time.seconds(10))
  • 会话窗口
    • 每个会话的网页浏览量,其中会话之间的间隔至少为 30 分钟
    • EventTimeSessionWindows.withGap(Time.minutes(30))

以下都是一些可以使用的间隔时间 Time.milliseconds(n), Time.seconds(n), Time.minutes(n), Time.hours(n), 和 Time.days(n)

基于时间的窗口分配器(包括会话时间)既可以处理 事件时间,也可以处理 处理时间。这两种基于时间的处理没有哪一个更好,我们必须折衷。使用 处理时间,我们必须接受以下限制:

  • 无法正确处理历史数据,
  • 无法正确处理超过最大无序边界的数据,
  • 结果将是不确定的,

但是有自己的优势,较低的延迟。

使用基于计数的窗口时,请记住,只有窗口内的事件数量到达窗口要求的数值时,这些窗口才会触发计算。尽管可以使用自定义触发器自己实现该行为,但无法应对超时和处理部分窗口。

我们可能在有些场景下,想使用全局 window assigner 将每个事件(相同的 key)都分配给某一个指定的全局窗口。 很多情况下,一个比较好的建议是使用 ProcessFunction,具体介绍在这里

窗口应用函数

我们有三种最基本的操作窗口内的事件的选项:

  1. 像批量处理,ProcessWindowFunction 会缓存 Iterable 和窗口内容,供接下来全量计算;
  2. 或者像流处理,每一次有事件被分配到窗口时,都会调用 ReduceFunction 或者 AggregateFunction 来增量计算;
  3. 或者结合两者,通过 ReduceFunction 或者 AggregateFunction 预聚合的增量计算结果在触发窗口时, 提供给 ProcessWindowFunction 做全量计算。

接下来展示一段 1 和 3 的示例,每一个实现都是计算传感器的最大值。在每一个一分钟大小的事件时间窗口内, 生成一个包含 (key,end-of-window-timestamp, max_value) 的一组结果。

ProcessWindowFunction 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
DataStream<SensorReading> input = ...

input
.keyBy(x -> x.key)
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.process(new MyWastefulMax());

public static class MyWastefulMax extends ProcessWindowFunction<
SensorReading, // 输入类型
Tuple3<String, Long, Integer>, // 输出类型
String, // 键类型
TimeWindow> { // 窗口类型

@Override
public void process(
String key,
Context context,
Iterable<SensorReading> events,
Collector<Tuple3<String, Long, Integer>> out) {

int max = 0;
for (SensorReading event : events) {
max = Math.max(event.value, max);
}
out.collect(Tuple3.of(key, context.window().getEnd(), max));
}
}

在当前实现中有一些值得关注的地方:

  • Flink 会缓存所有分配给窗口的事件流,直到触发窗口为止。这个操作可能是相当昂贵的。
  • Flink 会传递给 ProcessWindowFunction 一个 Context 对象,这个对象内包含了一些窗口信息。Context 接口 展示大致如下:
1
2
3
4
5
6
7
8
9
public abstract class Context implements java.io.Serializable {
public abstract W window();

public abstract long currentProcessingTime();
public abstract long currentWatermark();

public abstract KeyedStateStore windowState();
public abstract KeyedStateStore globalState();
}

windowStateglobalState 可以用来存储当前的窗口的 key、窗口或者当前 key 的每一个窗口信息。这在一些场景下会很有用,试想,我们在处理当前窗口的时候,可能会用到上一个窗口的信息。

增量聚合示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
DataStream<SensorReading> input = ...

input
.keyBy(x -> x.key)
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.reduce(new MyReducingMax(), new MyWindowFunction());

private static class MyReducingMax implements ReduceFunction<SensorReading> {
public SensorReading reduce(SensorReading r1, SensorReading r2) {
return r1.value() > r2.value() ? r1 : r2;
}
}

private static class MyWindowFunction extends ProcessWindowFunction<
SensorReading, Tuple3<String, Long, SensorReading>, String, TimeWindow> {

@Override
public void process(
String key,
Context context,
Iterable<SensorReading> maxReading,
Collector<Tuple3<String, Long, SensorReading>> out) {

SensorReading max = maxReading.iterator().next();
out.collect(Tuple3.of(key, context.window().getEnd(), max));
}
}

请注意 Iterable<SensorReading> 将只包含一个读数 – MyReducingMax 计算出的预先汇总的最大值。

晚到的事件

默认场景下,超过最大无序边界的事件会被删除,但是 Flink 给了我们两个选择去控制这些事件。

您可以使用一种称为旁路输出 的机制来安排将要删除的事件收集到侧输出流中,这里是一个示例:

1
2
3
4
5
6
7
8
9
OutputTag<Event> lateTag = new OutputTag<Event>("late"){};

SingleOutputStreamOperator<Event> result = stream.
.keyBy(...)
.window(...)
.sideOutputLateData(lateTag)
.process(...);

DataStream<Event> lateStream = result.getSideOutput(lateTag);

我们还可以指定 允许的延迟(allowed lateness) 的间隔,在这个间隔时间内,延迟的事件将会继续分配给窗口(同时状态会被保留),默认状态下,每个延迟事件都会导致窗口函数被再次调用(有时也称之为 late firing )。

默认情况下,允许的延迟为 0。换句话说,watermark 之后的元素将被丢弃(或发送到侧输出流)。

举例说明:

1
2
3
4
5
stream.
.keyBy(...)
.window(...)
.allowedLateness(Time.seconds(10))
.process(...);

当允许的延迟大于零时,只有那些超过最大无序边界以至于会被丢弃的事件才会被发送到侧输出流(如果已配置)。

深入了解窗口操作

Flink 的窗口 API 某些方面有一些奇怪的行为,可能和我们预期的行为不一致。 根据 Flink 用户邮件列表 和其他地方一些频繁被问起的问题, 以下是一些有关 Windows 的底层事实,这些信息可能会让您感到惊讶。

滑动窗口是通过复制来实现的

滑动窗口分配器可以创建许多窗口对象,并将每个事件复制到每个相关的窗口中。例如,如果您每隔 15 分钟就有 24 小时的滑动窗口,则每个事件将被复制到 4 * 24 = 96 个窗口中。

时间窗口会和时间对齐

仅仅因为我们使用的是一个小时的处理时间窗口并在 12:05 开始运行您的应用程序,并不意味着第一个窗口将在 1:05 关闭。第一个窗口将长 55 分钟,并在 1:00 关闭。

请注意,滑动窗口和滚动窗口分配器所采用的 offset 参数可用于改变窗口的对齐方式。有关详细的信息,请参见 滚动窗口滑动窗口

window 后面可以接 window

比如说:

1
2
3
4
5
6
stream
.keyBy(t -> t.key)
.window(<window assigner>)
.reduce(<reduce function>)
.windowAll(<same window assigner>)
.reduce(<same reduce function>)

可能我们会猜测以 Flink 的能力,想要做到这样看起来是可行的(前提是你使用的是 ReduceFunction 或 AggregateFunction ),但不是。

之所以可行,是因为时间窗口产生的事件是根据窗口结束时的时间分配时间戳的。例如,一个小时小时的窗口所产生的所有事件都将带有标记一个小时结束的时间戳。后面的窗口内的数据消费和前面的流产生的数据是一致的。

空的时间窗口不会输出结果

事件会触发窗口的创建。换句话说,如果在特定的窗口内没有事件,就不会有窗口,就不会有输出结果。

延迟时间会导致延迟聚合

会话窗口的实现是基于窗口的一个抽象能力,窗口可以 _聚合_。会话窗口中的每个数据在初始被消费时,都会被分配一个新的窗口,但是如果窗口之间的间隔足够小,多个窗口就会被聚合。延迟事件可以弥合两个先前分开的会话间隔,从而产生一个虽然有延迟但是更加准确地结果。

参考资料

Java 基础语法特性

注释

空白行,或者注释的内容,都会被 Java 编译器忽略掉。

Java 支持多种注释方式,下面的示例展示了各种注释的使用方式:

1
2
3
4
5
6
7
8
9
10
11
12
13
public class HelloWorld {
/*
* JavaDoc 注释
*/
public static void main(String[] args) {
// 单行注释
/* 多行注释:
1. 注意点a
2. 注意点b
*/
System.out.println("Hello World");
}
}

基本数据类型

img

👉 扩展阅读:深入理解 Java 基本数据类型

变量和常量

Java 支持的变量类型有:

  • 局部变量 - 类方法中的变量。
  • 成员变量(也叫实例变量) - 类方法外的变量,不过没有 static 修饰。
  • 静态变量(也叫类变量) - 类方法外的变量,用 static 修饰。

特性对比:

局部变量 实例变量(也叫成员变量) 类变量(也叫静态变量)
局部变量声明在方法、构造方法或者语句块中。 实例变量声明在方法、构造方法和语句块之外。 类变量声明在方法、构造方法和语句块之外。并且以 static 修饰。
局部变量在方法、构造方法、或者语句块被执行的时候创建,当它们执行完成后,变量将会被销毁。 实例变量在对象创建的时候创建,在对象被销毁的时候销毁。 类变量在第一次被访问时创建,在程序结束时销毁。
局部变量没有默认值,所以必须经过初始化,才可以使用。 实例变量具有默认值。数值型变量的默认值是 0,布尔型变量的默认值是 false,引用类型变量的默认值是 null。变量的值可以在声明时指定,也可以在构造方法中指定。 类变量具有默认值。数值型变量的默认值是 0,布尔型变量的默认值是 false,引用类型变量的默认值是 null。变量的值可以在声明时指定,也可以在构造方法中指定。此外,静态变量还可以在静态语句块中初始化。
对于局部变量,如果是基本类型,会把值直接存储在栈;如果是引用类型,会把其对象存储在堆,而把这个对象的引用(指针)存储在栈。 实例变量存储在堆。 类变量存储在静态存储区。
访问修饰符不能用于局部变量。 访问修饰符可以用于实例变量。 访问修饰符可以用于类变量。
局部变量只在声明它的方法、构造方法或者语句块中可见。 实例变量对于类中的方法、构造方法或者语句块是可见的。一般情况下应该把实例变量设为私有。通过使用访问修饰符可以使实例变量对子类可见。 与实例变量具有相似的可见性。但为了对类的使用者可见,大多数静态变量声明为 public 类型。
实例变量可以直接通过变量名访问。但在静态方法以及其他类中,就应该使用完全限定名:ObejectReference.VariableName。 静态变量可以通过:ClassName.VariableName 的方式访问。
无论一个类创建了多少个对象,类只拥有类变量的一份拷贝。
类变量除了被声明为常量外很少使用。

变量修饰符

  • 访问级别修饰符
    • 如果变量是实例变量或类变量,可以添加访问级别修饰符(public/protected/private)
  • 静态修饰符
    • 如果变量是类变量,需要添加 static 修饰
  • final
    • 如果变量使用 final 修饰符,就表示这是一个常量,不能被修改。

数组

img

👉 扩展阅读:深入理解 Java 数组

枚举

img

👉 扩展阅读:深入理解 Java 枚举

操作符

Java 中支持的操作符类型如下:

img

👉 扩展阅读:Java 操作符

方法

img

👉 扩展阅读:深入理解 Java 方法

控制语句

img

👉 扩展阅读:Java 控制语句

异常

img

img

👉 扩展阅读:深入理解 Java 异常

泛型

img

👉 扩展阅读:深入理解 Java 泛型

反射

img

img

👉 扩展阅读:深入理解 Java 反射和动态代理

注解

img

img

img

img

👉 扩展阅读:深入理解 Java 注解

序列化

img

👉 扩展阅读:Java 序列化

Elasticsearch 优化

Elasticsearch 是当前流行的企业级搜索引擎,设计用于云计算中,能够达到实时搜索,稳定,可靠,快速,安装使用方便。作为一个开箱即用的产品,在生产环境上线之后,我们其实不一定能确保其的性能和稳定性。如何根据实际情况提高服务的性能,其实有很多技巧。这章我们分享从实战经验中总结出来的 elasticsearch 性能优化,主要从硬件配置优化、索引优化设置、查询方面优化、数据结构优化、集群架构优化等方面讲解。

硬件配置优化

升级硬件设备配置一直都是提高服务能力最快速有效的手段,在系统层面能够影响应用性能的一般包括三个因素:CPU、内存和 IO,可以从这三方面进行 ES 的性能优化工作。

CPU 配置

一般说来,CPU 繁忙的原因有以下几个:

  1. 线程中有无限空循环、无阻塞、正则匹配或者单纯的计算;
  2. 发生了频繁的 GC;
  3. 多线程的上下文切换;

大多数 Elasticsearch 部署往往对 CPU 要求不高。因此,相对其它资源,具体配置多少个(CPU)不是那么关键。你应该选择具有多个内核的现代处理器,常见的集群使用 2 到 8 个核的机器。如果你要在更快的 CPUs 和更多的核数之间选择,选择更多的核数更好。多个内核提供的额外并发远胜过稍微快一点点的时钟频率。

内存配置

如果有一种资源是最先被耗尽的,它可能是内存。排序和聚合都很耗内存,所以有足够的堆空间来应付它们是很重要的。即使堆空间是比较小的时候,也能为操作系统文件缓存提供额外的内存。因为 Lucene 使用的许多数据结构是基于磁盘的格式,Elasticsearch 利用操作系统缓存能产生很大效果。

64 GB 内存的机器是非常理想的,但是 32 GB 和 16 GB 机器也是很常见的。少于 8 GB 会适得其反(你最终需要很多很多的小机器),大于 64 GB 的机器也会有问题。

由于 ES 构建基于 lucene,而 lucene 设计强大之处在于 lucene 能够很好的利用操作系统内存来缓存索引数据,以提供快速的查询性能。lucene 的索引文件 segements 是存储在单文件中的,并且不可变,对于 OS 来说,能够很友好地将索引文件保持在 cache 中,以便快速访问;因此,我们很有必要将一半的物理内存留给 lucene;另一半的物理内存留给 ES(JVM heap)。

内存分配

当机器内存小于 64G 时,遵循通用的原则,50% 给 ES,50% 留给 lucene。

当机器内存大于 64G 时,遵循以下原则:

  • 如果主要的使用场景是全文检索,那么建议给 ES Heap 分配 4~32G 的内存即可;其它内存留给操作系统,供 lucene 使用(segments cache),以提供更快的查询性能。
  • 如果主要的使用场景是聚合或排序,并且大多数是 numerics,dates,geo_points 以及 not_analyzed 的字符类型,建议分配给 ES Heap 分配 4~32G 的内存即可,其它内存留给操作系统,供 lucene 使用,提供快速的基于文档的聚类、排序性能。
  • 如果使用场景是聚合或排序,并且都是基于 analyzed 字符数据,这时需要更多的 heap size,建议机器上运行多 ES 实例,每个实例保持不超过 50% 的 ES heap 设置(但不超过 32 G,堆内存设置 32 G 以下时,JVM 使用对象指标压缩技巧节省空间),50% 以上留给 lucene。

禁止 swap

禁止 swap,一旦允许内存与磁盘的交换,会引起致命的性能问题。可以通过在 elasticsearch.yml 中 bootstrap.memory_lock: true,以保持 JVM 锁定内存,保证 ES 的性能。

GC 设置

保持 GC 的现有设置,默认设置为:Concurrent-Mark and Sweep(CMS),别换成 G1 GC,因为目前 G1 还有很多 BUG。

保持线程池的现有设置,目前 ES 的线程池较 1.X 有了较多优化设置,保持现状即可;默认线程池大小等于 CPU 核心数。如果一定要改,按公式 ( ( CPU 核心数 * 3 ) / 2 ) + 1 设置;不能超过 CPU 核心数的 2 倍;但是不建议修改默认配置,否则会对 CPU 造成硬伤。

磁盘

硬盘对所有的集群都很重要,对大量写入的集群更是加倍重要(例如那些存储日志数据的)。硬盘是服务器上最慢的子系统,这意味着那些写入量很大的集群很容易让硬盘饱和,使得它成为集群的瓶颈。

在经济压力能承受的范围下,尽量使用固态硬盘(SSD)。固态硬盘相比于任何旋转介质(机械硬盘,磁带等),无论随机写还是顺序写,都会对 IO 有较大的提升。

如果你正在使用 SSDs,确保你的系统 I/O 调度程序是配置正确的。当你向硬盘写数据,I/O 调度程序决定何时把数据实际发送到硬盘。大多数默认 *nix 发行版下的调度程序都叫做 cfq(完全公平队列)。

调度程序分配时间片到每个进程。并且优化这些到硬盘的众多队列的传递。但它是为旋转介质优化的:机械硬盘的固有特性意味着它写入数据到基于物理布局的硬盘会更高效。

这对 SSD 来说是低效的,尽管这里没有涉及到机械硬盘。但是,deadline 或者 noop 应该被使用。deadline 调度程序基于写入等待时间进行优化,noop 只是一个简单的 FIFO 队列。

这个简单的更改可以带来显著的影响。仅仅是使用正确的调度程序,我们看到了 500 倍的写入能力提升。

如果你使用旋转介质(如机械硬盘),尝试获取尽可能快的硬盘(高性能服务器硬盘,15k RPM 驱动器)

使用 RAID0 是提高硬盘速度的有效途径,对机械硬盘和 SSD 来说都是如此。没有必要使用镜像或其它 RAID 变体,因为 Elasticsearch 在自身层面通过副本,已经提供了备份的功能,所以不需要利用磁盘的备份功能,同时如果使用磁盘备份功能的话,对写入速度有较大的影响。

最后,避免使用网络附加存储(NAS)。人们常声称他们的 NAS 解决方案比本地驱动器更快更可靠。除却这些声称,我们从没看到 NAS 能配得上它的大肆宣传。NAS 常常很慢,显露出更大的延时和更宽的平均延时方差,而且它是单点故障的。

索引优化设置

索引优化主要是在 Elasticsearch 的插入层面优化,Elasticsearch 本身索引速度其实还是蛮快的,具体数据,我们可以参考官方的 benchmark 数据。我们可以根据不同的需求,针对索引优化。

批量提交

当有大量数据提交的时候,建议采用批量提交(Bulk 操作);此外使用 bulk 请求时,每个请求不超过几十 M,因为太大会导致内存使用过大。

比如在做 ELK 过程中,Logstash indexer 提交数据到 Elasticsearch 中,batch size 就可以作为一个优化功能点。但是优化 size 大小需要根据文档大小和服务器性能而定。

像 Logstash 中提交文档大小超过 20MB,Logstash 会将一个批量请求切分为多个批量请求。

如果在提交过程中,遇到 EsRejectedExecutionException 异常的话,则说明集群的索引性能已经达到极限了。这种情况,要么提高服务器集群的资源,要么根据业务规则,减少数据收集速度,比如只收集 Warn、Error 级别以上的日志。

增加 Refresh 时间间隔

为了提高索引性能,Elasticsearch 在写入数据的时候,采用延迟写入的策略,即数据先写到内存中,当超过默认 1 秒(index.refresh_interval)会进行一次写入操作,就是将内存中 segment 数据刷新到磁盘中,此时我们才能将数据搜索出来,所以这就是为什么 Elasticsearch 提供的是近实时搜索功能,而不是实时搜索功能。

如果我们的系统对数据延迟要求不高的话,我们可以通过延长 refresh 时间间隔,可以有效地减少 segment 合并压力,提高索引速度。比如在做全链路跟踪的过程中,我们就将 index.refresh_interval 设置为 30s,减少 refresh 次数。再如,在进行全量索引时,可以将 refresh 次数临时关闭,即 index.refresh_interval 设置为-1,数据导入成功后再打开到正常模式,比如 30s。

在加载大量数据时候可以暂时不用 refresh 和 repliccas,index.refresh_interval 设置为-1,index.number_of_replicas 设置为 0。

修改 index_buffer_size 的设置

索引缓冲的设置可以控制多少内存分配给索引进程。这是一个全局配置,会应用于一个节点上所有不同的分片上。

1
2
indices.memory.index_buffer_size: 10%
indices.memory.min_index_buffer_size: 48mb

indices.memory.index_buffer_size 接受一个百分比或者一个表示字节大小的值。默认是 10%,意味着分配给节点的总内存的 10%用来做索引缓冲的大小。这个数值被分到不同的分片(shards)上。如果设置的是百分比,还可以设置 min_index_buffer_size (默认 48mb)和 max_index_buffer_size(默认没有上限)。

修改 translog 相关的设置

一是控制数据从内存到硬盘的操作频率,以减少硬盘 IO。可将 sync_interval 的时间设置大一些。默认为 5s。

1
index.translog.sync_interval: 5s

也可以控制 tranlog 数据块的大小,达到 threshold 大小时,才会 flush 到 lucene 索引文件。默认为 512m。

1
index.translog.flush_threshold_size: 512mb

注意 _id 字段的使用

_id 字段的使用,应尽可能避免自定义 _id,以避免针对 ID 的版本管理;建议使用 ES 的默认 ID 生成策略或使用数字类型 ID 做为主键。

注意 _all 字段及 _source 字段的使用

**_**all 字段及 _source 字段的使用,应该注意场景和需要,_all 字段包含了所有的索引字段,方便做全文检索,如果无此需求,可以禁用;_source 存储了原始的 document 内容,如果没有获取原始文档数据的需求,可通过设置 includes、excludes 属性来定义放入 _source 的字段。

合理的配置使用 index 属性

合理的配置使用 index 属性,analyzed 和 not_analyzed,根据业务需求来控制字段是否分词或不分词。只有 groupby 需求的字段,配置时就设置成 not_analyzed,以提高查询或聚类的效率。

减少副本数量

Elasticsearch 默认副本数量为 3 个,虽然这样会提高集群的可用性,增加搜索的并发数,但是同时也会影响写入索引的效率。

在索引过程中,需要把更新的文档发到副本节点上,等副本节点生效后在进行返回结束。使用 Elasticsearch 做业务搜索的时候,建议副本数目还是设置为 3 个,但是像内部 ELK 日志系统、分布式跟踪系统中,完全可以将副本数目设置为 1 个。

查询方面优化

Elasticsearch 作为业务搜索的近实时查询时,查询效率的优化显得尤为重要。

路由优化

当我们查询文档的时候,Elasticsearch 如何知道一个文档应该存放到哪个分片中呢?它其实是通过下面这个公式来计算出来的。

1
shard = hash(routing) % number_of_primary_shards

routing 默认值是文档的 id,也可以采用自定义值,比如用户 ID。

不带 routing 查询

在查询的时候因为不知道要查询的数据具体在哪个分片上,所以整个过程分为 2 个步骤:

  1. 分发:请求到达协调节点后,协调节点将查询请求分发到每个分片上。
  2. 聚合:协调节点搜集到每个分片上查询结果,再将查询的结果进行排序,之后给用户返回结果。

带 routing 查询

查询的时候,可以直接根据 routing 信息定位到某个分配查询,不需要查询所有的分配,经过协调节点排序。

向上面自定义的用户查询,如果 routing 设置为 userid 的话,就可以直接查询出数据来,效率提升很多。

Filter VS Query

尽可能使用过滤器上下文(Filter)替代查询上下文(Query)

  • Query:此文档与此查询子句的匹配程度如何?
  • Filter:此文档和查询子句匹配吗?

Elasticsearch 针对 Filter 查询只需要回答“是”或者“否”,不需要像 Query 查询一样计算相关性分数,同时 Filter 结果可以缓存。

深度翻页

在使用 Elasticsearch 过程中,应尽量避免大翻页的出现。

正常翻页查询都是从 from 开始 size 条数据,这样就需要在每个分片中查询打分排名在前面的 from+size 条数据。协同节点收集每个分配的前 from+size 条数据。协同节点一共会受到 N*(from+size) 条数据,然后进行排序,再将其中 from 到 from+size 条数据返回出去。如果 from 或者 size 很大的话,导致参加排序的数量会同步扩大很多,最终会导致 CPU 资源消耗增大。

可以通过使用 Elasticsearch scroll 和 scroll-scan 高效滚动的方式来解决这样的问题。

也可以结合实际业务特点,文档 id 大小如果和文档创建时间是一致有序的,可以以文档 id 作为分页的偏移量,并将其作为分页查询的一个条件。

脚本(script)合理使用

我们知道脚本使用主要有 3 种形式,内联动态编译方式、_script 索引库中存储和文件脚本存储的形式;一般脚本的使用场景是粗排,尽量用第二种方式先将脚本存储在 _script 索引库中,起到提前编译,然后通过引用脚本 id,并结合 params 参数使用,即可以达到模型(逻辑)和数据进行了分离,同时又便于脚本模块的扩展与维护。具体 ES 脚本的深入内容请参考 Elasticsearch 脚本模块的详解

数据结构优化

基于 Elasticsearch 的使用场景,文档数据结构尽量和使用场景进行结合,去掉没用及不合理的数据。

尽量减少不需要的字段

如果 Elasticsearch 用于业务搜索服务,一些不需要用于搜索的字段最好不存到 ES 中,这样即节省空间,同时在相同的数据量下,也能提高搜索性能。

避免使用动态值作字段,动态递增的 mapping,会导致集群崩溃;同样,也需要控制字段的数量,业务中不使用的字段,就不要索引。控制索引的字段数量、mapping 深度、索引字段的类型,对于 ES 的性能优化是重中之重。

以下是 ES 关于字段数、mapping 深度的一些默认设置:

1
2
3
index.mapping.nested_objects.limit: 10000
index.mapping.total_fields.limit: 1000
index.mapping.depth.limit: 20

Nested Object vs Parent/Child

尽量避免使用 nested 或 parent/child 的字段,能不用就不用;nested query 慢,parent/child query 更慢,比 nested query 慢上百倍;因此能在 mapping 设计阶段搞定的(大宽表设计或采用比较 smart 的数据结构),就不要用父子关系的 mapping。

如果一定要使用 nested fields,保证 nested fields 字段不能过多,目前 ES 默认限制是 50。因为针对 1 个 document,每一个 nested field,都会生成一个独立的 document,这将使 doc 数量剧增,影响查询效率,尤其是 JOIN 的效率。

1
index.mapping.nested_fields.limit: 50
对比 Nested Object Parent/Child
优点 文档存储在一起,因此读取性高 父子文档可以独立更新,互不影响
缺点 更新父文档或子文档时需要更新整个文档 为了维护 join 关系,需要占用部分内存,读取性能较差
场景 子文档偶尔更新,查询频繁 子文档更新频繁

选择静态映射,非必需时,禁止动态映射

尽量避免使用动态映射,这样有可能会导致集群崩溃,此外,动态映射有可能会带来不可控制的数据类型,进而有可能导致在查询端出现相关异常,影响业务。

此外,Elasticsearch 作为搜索引擎时,主要承载 query 的匹配和排序的功能,那数据的存储类型基于这两种功能的用途分为两类,一是需要匹配的字段,用来建立倒排索引对 query 匹配用,另一类字段是用做粗排用到的特征字段,如 ctr、点击数、评论数等等。

集群架构设计

合理的部署 Elasticsearch 有助于提高服务的整体可用性。

主节点、数据节点和协调节点分离

Elasticsearch 集群在架构拓朴时,采用主节点、数据节点和负载均衡节点分离的架构,在 5.x 版本以后,又可将数据节点再细分为“Hot-Warm”的架构模式。

Elasticsearch 的配置文件中有 2 个参数,node.master 和 node.data。这两个参数搭配使用时,能够帮助提供服务器性能。

主(master)节点

配置 node.master:truenode.data:false,该 node 服务器只作为一个主节点,但不存储任何索引数据。我们推荐每个集群运行 3 个专用的 master 节点来提供最好的弹性。使用时,你还需要将 discovery.zen.minimum_master_nodes setting 参数设置为 2,以免出现脑裂(split-brain)的情况。用 3 个专用的 master 节点,专门负责处理集群的管理以及加强状态的整体稳定性。因为这 3 个 master 节点不包含数据也不会实际参与搜索以及索引操作,在 JVM 上它们不用做相同的事,例如繁重的索引或者耗时,资源耗费很大的搜索。因此不太可能会因为垃圾回收而导致停顿。因此,master 节点的 CPU,内存以及磁盘配置可以比 data 节点少很多的。

数据(data)节点

配置 node.master:falsenode.data:true,该 node 服务器只作为一个数据节点,只用于存储索引数据,使该 node 服务器功能单一,只用于数据存储和数据查询,降低其资源消耗率。

在 Elasticsearch 5.x 版本之后,data 节点又可再细分为“Hot-Warm”架构,即分为热节点(hot node)和暖节点(warm node)。

hot 节点:

hot 节点主要是索引节点(写节点),同时会保存近期的一些频繁被查询的索引。由于进行索引非常耗费 CPU 和 IO,即属于 IO 和 CPU 密集型操作,建议使用 SSD 的磁盘类型,保持良好的写性能;我们推荐部署最小化的 3 个 hot 节点来保证高可用性。根据近期需要收集以及查询的数据量,可以增加服务器数量来获得想要的性能。

将节点设置为 hot 类型需要 elasticsearch.yml 如下配置:

1
node.attr.box_type: hot

如果是针对指定的 index 操作,可以通过 settings 设置 index.routing.allocation.require.box_type: hot 将索引写入 hot 节点。

warm 节点:

这种类型的节点是为了处理大量的,而且不经常访问的只读索引而设计的。由于这些索引是只读的,warm 节点倾向于挂载大量磁盘(普通磁盘)来替代 SSD。内存、CPU 的配置跟 hot 节点保持一致即可;节点数量一般也是大于等于 3 个。

将节点设置为 warm 类型需要 elasticsearch.yml 如下配置:

1
node.attr.box_type: warm

同时,也可以在 elasticsearch.yml 中设置 index.codec:best_compression 保证 warm 节点的压缩配置。

当索引不再被频繁查询时,可通过 index.routing.allocation.require.box_type:warm,将索引标记为 warm,从而保证索引不写入 hot 节点,以便将 SSD 磁盘资源用在刀刃上。一旦设置这个属性,ES 会自动将索引合并到 warm 节点。

协调(coordinating)节点

协调节点用于做分布式里的协调,将各分片或节点返回的数据整合后返回。该节点不会被选作主节点,也不会存储任何索引数据。该服务器主要用于查询负载均衡。在查询的时候,通常会涉及到从多个 node 服务器上查询数据,并将请求分发到多个指定的 node 服务器,并对各个 node 服务器返回的结果进行一个汇总处理,最终返回给客户端。在 ES 集群中,所有的节点都有可能是协调节点,但是,可以通过设置 node.masternode.datanode.ingest 都为 false 来设置专门的协调节点。需要较好的 CPU 和较高的内存。

  • node.master:false 和 node.data:true,该 node 服务器只作为一个数据节点,只用于存储索引数据,使该 node 服务器功能单一,只用于数据存储和数据查询,降低其资源消耗率。
  • node.master:true 和 node.data:false,该 node 服务器只作为一个主节点,但不存储任何索引数据,该 node 服务器将使用自身空闲的资源,来协调各种创建索引请求或者查询请求,并将这些请求合理分发到相关的 node 服务器上。
  • node.master:false 和 node.data:false,该 node 服务器即不会被选作主节点,也不会存储任何索引数据。该服务器主要用于查询负载均衡。在查询的时候,通常会涉及到从多个 node 服务器上查询数据,并将请求分发到多个指定的 node 服务器,并对各个 node 服务器返回的结果进行一个汇总处理,最终返回给客户端。

关闭 data 节点服务器中的 http 功能

针对 Elasticsearch 集群中的所有数据节点,不用开启 http 服务。将其中的配置参数这样设置,http.enabled:false,同时也不要安装 head, bigdesk, marvel 等监控插件,这样保证 data 节点服务器只需处理创建/更新/删除/查询索引数据等操作。

http 功能可以在非数据节点服务器上开启,上述相关的监控插件也安装到这些服务器上,用于监控 Elasticsearch 集群状态等数据信息。这样做一来出于数据安全考虑,二来出于服务性能考虑。

一台服务器上最好只部署一个 node

一台物理服务器上可以启动多个 node 服务器节点(通过设置不同的启动 port),但一台服务器上的 CPU、内存、硬盘等资源毕竟有限,从服务器性能考虑,不建议一台服务器上启动多个 node 节点。

集群分片设置

ES 一旦创建好索引后,就无法调整分片的设置,而在 ES 中,一个分片实际上对应一个 lucene 索引,而 lucene 索引的读写会占用很多的系统资源,因此,分片数不能设置过大;所以,在创建索引时,合理配置分片数是非常重要的。一般来说,我们遵循一些原则:

  1. 控制每个分片占用的硬盘容量不超过 ES 的最大 JVM 的堆空间设置(一般设置不超过 32 G,参考上面的 JVM 内存设置原则),因此,如果索引的总容量在 500 G 左右,那分片大小在 16 个左右即可;当然,最好同时考虑原则 2。
  2. 考虑一下 node 数量,一般一个节点有时候就是一台物理机,如果分片数过多,大大超过了节点数,很可能会导致一个节点上存在多个分片,一旦该节点故障,即使保持了 1 个以上的副本,同样有可能会导致数据丢失,集群无法恢复。所以,一般都设置分片数不超过节点数的 3 倍

参考资料

Elasticsearch 聚合

聚合将数据汇总为指标、统计数据或其他分析。

Elasticsearch 将聚合分为三类:

类型 说明
Metric(指标聚合) 根据字段值进行统计计算
Bucket(桶聚合) 根据字段值、范围或其他条件进行分组
Pipeline(管道聚合) 根据其他聚合结果进行聚合

聚合的用法

所有的聚合,无论它们是什么类型,都遵从以下的规则。

  • 通过 JSON 来定义聚合计算,使用 aggregationsaggs 来标记聚合计算。需要给每个聚合起一个名字,指定它的类型以及和该类型相关的选项。
  • 它们运行在查询的结果之上。和查询不匹配的文档不会计算在内,除非你使用 global 聚集将不匹配的文档囊括其中。
  • 可以进一步过滤查询的结果,而不影响聚集。

以下是聚合的基本结构:

1
2
3
4
5
6
7
8
9
10
"aggregations" : { <!-- 最外层的聚合键,也可以缩写为 aggs -->
"<aggregation_name>" : { <!-- 聚合的自定义名字 -->
"<aggregation_type>" : { <!-- 聚合的类型,指标相关的,如 max、min、avg、sum,桶相关的 terms、filter 等 -->
<aggregation_body> <!-- 聚合体:对哪些字段进行聚合,可以取字段的值,也可以是脚本计算的结果 -->
}
[,"meta" : { [<meta_data_body>] } ]? <!-- 元 -->
[,"aggregations" : { [<sub_aggregation>]+ } ]? <!-- 在聚合里面在定义子聚合 -->
}
[,"<aggregation_name_2>" : { ... } ]* <!-- 聚合的自定义名字 2 -->
}
  • 在最上层有一个 aggregations 的键,可以缩写为 aggs
  • 在下面一层,需要为聚合指定一个名字。可以在请求的返回中看到这个名字。在同一个请求中使用多个聚合时,这一点非常有用,它让你可以很容易地理解每组结果的含义。
  • 最后,必须要指定聚合的类型。

关于聚合分析的值来源,可以取字段的值,也可以是脚本计算的结果

但是用脚本计算的结果时,需要注意脚本的性能和安全性;尽管多数聚集类型允许使用脚本,但是脚本使得聚集变得缓慢,因为脚本必须在每篇文档上运行。为了避免脚本的运行,可以在索引阶段进行计算。

此外,脚本也可以被人可能利用进行恶意代码攻击,尽量使用沙盒(sandbox)内的脚本语言。

::: details 【示例】根据 my-field 字段进行 terms 聚合计算

1
2
3
4
5
6
7
8
9
10
GET /my-index-000001/_search
{
"aggs": {
"my-agg-name": {
"terms": {
"field": "my-field"
}
}
}
}

响应结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
{
"took": 78,
"timed_out": false,
"_shards": {
"total": 1,
"successful": 1,
"skipped": 0,
"failed": 0
},
"hits": {
"total": {
"value": 5,
"relation": "eq"
},
"max_score": 1.0,
"hits": [...]
},
"aggregations": {
"my-agg-name": { // my-agg-name 聚合计算的结果
"doc_count_error_upper_bound": 0,
"sum_other_doc_count": 0,
"buckets": []
}
}
}

:::

::: details 用于测试的数据

/employees 索引添加测试数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
DELETE /employees
PUT /employees/
{
"mappings" : {
"properties" : {
"age" : {
"type" : "integer"
},
"gender" : {
"type" : "keyword"
},
"job" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 50
}
}
},
"name" : {
"type" : "keyword"
},
"salary" : {
"type" : "integer"
}
}
}
}

PUT /employees/_bulk
{"index":{"_id":"1"}}
{"name":"Emma","age":32,"job":"Product Manager","gender":"female","salary":35000}
{"index":{"_id":"2"}}
{"name":"Underwood","age":41,"job":"Dev Manager","gender":"male","salary":50000}
{"index":{"_id":"3"}}
{"name":"Tran","age":25,"job":"Web Designer","gender":"male","salary":18000}
{"index":{"_id":"4"}}
{"name":"Rivera","age":26,"job":"Web Designer","gender":"female","salary":22000}
{"index":{"_id":"5"}}
{"name":"Rose","age":25,"job":"QA","gender":"female","salary":18000}
{"index":{"_id":"6"}}
{"name":"Lucy","age":31,"job":"QA","gender":"female","salary":25000}
{"index":{"_id":"7"}}
{"name":"Byrd","age":27,"job":"QA","gender":"male","salary":20000}
{"index":{"_id":"8"}}
{"name":"Foster","age":27,"job":"Java Programmer","gender":"male","salary":20000}
{"index":{"_id":"9"}}
{"name":"Gregory","age":32,"job":"Java Programmer","gender":"male","salary":22000}
{"index":{"_id":"10"}}
{"name":"Bryant","age":20,"job":"Java Programmer","gender":"male","salary":9000}
{"index":{"_id":"11"}}
{"name":"Jenny","age":36,"job":"Java Programmer","gender":"female","salary":38000}
{"index":{"_id":"12"}}
{"name":"Mcdonald","age":31,"job":"Java Programmer","gender":"male","salary":32000}
{"index":{"_id":"13"}}
{"name":"Jonthna","age":30,"job":"Java Programmer","gender":"female","salary":30000}
{"index":{"_id":"14"}}
{"name":"Marshall","age":32,"job":"Javascript Programmer","gender":"male","salary":25000}
{"index":{"_id":"15"}}
{"name":"King","age":33,"job":"Java Programmer","gender":"male","salary":28000}
{"index":{"_id":"16"}}
{"name":"Mccarthy","age":21,"job":"Javascript Programmer","gender":"male","salary":16000}
{"index":{"_id":"17"}}
{"name":"Goodwin","age":25,"job":"Javascript Programmer","gender":"male","salary":16000}
{"index":{"_id":"18"}}
{"name":"Catherine","age":29,"job":"Javascript Programmer","gender":"female","salary":20000}
{"index":{"_id":"19"}}
{"name":"Boone","age":30,"job":"DBA","gender":"male","salary":30000}
{"index":{"_id":"20"}}
{"name":"Kathy","age":29,"job":"DBA","gender":"female","salary":20000}

:::

限定数据范围

ES 聚合分析的默认作用范围是 query 的查询结果集。

同时 ES 还支持以下方式改变聚合的作用范围:

  • filter - 只对当前子聚合语句生效
  • post_filter - 对聚合分析后的文档进行再次过滤
  • global - 无视 query,对全部文档进行统计

::: details 【示例】使用 query 限定聚合数据的范围

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
POST /employees/_search
{
"size": 0,
"query": {
"range": {
"age": {
"gte": 20
}
}
},
"aggs": {
"jobs": {
"terms": {
"field":"job.keyword"

}
}
}
}

:::

::: details 【示例】使用 filter 限定聚合数据的范围

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
POST /employees/_search
{
"size": 0,
"aggs": {
"older_person": {
"filter":{
"range":{
"age":{
"from":35
}
}
},
"aggs":{
"jobs":{
"terms": {
"field":"job.keyword"
}
}},
"all_jobs": {
"terms": {
"field":"job.keyword"

}
}
}
}

:::

控制返回聚合结果

::: details 【示例】仅返回聚合结果

使用 field 限定聚合返回的展示字段:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
POST /employees/_search
{
"aggs": {
"jobs": {
"terms": {
"field": "job.keyword"
}
}
}
}

// 找出所有的 job 类型,还能找到聚合后符合条件的结果
POST /employees/_search
{
"aggs": {
"jobs": {
"terms": {
"field": "job.keyword"
}
}
},
"post_filter": {
"match": {
"job.keyword": "Dev Manager"
}
}
}

默认情况下,包含聚合的搜索会同时返回搜索命中和聚合结果。要仅返回聚合结果,请将 size 设置为 0

1
2
3
4
5
6
7
8
9
10
11
POST /employees/_search
{
"size": 0,
"aggs": {
"jobs": {
"terms": {
"field": "job.keyword"
}
}
}
}

:::

聚合结果排序

::: details 【示例】聚合结果排序

指定 order,按照 _count_key 进行排序。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
POST /employees/_search
{
"size": 0,
"query": {
"range": {
"age": {
"gte": 20
}
}
},
"aggs": {
"jobs": {
"terms": {
"field": "job.keyword",
"order": [
{
"_count": "asc"
},
{
"_key": "desc"
}
]
}
}
}
}

POST /employees/_search
{
"size": 0,
"aggs": {
"jobs": {
"terms": {
"field": "job.keyword",
"order": [
{
"avg_salary": "desc"
}
]
},
"aggs": {
"avg_salary": {
"avg": {
"field": "salary"
}
}
}
}
}
}

POST /employees/_search
{
"size": 0,
"aggs": {
"jobs": {
"terms": {
"field": "job.keyword",
"order": [
{
"stats_salary.min": "desc"
}
]
},
"aggs": {
"stats_salary": {
"stats": {
"field": "salary"
}
}
}
}
}
}

:::

运行多个聚合

::: details 【示例】运行多个聚合

可以在同一请求中指定多个聚合:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
POST /employees/_search
{
"size": 0,
"query": {
"range": {
"age": {
"gte": 40
}
}
},
"aggs": {
"jobs": {
"terms": {
"field":"job.keyword"

}
},

"all":{
"global":{},
"aggs":{
"salary_avg":{
"avg":{
"field":"salary"
}
}
}
}
}
}

:::

运行子聚合

::: details 【示例】运行子聚合

Bucket 聚合支持 Bucket 或 Metric 子聚合。例如,具有 avg 子聚合的 terms 聚合会计算每个桶中文档的平均值。嵌套子聚合没有级别或深度限制。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
POST /employees/_search
{
"size": 0,
"aggs": {
"jobs": {
"terms": {
"field": "job.keyword",
"size": 10
},
"aggs": {
"avg_salary": {
"avg": {
"field": "salary"
}
}
}
},
"min_salary_by_job":{
"min_bucket": {
"buckets_path": "jobs>avg_salary"
}
}
}
}

响应将子聚合结果嵌套在其父聚合下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
{
// ...
"aggregations": {
"jobs": {
"doc_count_error_upper_bound": 0,
"sum_other_doc_count": 6,
"buckets": [
{
"key": "Java Programmer",
"doc_count": 7,
"avg_salary": {
"value": 25571.428571428572
}
},
{
"key": "Javascript Programmer",
"doc_count": 4,
"avg_salary": {
"value": 19250.0
}
},
{
"key": "QA",
"doc_count": 3,
"avg_salary": {
"value": 21000.0
}
}
]
},
"min_salary_by_job": {
"value": 19250.0,
"keys": ["Javascript Programmer"]
}
}
}

:::

Metric(指标聚合)

指标聚合主要从不同文档的分组中提取统计数据,或者,从来自其他聚合的文档桶来提取统计数据。

这些统计数据通常来自数值型字段,如最小或者平均价格。用户可以单独获取每项统计数据,或者也可以使用 stats 聚合来同时获取它们。更高级的统计数据,如平方和或者是标准差,可以通过 extended stats 聚合来获取。

ES 支持的指标聚合类型:

类型 说明
avg 平均值聚合
boxplot 箱线图聚合
cardinality 近似计算非重复值
extended_stats 扩展统计聚合
geo_bounds 地理边界聚合
geo_centroid 根据* geo *字段的所有坐标值计算加权质心
geo_line_geo_line 根据地理数据生成可用于线性几何图形展示的数据
cartesian_bounds 笛卡尔积边界聚合
cartesian_centroid 计算所有坐标值加权质心
matrix_stats 矩阵统计聚合
max 最大值聚合
median_absolute_deviation 中位数绝对偏差聚合
min 最小值聚合
percentile_ranks 百分位排名聚合
percentiles 百分位聚合
rate 频率聚合
scripted_metric 脚本化指标聚合
stats 统计聚合
string_stats 字符串统计聚合
sum 求和聚合
t_test 校验聚合
top_hits 热门点击统计
top_metrics 热门指标聚合
value_count 值统计聚合
weighted_avg 加权平均值聚合

::: details 【示例】指标聚合示例

Metric 聚合测试:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
// Metric 聚合,找到最低的工资
POST /employees/_search
{
"size": 0,
"aggs": {
"min_salary": {
"min": {
"field":"salary"
}
}
}
}

// Metric 聚合,找到最高的工资
POST /employees/_search
{
"size": 0,
"aggs": {
"max_salary": {
"max": {
"field":"salary"
}
}
}
}

// 多个 Metric 聚合,找到最低最高和平均工资
POST /employees/_search
{
"size": 0,
"aggs": {
"max_salary": {
"max": {
"field": "salary"
}
},
"min_salary": {
"min": {
"field": "salary"
}
},
"avg_salary": {
"avg": {
"field": "salary"
}
}
}
}

// 一个聚合,输出多值
POST /employees/_search
{
"size": 0,
"aggs": {
"stats_salary": {
"stats": {
"field": "salary"
}
}
}
}

:::

Bucket(桶聚合)

桶聚合不会像指标聚合那样计算字段的指标,而是创建文档桶。每个桶都与一个标准(取决于聚合类型)相关联,该标准确定当前上下文中的文档是否“落入”其中。换句话说,桶有效地定义了文档集。除了桶本身之外,聚合还计算并返回“落入”每个桶的文档数。

指标聚合相反,桶聚合可以保存子聚合。这些子聚合将针对其 “父” 桶聚合创建的桶进行聚合。

Elasticsearch 中支持的桶聚合类型:

类型 说明
adjacency_matrix 邻接矩阵聚合
auto_interval_date_histogram 自动间隔日期直方图聚合
categorize_text 对文本进行分类聚合
children 子文档聚合
composite 组合聚合
date_histogram 日期直方图聚合
date_range 日期范围聚合
diversified_sampler 多种采样器聚合
filter 过滤器聚合
filters 多过滤器聚合
geo_distance 地理距离聚合
geohash_grid geohash 网格
geohex_grid geohex 网格
geotile_grid geotile 网格
global 全局聚合
histogram 直方图聚合
ip_prefix IP 前缀聚合
ip_range IP 范围聚合
missing 空值聚合
multi_terms 多词项分组聚合
nested 嵌套聚合
parent 父文档聚合
random_sampler 随机采样器聚合
range 范围聚合
rare_terms 稀有多词项聚合
reverse_nested 反向嵌套聚合
sampler 采样器聚合
significant_terms 重要词项聚合
significant_text 重要文本聚合
terms 词项分组聚合
time_series 时间序列聚合
variable_width_histogram 可变宽度直方图聚合

::: details 【示例】terms 聚合查询

默认,ES 不允许对 Text 字段进行 terms 聚合查询

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
// 对 keword 进行聚合
POST /employees/_search
{
"size": 0,
"aggs": {
"jobs": {
"terms": {
"field":"job.keyword"
}
}
}
}

// 对 Text 字段进行 terms 聚合查询,失败
POST /employees/_search
{
"size": 0,
"aggs": {
"jobs": {
"terms": {
"field": "job"
}
}
}
}

// 对 Text 字段打开 fielddata,支持 terms aggregation
PUT employees/_mapping
{
"properties" : {
"job":{
"type": "text",
"fielddata": true
}
}
}

// 对 Text 字段进行 terms 分词。分词后的 terms
POST /employees/_search
{
"size": 0,
"aggs": {
"jobs": {
"terms": {
"field":"job"
}
}
}
}

POST /employees/_search
{
"size": 0,
"aggs": {
"jobs": {
"terms": {
"field":"job.keyword"
}
}
}
}

:::

::: details 【示例】更多 Bucket 聚合示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
// 对 job 进行近似去重统计
POST /employees/_search
{
"size": 0,
"aggs": {
"cardinate": {
"cardinality": {
"field": "job"
}
}
}
}

// 对 gender 进行聚合
POST /employees/_search
{
"size": 0,
"aggs": {
"gender": {
"terms": {
"field":"gender"
}
}
}
}

// 指定 bucket 的 size
POST /employees/_search
{
"size": 0,
"aggs": {
"ages_5": {
"terms": {
"field":"age",
"size":3
}
}
}
}

// 指定 size,不同工种中,年纪最大的 3 个员工的具体信息
POST /employees/_search
{
"size": 0,
"aggs": {
"jobs": {
"terms": {
"field":"job.keyword"
},
"aggs":{
"old_employee":{
"top_hits":{
"size":3,
"sort":[
{
"age":{
"order":"desc"
}
}
]
}
}
}
}
}
}

// Salary Ranges 分桶,可以自己定义 key
POST /employees/_search
{
"size": 0,
"aggs": {
"salary_range": {
"range": {
"field":"salary",
"ranges":[
{
"to":10000
},
{
"from":10000,
"to":20000
},
{
"key":">20000",
"from":20000
}
]
}
}
}
}

// Salary Histogram, 工资 0 到 10 万,以 5000 一个区间进行分桶
POST /employees/_search
{
"size": 0,
"aggs": {
"salary_histrogram": {
"histogram": {
"field":"salary",
"interval":5000,
"extended_bounds":{
"min":0,
"max":100000

}
}
}
}
}

// 嵌套聚合 1,按照工作类型分桶,并统计工资信息
POST /employees/_search
{
"size": 0,
"aggs": {
"Job_salary_stats": {
"terms": {
"field": "job.keyword"
},
"aggs": {
"salary": {
"stats": {
"field": "salary"
}
}
}
}
}
}

// 多次嵌套。根据工作类型分桶,然后按照性别分桶,计算工资的统计信息
POST /employees/_search
{
"size": 0,
"aggs": {
"Job_gender_stats": {
"terms": {
"field": "job.keyword"
},
"aggs": {
"gender_stats": {
"terms": {
"field": "gender"
},
"aggs": {
"salary_stats": {
"stats": {
"field": "salary"
}
}
}
}
}
}
}
}

:::

Pipeline(管道聚合)

管道聚合处理从其他聚合而不是文档集生成的输出,从而将信息添加到输出树中。

Pipeline 聚合的分析结果会输出到原结果中,根据位置的不同,分为两类:

管道聚合可以通过使用 buckets_path 参数来指示所需指标的路径,从而引用执行计算所需的聚合。管道聚合不能具有子聚合,但根据类型,它可以引用buckets_path中的另一个管道,从而允许链接管道聚合。

以下为 Elasticsearch 支持的管道聚合类型:

类型 说明
avg_bucket 平均桶
bucket_script 桶脚本
bucket_count_ks_test 桶数 k-s 测试
bucket_correlation 桶关联
bucket_selector 桶选择器
bucket_sort 桶排序
change_point 更改点
cumulative_cardinality 累积基数
cumulative_sum 累计总和
derivative 导数
extended_stats_bucket 扩展的统计信息桶
inference_bucket 推理桶
max_bucket 最大桶
min_bucket 最小桶
moving_function 移动功能
moving_percentiles 移动百分位数
normalize 正常化
percentiles_bucket 百分位数桶
serial_differencing 序列差分
stats_bucket 统计桶
sum_bucket 总和桶

::: details 【示例】Pipeline 聚合示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
// 平均工资最低的工作类型
POST /employees/_search
{
"size": 0,
"aggs": {
"jobs": {
"terms": {
"field": "job.keyword",
"size": 10
},
"aggs": {
"avg_salary": {
"avg": {
"field": "salary"
}
}
}
},
"min_salary_by_job":{
"min_bucket": {
"buckets_path": "jobs>avg_salary"
}
}
}
}

// 平均工资最高的工作类型
POST /employees/_search
{
"size": 0,
"aggs": {
"jobs": {
"terms": {
"field": "job.keyword",
"size": 10
},
"aggs": {
"avg_salary": {
"avg": {
"field": "salary"
}
}
}
},
"max_salary_by_job":{
"max_bucket": {
"buckets_path": "jobs>avg_salary"
}
}
}
}

// 平均工资的平均工资
POST /employees/_search
{
"size": 0,
"aggs": {
"jobs": {
"terms": {
"field": "job.keyword",
"size": 10
},
"aggs": {
"avg_salary": {
"avg": {
"field": "salary"
}
}
}
},
"avg_salary_by_job":{
"avg_bucket": {
"buckets_path": "jobs>avg_salary"
}
}
}
}

// 平均工资的统计分析
POST /employees/_search
{
"size": 0,
"aggs": {
"jobs": {
"terms": {
"field": "job.keyword",
"size": 10
},
"aggs": {
"avg_salary": {
"avg": {
"field": "salary"
}
}
}
},
"stats_salary_by_job":{
"stats_bucket": {
"buckets_path": "jobs>avg_salary"
}
}
}
}

// 平均工资的百分位数
POST /employees/_search
{
"size": 0,
"aggs": {
"jobs": {
"terms": {
"field": "job.keyword",
"size": 10
},
"aggs": {
"avg_salary": {
"avg": {
"field": "salary"
}
}
}
},
"percentiles_salary_by_job":{
"percentiles_bucket": {
"buckets_path": "jobs>avg_salary"
}
}
}
}

// 按照年龄对平均工资求导
POST /employees/_search
{
"size": 0,
"aggs": {
"age": {
"histogram": {
"field": "age",
"min_doc_count": 1,
"interval": 1
},
"aggs": {
"avg_salary": {
"avg": {
"field": "salary"
}
},
"derivative_avg_salary":{
"derivative": {
"buckets_path": "avg_salary"
}
}
}
}
}
}

// Cumulative_sum
POST /employees/_search
{
"size": 0,
"aggs": {
"age": {
"histogram": {
"field": "age",
"min_doc_count": 1,
"interval": 1
},
"aggs": {
"avg_salary": {
"avg": {
"field": "salary"
}
},
"cumulative_salary":{
"cumulative_sum": {
"buckets_path": "avg_salary"
}
}
}
}
}
}

// Moving Function
POST /employees/_search
{
"size": 0,
"aggs": {
"age": {
"histogram": {
"field": "age",
"min_doc_count": 1,
"interval": 1
},
"aggs": {
"avg_salary": {
"avg": {
"field": "salary"
}
},
"moving_avg_salary":{
"moving_fn": {
"buckets_path": "avg_salary",
"window":10,
"script": "MovingFunctions.min(values)"
}
}
}
}
}
}

:::

聚合的执行流程

ES 在进行聚合分析时,协调节点会在每个分片的主分片、副分片中选一个,然后在不同分片上分别进行聚合计算,然后将每个分片的聚合结果进行汇总,返回最终结果。

由于,并非基于全量数据进行计算,所以聚合结果并非完全准确。

要解决聚合准确性问题,有两个解决方案:

  • 解决方案 1:当数据量不大时,设置 Primary Shard 为 1,这意味着在数据全集上进行聚合。
  • 解决方案 2:设置 shard_size 参数,将计算数据范围变大,进而使得 ES 的整体性能变低,精准度变高。shard_size 值的默认值是 size * 1.5 + 10

参考资料

Elasticsearch 搜索(下)

Elasticsearch 提供了基于 JSON 的 DSL(Domain Specific Language)来定义查询。

可以将 DSL 视为查询的 AST(抽象语法树),由两种类型的子句组成:

  • 叶子查询 - 在指定字段中查找特定值,例如:matchtermrange
  • 组合查询 - 组合其他叶子查询或组合查询,用于以逻辑方式组合多个查询(例如: booldis_max),或更改它们的行为(例如:constant_score)。

查询子句的行为会有所不同,具体取决于它们是在 query content 还是 filter context 中使用。

  • query context - 有相关性计算,采用相关性算法,计算文档与查询关键词之间的相关度,并根据评分(_score)大小排序。
  • filter context - 无相关性计算,可以利用缓存,性能更好。

从用法角度,Elasticsearch 查询分类大致分为:

全文查询

Full Text Search(全文搜索) 支持在非结构化文本数据中搜索与查询关键字最匹配的数据。

在 ES 中,支持以下全文搜索方式:

  • intervals - 根据匹配词的顺序和近似度返回文档。
  • match - 匹配查询,用于执行全文搜索的标准查询,包括模糊匹配和短语或邻近查询。
  • match_bool_prefix - 对检索文本分词,并根据这些分词构造一个布尔查询。除了最后一个分词之外的每个分词都进行 term 查询。最后一个分词用于 prefix 查询;其他分词都进行 term 查询。
  • match_phrase - 短语匹配查询。
  • match_phrase_prefix - 与 match_phrase 查询类似,但对最后一个单词执行通配符搜索。
  • multi_match 支持多字段 match 查询
  • combined_fields - 匹配多个字段,就像它们已索引到一个组合字段中一样。
  • query_string - 支持紧凑的 Lucene query string(查询字符串)语法,允许指定 AND|OR|NOT 条件和单个查询字符串中的多字段搜索。仅适用于专家用户。
  • simple_query_string - 更简单、更健壮的 query_string 语法版本,适合直接向用户公开。

match(匹配查询)

match 查询用于搜索单个字段。首先,会针对检索文本进行解析(分词),分词后的任何一个词项只要被匹配,文档就会被搜到。默认情况下,相当于对分词后词项进行 or 匹配操作。

:::details match 示例

1
2
3
4
5
6
7
8
9
10
GET kibana_sample_data_ecommerce/_search
{
"query": {
"match": {
"customer_full_name": {
"query": "George Hubbard"
}
}
}
}

响应结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
{
"took": 891, // 查询使用的毫秒数
"timed_out": false, // 是否有分片超时,也就是说是否只返回了部分结果
"_shards": {
// 总分片数、响应成功/失败数量信息
"total": 1,
"successful": 1,
"skipped": 0,
"failed": 0
},
"hits": {
// 搜索结果
"total": {
// 匹配的总记录数
"value": 82,
"relation": "eq"
},
"max_score": 10.018585, // 所有匹配文档中的最大相关性分值
"hits": [
// 匹配文档列表
{
"_index": "kibana_sample_data_ecommerce", // 文档所属索引
"_type": "_doc", // 文档所属 type
"_id": "2ZUtBX4BU8KXl1YJRBrH", // 文档的唯一性标识
"_score": 10.018585, // 文档的相关性分值
"_source": {
// 文档的原始 JSON 对象
// 略
}
}
// 省略多条记录
]
}
}

:::

可以通过组合 <field>query 参数来简化匹配查询语法。下面是一个简单的示例。

:::details match 简写示例

下面的查询等价于前面的匹配查询示例:

1
2
3
4
5
6
7
8
GET kibana_sample_data_ecommerce/_search
{
"query": {
"match": {
"customer_full_name": "George Hubbard"
}
}
}

:::

在进行全文本字段检索的时候, match API 提供了 operator 和 minimum_should_match 参数:

  • operator 参数值可以为 “or” 或者 “and” 来控制检索词项间的关系,默认值为 “or”。所以上面例子中,只要书名中含有 “linux” 或者 “architecture” 的文档都可以匹配上。
  • minimum_should_match 可以指定词项的最少匹配个数,其值可以指定为某个具体的数字,但因为我们无法预估检索内容的词项数量,一般将其设置为一个百分比。

:::details minimum_should_match 示例

至少有 50% 的词项匹配的文档才会被返回:

1
2
3
4
5
6
7
8
9
10
11
12
GET kibana_sample_data_ecommerce/_search
{
"query": {
"match": {
"category": {
"query": "Women Clothing Accessories",
"operator": "or",
"minimum_should_match": "50%"
}
}
}
}

:::

match 查询提供了 fuzziness 参数,fuzziness 允许基于被查询字段的类型进行模糊匹配。请参阅 Fuzziness 的配置。

在这种情况下可以设置 prefix_lengthmax_expansions 来控制模糊匹配。如果设置了模糊选项,查询将使用 top_terms_blended_freqs_${max_expansions} 作为其重写方法,fuzzy_rewrite 参数允许控制查询将如何被重写。

默认情况下允许模糊倒转 (abba),但可以通过将 fuzzy_transpositions 设置为 false 来禁用。

:::details fuzziness 示例

1
2
3
4
5
6
7
8
9
10
11
GET kibana_sample_data_ecommerce/_search
{
"query": {
"match": {
"customer_first_name": {
"query": "Gearge",
"fuzziness": "AUTO"
}
}
}
}

:::

如果使用的分析器像 stop 过滤器一样删除查询中的所有标记,则默认行为是不匹配任何文档。可以使用 zero_terms_query 选项来改变默认行为,它接受 none(默认)和 all (相当于 match_all 查询)。

:::details zero_terms_query 示例

1
2
3
4
5
6
7
8
9
10
11
12
GET kibana_sample_data_logs/_search
{
"query": {
"match": {
"message": {
"query": "Mozilla Linux",
"operator": "and",
"zero_terms_query": "all"
}
}
}
}

:::

match_phrase(短语匹配查询)

match_phrase 查询首先会对检索内容进行分词,分词器可以自定义,同时文档还要满足以下两个条件才会被搜索到:

  1. 分词后所有词项都要出现在该字段中(相当于 and 操作)
  2. 字段中的词项顺序要一致

:::details match_phrase 示例

1
2
3
4
5
6
7
8
9
10
GET kibana_sample_data_logs/_search
{
"query": {
"match_phrase": {
"agent": {
"query": "Linux x86_64"
}
}
}
}

:::

match_phrase_prefix(短语前缀匹配查询)

查询和 match_phrase 查询类似,只不过 match_phrase_prefix 最后一个 term 会被作为前缀匹配。

:::details match_phrase_prefix 示例

匹配以 https://www.elastic.co/download 开头的短语

1
2
3
4
5
6
7
8
9
10
GET kibana_sample_data_logs/_search
{
"query": {
"match_phrase_prefix": {
"url": {
"query": "https://www.elastic.co/download"
}
}
}
}

:::

multi_match(多字段匹配查询)

multi_match 查询允许对多个字段执行相同的匹配查询。

multi_match 查询在内部执行的方式取决于 type 参数,可以设置为:

  • best_fields -(默认)将所有与查询匹配的文档作为结果返回,但是只使用评分最高的字段的评分来作为评分结果返回。
  • most_fields - 将所有与查询匹配的文档作为结果返回,并将所有匹配字段的评分累加起来作为评分结果。
  • cross_fields - 将具有相同分析器的字段视为一个大字段。在每个字段中查找每个单词。例如当需要查询英文人名的时候,可以将 first_name 和 last_name 两个字段组合起来当作 full_name 来查询。
  • phrase - 对每个字段运行 match_phrase 查询,并将最佳匹配字段的评分作为结果返回。
  • phrase_prefix - 对每个字段运行 match_phrase_prefix 查询,并将最佳匹配字段的评分作为结果返回。
  • bool_prefix - 在每个字段上创建一个 match_bool_prefix 查询,并且合并每个字段的评分作为评分结果。

:::details multi_match 示例

1
2
3
4
5
6
7
8
9
10
11
12
GET kibana_sample_data_ecommerce/_search
{
"query": {
"multi_match": {
"query": 34.98,
"fields": [
"taxful_total_price",
"taxless_total_*" # 可以使用通配符
]
}
}
}

:::

词项级别查询

Term(词项)是表达语意的最小单位。搜索和利用统计语言模型进行自然语言处理都需要处理 Term。

全文查询在执行查询之前会分析查询字符串。

与全文查询不同,词项级别查询不会分词,而是将输入作为一个整体,在倒排索引中查找准确的词项。并且使用相关度计算公式为每个包含该词项的文档进行相关度计算。一言以概之:词项查询是对词项进行精确匹配。词项查询通常用于结构化数据,如数字、日期和枚举类型。

词项查询有以下类型:

  • exists - 返回在指定字段上有值的文档。
  • fuzzy - 模糊查询,返回包含与搜索词相似的词的文档。
  • ids - 根据 ID 返回文档。此查询使用存储在 _id 字段中的文档 ID。
  • prefix - 前缀查询,用于查询某个字段中包含指定前缀的文档。
  • range - 范围查询,用于匹配在某一范围内的数值型、日期类型或者字符串型字段的文档。
  • regexp - 正则匹配查询,返回与正则表达式相匹配的词项所属的文档。
  • term - 用来查找指定字段中包含给定单词的文档。
  • terms - 与 term 相似,但可以搜索多个值。
  • terms set - 与 term 相似,但可以定义返回文档所需的匹配词数。
  • wildcard - 通配符查询,返回与通配符模式匹配的文档。

exists(字段不为空查询)

exists 返回在指定字段上有值的文档。

由于多种原因,文档字段可能不存在索引值:

  • JSON 中的字段为 null[]
  • 该字段在 mapping 中配置了 "index" : false
  • 字段值的长度超过了 mapping 中的 ignore_above 设置
  • 字段值格式错误,并且在 mapping 中定义了 ignore_malformed

:::details exists 示例

1
2
3
4
5
6
7
8
GET kibana_sample_data_ecommerce/_search
{
"query": {
"exists": {
"field": "email"
}
}
}

:::

fuzzy(模糊查询)

fuzzy 返回包含与搜索词相似的词的文档。ES 使用 Levenshtein edit distance(Levenshtein 编辑距离) 测量相似度或模糊度。

编辑距离是将一个术语转换为另一个术语所需的单个字符更改的数量。这些变化可能包括:

  • 改变一个字符:(box -> fox)
  • 删除一个字符:(black -> lack)
  • 插入一个字符:(sic -> sick
  • 反转两个相邻字符:(act → cat)

为了找到相似的词条,fuzzy query 会在指定的编辑距离内创建搜索词条的所有可能变体或扩展集。然后返回完全匹配任意扩展的文档。

注意:如果配置了 search.allow_expensive_queries ,则 fuzzy query 不能执行。

:::details fuzzy 示例

1
2
3
4
5
6
7
8
9
10
GET kibana_sample_data_ecommerce/_search
{
"query": {
"fuzzy": {
"customer_full_name": {
"value": "mary"
}
}
}
}

:::

prefix(前缀查询)

prefix 用于查询某个字段中包含指定前缀的文档。

比如查询 user.id 中含有以 ki 为前缀的关键词的文档,那么含有 kindkid 等所有以 ki 开头关键词的文档都会被匹配。

:::details prefix 示例

1
2
3
4
5
6
7
8
9
10
GET kibana_sample_data_ecommerce/_search
{
"query": {
"prefix": {
"customer_full_name": {
"value": "mar"
}
}
}
}

:::

range(范围查询)

range 用于匹配在某一范围内的数值型、日期类型或者字符串型字段的文档。比如搜索哪些书籍的价格在 50 到 100 之间、哪些书籍的出版时间在 2015 年到 2019 年之间。使用 range 查询只能查询一个字段,不能作用在多个字段上

range 查询支持的参数有以下几种 -

  • gt - 大于
  • gte - 大于等于
  • lt - 小于
  • lte - 小于等于
  • format - 如果字段是 Date 类型,可以设置日期格式化
  • time_zone - 时区
  • relation - 指示范围查询如何匹配范围字段的值。
    • INTERSECTS (Default) - 匹配与查询字段值范围相交的文档。
    • CONTAINS - 匹配完全包含查询字段值的文档。
    • WITHIN - 匹配具有完全在查询范围内的范围字段值的文档。

:::details range 示例

数值范围查询示例:

1
2
3
4
5
6
7
8
9
10
11
GET kibana_sample_data_ecommerce/_search
{
"query": {
"range": {
"taxful_total_price": {
"gt": 10,
"lte": 50
}
}
}
}

日期范围查询示例:

1
2
3
4
5
6
7
8
9
10
11
12
GET kibana_sample_data_ecommerce/_search
{
"query": {
"range": {
"order_date": {
"time_zone": "+00:00",
"gte": "2018-01-01T00:00:00",
"lte": "now"
}
}
}
}

:::

regexp(正则匹配查询)

regexp 返回与正则表达式相匹配的词项所属的文档。正则表达式 是一种使用占位符字符匹配数据模式的方法,称为运算符。

注意:如果配置了 search.allow_expensive_queries ,则 regexp query 会被禁用。

:::details regexp 示例

1
2
3
4
5
6
7
8
GET kibana_sample_data_ecommerce/_search
{
"query": {
"regexp": {
"email": ".*@.*-family.zzz"
}
}
}

:::

term(词项匹配查询)

term 用来查找指定字段中包含给定单词的文档,term 查询不被解析,只有查询词和文档中的词精确匹配才会被搜索到,应用场景为查询人名、地名等需要精准匹配的需求。

注意:应避免 term 查询对 text 字段使用查询。默认情况下,Elasticsearch 针对 text 字段的值进行解析分词,这会使查找 text 字段值的精确匹配变得困难。要搜索 text 字段值,需改用 match 查询。

:::details term 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# 1. 创建一个索引
DELETE my-index-000001
PUT my-index-000001
{
"mappings": {
"properties": {
"full_text": { "type": "text" }
}
}
}

# 2. 使用 "Quick Brown Foxes!" 关键字查 "full_text" 字段
PUT my-index-000001/_doc/1
{
"full_text": "Quick Brown Foxes!"
}

# 3. 使用 term 查询
GET my-index-000001/_search?pretty
{
"query": {
"term": {
"full_text": "Quick Brown Foxes!"
}
}
}
# 因为 full_text 字段不再包含确切的 Term —— "Quick Brown Foxes!",所以 term query 搜索不到任何结果

# 4. 使用 match 查询
GET my-index-000001/_search?pretty
{
"query": {
"match": {
"full_text": "Quick Brown Foxes!"
}
}
}

DELETE my-index-000001

:warning: 注意:应避免 term 查询对 text 字段使用查询。

默认情况下,Elasticsearch 针对 text 字段的值进行解析分词,这会使查找 text 字段值的精确匹配变得困难。

要搜索 text 字段值,需改用 match 查询。

:::

terms(多词项匹配查询)

termsterm 相同,但可以搜索多个值。

terms query 查询参数:

  • **index**:索引名
  • **id**:文档 ID
  • **path**:要从中获取字段值的字段的名称,即搜索关键字
  • **routing**(选填):要从中获取 term 值的文档的自定义路由值。如果在索引文档时提供了自定义路由值,则此参数是必需的。

:::details terms 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# 1. 创建一个索引
DELETE my-index-000001
PUT my-index-000001
{
"mappings": {
"properties": {
"color": { "type": "keyword" }
}
}
}

# 2. 写入一个文档
PUT my-index-000001/_doc/1
{
"color": [
"blue",
"green"
]
}

# 3. 写入另一个文档
PUT my-index-000001/_doc/2
{
"color": "blue"
}

# 3. 使用 terms query
GET my-index-000001/_search?pretty
{
"query": {
"terms": {
"color": {
"index": "my-index-000001",
"id": "2",
"path": "color"
}
}
}
}

DELETE my-index-000001

:::

wildcard(通配符查询)

wildcard 即通配符查询,返回与通配符模式匹配的文档。

? 用来匹配一个任意字符,* 用来匹配零个或者多个字符。

注意:如果配置了 search.allow_expensive_queries ,则 wildcard query 会被禁用。

:::details wildcard 示例

示例:以下搜索返回 user.id 字段包含以 ki 开头并以 y 结尾的术语的文档。这些匹配项可以包括 kiykitykimchy

1
2
3
4
5
6
7
8
9
10
11
12
GET kibana_sample_data_ecommerce/_search
{
"query": {
"wildcard": {
"email": {
"value": "*@underwood-family.zzz",
"boost": 1,
"rewrite": "constant_score"
}
}
}
}

:::

复合查询

复合查询就是把一些简单查询组合在一起实现更复杂的查询需求,除此之外,复合查询还可以控制另外一个查询的行为。

复合查询有以下类型:

  • bool - 布尔查询,可以组合多个过滤语句来过滤文档。
  • boosting - 提供调整相关性打分的能力,在 positive 块中指定匹配文档的语句,同时降低在 negative 块中也匹配的文档的得分。
  • constant_score - 使用 constant_score 可以将 query 转化为 filter,filter 可以忽略相关性算分的环节,并且 filter 可以有效利用缓存,从而提高查询的性能。
  • dis_max - 返回匹配了一个或者多个查询语句的文档,但只将最佳匹配的评分作为相关性算分返回。
  • function_score - 支持使用函数来修改查询返回的分数。

bool (布尔查询)

bool 查询可以把任意多个简单查询组合在一起,使用 must、should、must_not、filter 选项来表示简单查询之间的逻辑,每个选项都可以出现 0 次到多次,它们的含义如下:

  • must - 文档必须匹配 must 选项下的查询条件,相当于逻辑运算的 AND,且参与文档相关度的评分。
  • should - 文档可以匹配 should 选项下的查询条件也可以不匹配,相当于逻辑运算的 OR,且参与文档相关度的评分。
  • must_not - 与 must 相反,匹配该选项下的查询条件的文档不会被返回;需要注意的是,must_not 语句不会影响评分,它的作用只是将不相关的文档排除
  • filter - 和 must 一样,匹配 filter 选项下的查询条件的文档才会被返回,但是 filter 不评分,只起到过滤功能,与 must_not 相反

:::details bool 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
GET kibana_sample_data_ecommerce/_search
{
"query": {
"bool": {
"filter": {
"term": {
"type": "order"
}
},
"must_not": {
"range": {
"taxful_total_price": {
"gte": 30
}
}
},
"should": [
{
"match": {
"day_of_week": "Sunday"
}
},
{
"match": {
"category": "Clothing"
}
}
],
"minimum_should_match": 1
}
}
}

:::

boosting

boosting 提供了调整相关性打分的能力。

boosting 查询包括 positivenegativenegative_boost 三个部分。positive 中的查询评分保持不变;negative 中的查询会降低文档评分;相关性算分降低的程度将由 negative_boost 参数决定,其取值范围为:[0.0, 1.0]

:::details boosting 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
GET kibana_sample_data_ecommerce/_search
{
"query": {
"boosting": {
"positive": {
"term": {
"day_of_week": "Monday"
}
},
"negative": {
"range": {
"taxful_total_price": {
"gte": "30"
}
}
},
"negative_boost": 0.2
}
}
}

:::

constant_score

使用 constant_score 可以将 query 转化为 filter,filter 可以忽略相关性算分的环节,并且 filter 可以有效利用缓存,从而提高查询的性能。

:::details constant_score 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
GET kibana_sample_data_ecommerce/_search
{
"query": {
"constant_score": {
"filter": {
"term": {
"day_of_week": "Monday"
}
},
"boost": 1.2
}
}
}

:::

dis_max

dis_max 查询与 bool 查询有一定联系也有一定区别,dis_max 查询支持多并发查询,可返回与任意查询条件子句匹配的任何文档类型。与 bool 查询可以将所有匹配查询的分数相结合使用的方式不同,dis_max 查询只使用最佳匹配查询条件的分数。

:::details dis_max 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
GET kibana_sample_data_ecommerce/_search
{
"query": {
"dis_max": {
"queries": [
{
"term": {
"currency": "EUR"
}
},
{
"term": {
"day_of_week": "Sunday"
}
}
],
"tie_breaker": 0.7
}
}
}

:::

function_score

function_score 查询可以修改查询的文档得分,这个查询在有些情况下非常有用,比如通过评分函数计算文档得分代价较高,可以改用过滤器加自定义评分函数的方式来取代传统的评分方式。

使用 function_score 查询,用户需要定义一个查询和一至多个评分函数,评分函数会对查询到的每个文档分别计算得分。

function_score 查询提供了以下几种算分函数:

  • script_score - 利用自定义脚本完全控制算分逻辑。
  • weight - 为每一个文档设置一个简单且不会被规范化的权重。
  • random_score - 为每个用户提供一个不同的随机算分,对结果进行排序。
  • field_value_factor - 使用文档字段的值来影响算分,例如将好评数量这个字段作为考虑因数。
  • decay functions - 衰减函数,以某个字段的值为标准,距离指定值越近,算分就越高。例如我想让书本价格越接近 10 元,算分越高排序越靠前。

:::details function_score 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
GET kibana_sample_data_ecommerce/_search
{
"query": {
"function_score": {
"query": { "match_all": {} },
"boost": "5",
"functions": [
{
"filter": { "match": { "day_of_week": "Sunday" } },
"random_score": {},
"weight": 23
},
{
"filter": { "match": { "day_of_week": "Monday" } },
"weight": 42
}
],
"max_boost": 42,
"score_mode": "max",
"boost_mode": "multiply",
"min_score": 42
}
}
}

:::

推荐搜索

ES 通过 Suggester 提供了推荐搜索能力,可以用于文本纠错,文本自动补全等场景。

根据使用场景的不同,ES 提供了以下 4 种 Suggester:

  • Term Suggester - 基于单词的纠错补全。
  • Phrase Suggester - 基于短语的纠错补全。
  • Completion Suggester - 自动补全单词,输入词语的前半部分,自动补全单词。
  • Context Suggester - 基于上下文的补全提示,可以实现上下文感知推荐。

Term Suggester

Term Suggester 提供了基于单词的纠错、补全功能,其工作原理是基于编辑距离(edit distance)来运作的,编辑距离的核心思想是一个词需要改变多少个字符就可以和另一个词一致

:::details term suggester 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
GET kibana_sample_data_ecommerce/_search
{
"query": {
"match": {
"day_of_week": "Sund"
}
},
"suggest": {
"my_suggest": {
"text": "Sund",
"term": {
"suggest_mode": "missing",
"field": "day_of_week"
}
}
}
}

响应结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
{
"took": 2,
"timed_out": false,
"_shards": {
"total": 1,
"successful": 1,
"skipped": 0,
"failed": 0
},
"hits": {
// 略
},
"suggest": {
"my_suggest": [
{
"text": "Sund",
"offset": 0,
"length": 4,
"options": [
{
"text": "Sunday",
"score": 0.5,
"freq": 614
}
]
}
]
}
}

:::

Term Suggester API 有很多参数,比较常用的有以下几个 -

  • text - 指定了需要产生建议的文本,一般是用户的输入内容。
  • field - 指定从文档的哪个字段中获取建议。
  • suggest_mode - 设置建议的模式。其值有以下几个选项 -
    • missing - 如果索引中存在就不进行建议,默认的选项。
    • popular - 推荐出现频率更高的词。
    • always - 不管是否存在,都进行建议。
  • analyzer - 指定分词器来对输入文本进行分词,默认与 field 指定的字段设置的分词器一致。
  • size - 为每个单词提供的最大建议数量。
  • sort - 建议结果排序的方式,有以下两个选项 -
    • score - 先按相似性得分排序,然后按文档频率排序,最后按词项本身(字母顺序的等)排序。
    • frequency - 先按文档频率排序,然后按相似性得分排序,最后按词项本身排序。

Phrase Suggester

Phrase Suggester 在 Term Suggester 的基础上增加了一些额外的逻辑,因为是短语形式的建议,所以会考量多个 term 间的关系,比如相邻的程度、词频等

:::details phrase suggester 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
GET kibana_sample_data_logs/_search
{
"suggest": {
"text": "Firefix",
"simple_phrase": {
"phrase": {
"field": "agent",
"direct_generator": [ {
"field": "agent",
"suggest_mode": "always"
} ],
"highlight": {
"pre_tag": "<em>",
"post_tag": "</em>"
}
}
}
}
}

响应结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
{
"took" : 2,
"timed_out" : false,
"_shards" : // 略
"hits" : // 略
"suggest" : {
"simple_phrase" : [
{
"text" : "Firefix",
"offset" : 0,
"length" : 7,
"options" : [
{
"text" : "firefox",
"highlighted" : "<em>firefox</em>",
"score" : 0.2000096
}
]
}
]
}
}

:::

Phrase Suggester 可用的参数也是比较多的,下面介绍几个用得比较多的参数选项 -

  • max_error - 指定最多可以拼写错误的词语的个数。
  • confidence - 其作用是用来控制返回结果条数的。如果用户输入的数据(短语)得分为 N,那么返回结果的得分需要大于 N * confidenceconfidence 默认值为 1.0。
  • highlight - 高亮被修改后的词语。

Completion Suggester

Completion Suggester 提供了自动补全的功能

Completion Suggester 在实现的时候会将 analyze(将文本分词,并且去除没用的词语,例如 is、at 这样的词语) 后的数据进行编码,构建为 FST 并且和索引存放在一起。FST(**finite-state transducer**)是一种高效的前缀查询索引。由于 FST 天生为前缀查询而生,所以其非常适合实现自动补全的功能。ES 会将整个 FST 加载到内存中,所以在使用 FST 进行前缀查询的时候效率是非常高效的。

在使用 Completion Suggester 前需要定义 Mapping,对应的字段需要使用 “completion” type。

:::details completion suggester 示例

构造用于自动补全的测试数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# 先删除原来的索引
DELETE music

# 新增 type 字段,类型为 completion,用于自动补全测试
PUT music
{
"mappings": {
"properties": {
"suggest": {
"type": "completion"
}
}
}
}

# 添加推荐
PUT music/_doc/1?refresh
{
"suggest" : {
"input": [ "Nevermind", "Nirvana" ],
"weight" : 34
}
}
PUT music/_doc/1?refresh
{
"suggest": [
{
"input": "Nevermind",
"weight": 10
},
{
"input": "Nirvana",
"weight": 3
}
]
}

获取推荐:

1
2
3
4
5
6
7
8
9
10
11
12
13
POST music/_search
{
"suggest": {
"song-suggest": {
"prefix": "nir",
"completion": {
"field": "suggest",
"size": 5,
"skip_duplicates": true
}
}
}
}

:::

Context Suggester

Context Suggester 是 Completion Suggester 的扩展,可以实现上下文感知推荐

ES 支持两种类型的上下文:

  • Category:任意字符串的分类。
  • Geo:地理位置信息。

在使用 Context Suggester 前需要定义 Mapping,然后在数据中加入相关的 Context 信息。

:::details context suggester 示例

构造用于 Context Suggester 的测试数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# 删除原来的索引
DELETE books_context

# 创建用于测试 Context Suggester 的索引
PUT books_context
{
"mappings": {
"properties": {
"book_id": {
"type": "keyword"
},
"name": {
"type": "text",
"analyzer": "standard"
},
"name_completion": {
"type": "completion",
"contexts": [
{
"name": "book_type",
"type": "category"
}
]
},
"author": {
"type": "keyword"
},
"intro": {
"type": "text"
},
"price": {
"type": "double"
},
"date": {
"type": "date"
}
}
},
"settings": {
"number_of_shards": 3,
"number_of_replicas": 1
}
}

# 导入测试数据
PUT books_context/_doc/4
{
"book_id": "4ee82465",
"name": "Linux Programming",
"name_completion": {
"input": ["Linux Programming"],
"contexts": {
"book_type": "program"
}
},
"author": "Richard Stones",
"intro": "Happy to Linux Programming",
"price": 10.9,
"date": "2022-06-01"
}
PUT books_context/_doc/5
{
"book_id": "4ee82466",
"name": "Linus Autobiography",
"name_completion": {
"input": ["Linus Autobiography"],
"contexts": {
"book_type": "autobiography"
}
},
"author": "Linus",
"intro": "Linus Autobiography",
"price": 14.9,
"date": "2012-06-01"
}

执行 Context Suggester 查询:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
POST books_context/_search
{
"suggest": {
"my_suggest": {
"prefix": "linu",
"completion": {
"field": "name_completion",
"contexts": {
"book_type": "autobiography"
}
}
}
}
}

:::

参考资料