Dunwu Blog

大道至简,知易行难

Elasticsearch 简介

Elasticsearch 是一个基于 Lucene 的搜索和数据分析工具,它提供了一个分布式服务。Elasticsearch 是遵从 Apache 开源条款的一款开源产品,是当前主流的企业级搜索引擎。

它用于全文搜索、结构化搜索、分析以及将这三者混合使用:

  • 维基百科使用 Elasticsearch 提供全文搜索并高亮关键字,以及**输入实时搜索(search-as-you-type)搜索纠错(did-you-mean)**等搜索建议功能。
  • 英国卫报使用 Elasticsearch 结合用户日志和社交网络数据提供给他们的编辑以实时的反馈,以便及时了解公众对新发表的文章的回应。
  • StackOverflow 结合全文搜索与地理位置查询,以及more-like-this功能来找到相关的问题和答案。
  • Github 使用 Elasticsearch 检索 1300 亿行的代码。

Elasticsearch 特点

  • 分布式的实时文件存储,每个字段都被索引并可被搜索;
  • 分布式的实时分析搜索引擎;
  • 可弹性扩展到上百台服务器规模,处理 PB 级结构化或非结构化数据;
  • 开箱即用(安装即可使用),它提供了许多合理的缺省值,并对初学者隐藏了复杂的搜索引擎理论。只需很少的学习既可在生产环境中使用。

Elasticsearch 发展历史

  • 2010 年 2 月 8 日,Elasticsearch 第一个公开版本发布。

  • 2010 年 5 月 14 日,发布第一个具有里程碑意义的初始版本 0.7.0 ,具有如下特征:

  • Zen Discovery 自动发现模块;

    • 支持 Groovy Client;
  • 简单的插件管理机制;

    • 更好地支持 icu 分词器;
  • 更多的管理 api。

  • 2013 年初,GitHub 抛弃了 Solr,采取 ElasticSearch 来做其 PB 级的搜索。

  • 2014 年 2 月 14 日,发布 1.0.0 版本,增加如下重要特性:

  • 支持 Snapshot/Restore API 备份恢复 API;

    • 支持聚合分析 Aggregations;
  • 支持 cat api;

    • 支持断路器;
  • 引入 Doc values。

  • 2015 年 10 月 28 日,发布 2.0.0 版本,有如下重要特性:

  • 增加了 Pipleline Aggregations;

    • query/filter 查询合并,都合并到 query 中,根据不同的上下文执行不同的查询;
  • 压缩存储可配置;

    • Rivers 模块被移除;
  • Multicast 组播发现被移除,成为一个插件,生产环境必须配置单播地址。

  • 2016 年 10 月 26 日,发布 5.0.0 版本,有如下重大特性变化:

  • Lucene 6.x 的支持,磁盘空间少一半;索引时间少一半;查询性能提升 25%;支持 IPV6;

    • Internal Engine 级别移除了用于避免同一文档并发更新的竞争锁,带来 15%-20% 的性能提升;
  • Shrink API,它可将分片数进行收缩成它的因数,如之前你是 15 个分片,你可以收缩成 5 个或者 3 个又或者 1 个,那么我们就可以想象成这样一种场景,在写入压力非常大的收集阶段,设置足够多的索引,充分利用 shard 的并行写能力,索引写完之后收缩成更少的 shard,提高查询性能;

    • 提供了第一个 Java 原生的 REST 客户端 SDK;
  • IngestNode,之前如果需要对数据进行加工,都是在索引之前进行处理,比如 logstash 可以对日志进行结构化和转换,现在直接在 es 就可以处理了;

    • 提供了 Painless 脚本,代替 Groovy 脚本;
    • 移除 site plugins,就是说 head、bigdesk 都不能直接装 es 里面了,不过可以部署独立站点(反正都是静态文件)或开发 kibana 插件;
    • 新增 Sliced Scroll 类型,现在 Scroll 接口可以并发来进行数据遍历了。每个 Scroll 请求,可以分成多个 Slice 请求,可以理解为切片,各 Slice 独立并行,利用 Scroll 重建或者遍历要快很多倍;
    • 新增了 Profile API;
    • 新增了 Rollover API;
    • 新增 Reindex;
    • 引入新的字段类型 Text/Keyword 来替换 String;
    • 限制索引请求大小,避免大量并发请求压垮 ES;
    • 限制单个请求的 shards 数量,默认 1000 个。
  • 2017 年 8 月 31 日,发布 6.0.0 版本,具有如下重要特性:

  • 稀疏性 Doc Values 的支持;

    • Index Sorting,即索引阶段的排序;
  • 顺序号的支持,每个 es 的操作都有一个顺序编号(类似增量设计);

    • 无缝滚动升级;
  • 从 6.0 开始不支持一个 index 里面存在多个 type;

    • Index-template inheritance,索引版本的继承,目前索引模板是所有匹配的都会合并,这样会造成索引模板有一些冲突问题, 6.0 将会只匹配一个,索引创建时也会进行验证;
    • Load aware shard routing, 基于负载的请求路由,目前的搜索请求是全节点轮询,那么性能最慢的节点往往会造成整体的延迟增加,新的实现方式将基于队列的耗费时间自动调节队列长度,负载高的节点的队列长度将减少,让其他节点分摊更多的压力,搜索和索引都将基于这种机制;
    • 已经关闭的索引将也支持 replica 的自动处理,确保数据可靠。
  • 2019 年 4 月 10 日,发布 7.0.0 版本,具有如下重要特性:

  • 集群连接变化:TransportClient 被废弃,es7 的 java 代码,只能使用 restclient;对于 java 编程,建议采用 High-level-rest-client 的方式操作 ES 集群;

    • ES 程序包默认打包 jdk:7.x 版本的程序包大小变成 300MB+,对比 6.x,包大了 200MB+,这正是 JDK 的大小;
  • 采用基于 Lucene 9.0;

    • 正式废除单个索引下多 Type 的支持,es6 时,官方就提到了 es7 会删除 type,并且 es6 时,已经规定每一个 index 只能有一个 type。在 es7 中,使用默认的 _doc 作为 type,官方说在 8.x 版本会彻底移除 type。api 请求方式也发送变化,如获得某索引的某 ID 的文档:GET index/_doc/id 其中 index 和 id 为具体的值;
  • 引入了真正的内存断路器,它可以更精准地检测出无法处理的请求,并防止它们使单个节点不稳定;

    • Zen2 是 Elasticsearch 的全新集群协调层,提高了可靠性、性能和用户体验,变得更快、更安全,并更易于使用。

Elasticsearch 概念

下列有一些概念是 Elasticsearch 的核心。从一开始就理解这些概念将极大地帮助简化学习 Elasticsearch 的过程。

近实时(NRT)

Elasticsearch 是一个近乎实时的搜索平台。这意味着从索引文档到可搜索文档的时间有一点延迟(通常是一秒)。

索引(Index)

索引在不同语境,有着不同的含义

  • 索引(名词):一个 索引 类似于传统关系数据库中的一个 数据库 ,是一个存储关系型文档的容器。 索引 (index) 的复数词为 indices 或 indexes 。索引实际上是指向一个或者多个物理分片逻辑命名空间
  • 索引(动词):索引一个文档 就是存储一个文档到一个 索引 (名词)中以便被检索和查询。这非常类似于 SQL 语句中的 INSERT 关键词,除了文档已存在时,新文档会替换旧文档情况之外。
  • 倒排索引:关系型数据库通过增加一个索引比如一个 B 树索引到指定的列上,以便提升数据检索速度。Elasticsearch 和 Lucene 使用了一个叫做 倒排索引 的结构来达到相同的目的。

索引的 Mapping 和 Setting

  • Mapping 定义文档字段的类型
  • Setting 定义不同的数据分布

示例:

1
2
3
4
5
6
7
8
{
"settings": { ... any settings ... },
"mappings": {
"type_one": { ... any mappings ... },
"type_two": { ... any mappings ... },
...
}
}

倒排索引

img

index template

**index template**(索引模板)帮助用户设定 Mapping 和 Setting,并按照一定的规则,自动匹配到新创建的索引之上。

  • 模板仅在一个索引被创建时,才会产生作用。修改模板不会影响已创建的索引。
  • 你可以设定多个索引模板,这些设置会被 merge 在一起。
  • 你可以指定 order 的数值,控制 merge 的过程。

当新建一个索引时

  • 应用 ES 默认的 Mapping 和 Setting
  • 应用 order 数值低的 index template 中的设定
  • 应用 order 数值高的 index template 中的设定,之前的设定会被覆盖
  • 应用创建索引是,用户所指定的 Mapping 和 Setting,并覆盖之前模板中的设定。

示例:创建默认索引模板

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
PUT _template/template_default
{
"index_patterns": ["*"],
"order": 0,
"version": 1,
"settings": {
"number_of_shards": 1,
"number_of_replicas": 1
}
}

PUT /_template/template_test
{
"index_patterns": ["test*"],
"order": 1,
"settings": {
"number_of_shards": 1,
"number_of_replicas": 2
},
"mappings": {
"date_detection": false,
"numeric_detection": true
}
}

# 查看索引模板
GET /_template/template_default
GET /_template/temp*

#写入新的数据,index以test开头
PUT testtemplate/_doc/1
{
"someNumber": "1",
"someDate": "2019/01/01"
}
GET testtemplate/_mapping
GET testtemplate/_settings

PUT testmy
{
"settings":{
"number_of_replicas":5
}
}

PUT testmy/_doc/1
{
"key": "value"
}

GET testmy/_settings
DELETE testmy
DELETE /_template/template_default
DELETE /_template/template_test

dynamic template

  • 根据 ES 识别的数据类型,结合字段名称,来动态设定字段类型
    • 所有的字符串类型都设定成 Keyword,或者关闭 keyword 字段。
    • is 开头的字段都设置成 boolean
    • long_ 开头的都设置成 long 类型
  • dynamic template 是定义在某个索引的 Mapping 中
  • template 有一个名称
  • 匹配规则是一个数组
  • 为匹配到字段设置 Mapping

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
#Dynaminc Mapping 根据类型和字段名
DELETE my_index

PUT my_index/_doc/1
{
"firstName": "Ruan",
"isVIP": "true"
}

GET my_index/_mapping

DELETE my_index
PUT my_index
{
"mappings": {
"dynamic_templates": [
{
"strings_as_boolean": {
"match_mapping_type": "string",
"match": "is*",
"mapping": {
"type": "boolean"
}
}
},
{
"strings_as_keywords": {
"match_mapping_type": "string",
"mapping": {
"type": "keyword"
}
}
}
]
}
}
GET my_index/_mapping

DELETE my_index
#结合路径
PUT my_index
{
"mappings": {
"dynamic_templates": [
{
"full_name": {
"path_match": "name.*",
"path_unmatch": "*.middle",
"mapping": {
"type": "text",
"copy_to": "full_name"
}
}
}
]
}
}
GET my_index/_mapping


PUT my_index/_doc/1
{
"name": {
"first": "John",
"middle": "Winston",
"last": "Lennon"
}
}

GET my_index/_search?q=full_name:John
DELETE my_index

类型(Type)

type 是一个逻辑意义上的分类或者叫分区,允许在同一索引中建立多个 type。本质是相当于一个过滤条件,高版本将会废弃 type 概念。

6.0.0 版本及之后,废弃 type

文档(Document)

Elasticsearch 是面向文档的,文档是所有可搜索数据的最小单位

Elasticsearch 使用 JSON 作为文档的序列化格式。

在索引/类型中,可以根据需要存储任意数量的文档。

每个文档都有一个 Unique ID

  • 用户可以自己指定
  • 或通过 Elasticsearch 自动生成

文档的元数据

一个文档不仅仅包含它的数据 ,也包含元数据 —— 有关文档的信息。

  • _index:文档在哪存放
  • _type:文档表示的对象类别
  • _id:文档唯一标识
  • _source:文档的原始 Json 数据
  • _all:整合所有字段内容到该字段,已被废除
  • _version:文档的版本信息
  • _score:相关性打分

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
{
"_index": "megacorp",
"_type": "employee",
"_id": "1",
"_version": 1,
"found": true,
"_source": {
"first_name": "John",
"last_name": "Smith",
"age": 25,
"about": "I love to go rock climbing",
"interests": ["sports", "music"]
}
}

节点(Node)

节点简介

一个运行中的 Elasticsearch 实例称为一个节点

Elasticsearch 实例本质上是一个 Java 进程。一台机器上可以运行多个 Elasticsearch 进程,但是生产环境建议一台机器上只运行一个 Elasticsearch 进程

每个节点都有名字,通过配置文件配置,或启动时通过 -E node.name=node1 指定。

每个节点在启动后,会分配一个 UID,保存在 data 目录下。

节点类型

  • 主节点(master node):每个节点都保存了集群的状态,只有 master 节点才能修改集群的状态信息(保证数据一致性)。集群状态,维护了以下信息:
    • 所有的节点信息
    • 所有的索引和其相关的 mapping 和 setting 信息
    • 分片的路由信息
  • 候选节点(master eligible node):master eligible 节点可以参加选主流程。第一个启动的节点,会将自己选举为 mater 节点。
    • 每个节点启动后,默认为 master eligible 节点,可以通过配置 node.master: false 禁止
  • 数据节点(data node):负责保存分片数据。
  • 协调节点(coordinating node):负责接收客户端的请求,将请求分发到合适的接地那,最终把结果汇集到一起。每个 Elasticsearch 节点默认都是协调节点(coordinating node)。
  • 冷/热节点(warm/hot node):针对不同硬件配置的数据节点(data node),用来实现 Hot & Warm 架构,降低集群部署的成本。
  • 机器学习节点(machine learning node):负责执行机器学习的 Job,用来做异常检测。

节点配置

配置参数 默认值 说明
node.master true 是否为主节点
node.data true 是否为数据节点
node.ingest true
node.ml true 是否为机器学习节点(需要开启 x-pack)

建议

开发环境中一个节点可以承担多种角色。但是,在生产环境中,节点应该设置为单一角色。

集群(Cluster)

集群简介

拥有相同 cluster.name 配置的 Elasticsearch 节点组成一个集群cluster.name 默认名为 elasticsearch,可以通过配置文件修改,或启动时通过 -E cluster.name=xxx 指定。

当有节点加入集群中或者从集群中移除节点时,集群将会重新平均分布所有的数据。

当一个节点被选举成为主节点时,它将负责管理集群范围内的所有变更,例如增加、删除索引,或者增加、删除节点等。 而主节点并不需要涉及到文档级别的变更和搜索等操作,所以当集群只拥有一个主节点的情况下,即使流量增加,它也不会成为瓶颈。 任何节点都可以成为主节点。

作为用户,我们可以将请求发送到集群中的任何节点 ,包括主节点。 每个节点都知道任意文档所处的位置,并且能够将我们的请求直接转发到存储我们所需文档的节点。 无论我们将请求发送到哪个节点,它都能负责从各个包含我们所需文档的节点收集回数据,并将最终结果返回給客户端。 Elasticsearch 对这一切的管理都是透明的。

集群健康

Elasticsearch 的集群监控信息中包含了许多的统计数据,其中最为重要的一项就是 集群健康 , 它在 status 字段中展示为 greenyellow 或者 red

在一个不包含任何索引的空集群中,它将会有一个类似于如下所示的返回内容:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
{
"cluster_name" : "elasticsearch",
"status" : "green",
"timed_out" : false,
"number_of_nodes" : 1,
"number_of_data_nodes" : 1,
"active_primary_shards" : 5,
"active_shards" : 5,
"relocating_shards" : 0,
"initializing_shards" : 0,
"unassigned_shards" : 0,
"delayed_unassigned_shards" : 0,
"number_of_pending_tasks" : 0,
"number_of_in_flight_fetch" : 0,
"task_max_waiting_in_queue_millis" : 0,
"active_shards_percent_as_number" : 100.0
}

status 字段指示着当前集群在总体上是否工作正常。它的三种颜色含义如下:

  • **green**:所有的主分片和副本分片都正常运行。
  • **yellow**:所有的主分片都正常运行,但不是所有的副本分片都正常运行。
  • **red**:有主分片没能正常运行。

分片(Shards)

分片简介

索引实际上是指向一个或者多个物理分片逻辑命名空间

一个分片是一个底层的工作单元 ,它仅保存了全部数据中的一部分。一个分片可以视为一个 Lucene 的实例,并且它本身就是一个完整的搜索引擎。 我们的文档被存储和索引到分片内,但是应用程序是直接与索引而不是与分片进行交互。

Elasticsearch 是利用分片将数据分发到集群内各处的。分片是数据的容器,文档保存在分片内,分片又被分配到集群内的各个节点里。 当你的集群规模扩大或者缩小时, Elasticsearch 会自动的在各节点中迁移分片,使得数据仍然均匀分布在集群里。

主分片和副分片

分片分为主分片(Primary Shard)和副分片(Replica Shard)。

主分片:用于解决数据水平扩展的问题。通过主分片,可以将数据分布到集群内不同节点上。

  • 索引内任意一个文档都归属于一个主分片。
  • 主分片数在索引创建时指定,后序不允许修改,除非 Reindex

副分片(Replica Shard):用于解决数据高可用的问题。副分片是主分片的拷贝。副本分片作为硬件故障时保护数据不丢失的冗余备份,并为搜索和返回文档等读操作提供服务。

  • 副分片数可以动态调整
  • 增加副本数,还可以在一定程度上提高服务的可用性(读取的吞吐)

对于生产环境中分片的设定,需要提前做好容量规划

分片数过小

  • 无法水平扩展
  • 单个分片的数量太大,导致数据重新分配耗时

分片数过大

  • 影响搜索结果的相关性打分,影响统计结果的准确性
  • 单节点上过多的分片,会导致资源浪费,同时也会影响性能

副本(Replicas)

副本主要是针对主分片(Shards)的复制,Elasticsearch 中主分片可以拥有 0 个或多个的副本。

副本分片的主要目的就是为了故障转移。

分片副本很重要,主要有两个原因:

  • 它在分片或节点发生故障时提供高可用性。因此,副本分片永远不会在与其复制的主分片相同的节点;
  • 副本分片也可以接受搜索的请求,可以并行搜索,从而提高系统的吞吐量。

每个 Elasticsearch 分片都是 Lucene 索引。单个 Lucene 索引中可以包含最大数量的文档。截止 LUCENE-5843,限制是 2,147,483,519(= Integer.MAX_VALUE - 128)文档。您可以使用_cat/shardsAPI 监控分片大小。

参考资料

Elasticsearch 索引

索引管理操作

Elasticsearch 索引管理主要包括如何进行索引的创建、索引的删除、副本的更新、索引读写权限、索引别名的配置等等内容。

索引删除

ES 索引删除操作向 ES 集群的 http 接口发送指定索引的 delete http 请求即可,可以通过 curl 命令,具体如下:

1
curl -X DELETE http://{es_host}:{es_http_port}/{index}

如果删除成功,它会返回如下信息,具体示例如下:

1
curl -X DELETE http://10.10.10.66:9200/my_index?pretty

为了返回的信息便于读取,增加了 pretty 参数:

1
2
3
{
"acknowledged" : true
}

索引别名

ES 的索引别名就是给一个索引或者多个索引起的另一个名字,典型的应用场景是针对索引使用的平滑切换。

首先,创建索引 my_index,然后将别名 my_alias 指向它,示例如下:

1
2
PUT /my_index
PUT /my_index/_alias/my_alias

也可以通过如下形式:

1
2
3
4
5
6
POST /_aliases
{
"actions": [
{ "add": { "index": "my_index", "alias": "my_alias" }}
]
}

也可以在一次请求中增加别名和移除别名混合使用:

1
2
3
4
5
6
7
POST /_aliases
{
"actions": [
{ "remove": { "index": "my_index", "alias": "my_alias" }}
{ "add": { "index": "my_index_v2", "alias": "my_alias" }}
]
}

需要注意的是,如果别名与索引是一对一的,使用别名索引文档或者查询文档是可以的,但是如果别名和索引是一对多的,使用别名会发生错误,因为 ES 不知道把文档写入哪个索引中去或者从哪个索引中读取文档。

ES 索引别名有个典型的应用场景是平滑切换,更多细节可以查看 Elasticsearch(ES)索引零停机(无需重启)无缝平滑切换的方法

Settings 详解

Elasticsearch 索引的配置项主要分为静态配置属性动态配置属性,静态配置属性是索引创建后不能修改,而动态配置属性则可以随时修改。

ES 索引设置的 api 为 **__settings_**,完整的示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
PUT /my_index
{
"settings": {
"index": {
"number_of_shards": "1",
"number_of_replicas": "1",
"refresh_interval": "60s",
"analysis": {
"filter": {
"tsconvert": {
"type": "stconvert",
"convert_type": "t2s",
"delimiter": ","
},
"synonym": {
"type": "synonym",
"synonyms_path": "analysis/synonyms.txt"
}
},
"analyzer": {
"ik_max_word_synonym": {
"filter": [
"synonym",
"tsconvert",
"standard",
"lowercase",
"stop"
],
"tokenizer": "ik_max_word"
},
"ik_smart_synonym": {
"filter": [
"synonym",
"standard",
"lowercase",
"stop"
],
"tokenizer": "ik_smart"
}
},
"mapping": {
"coerce": "false",
"ignore_malformed": "false"
},
"indexing": {
"slowlog": {
"threshold": {
"index": {
"warn": "2s",
"info": "1s"
}
}
}
},
"provided_name": "hospital_202101070533",
"query": {
"default_field": "timestamp",
"parse": {
"allow_unmapped_fields": "false"
}
},
"requests": {
"cache": {
"enable": "true"
}
},
"search": {
"slowlog": {
"threshold": {
"fetch": {
"warn": "1s",
"info": "200ms"
},
"query": {
"warn": "1s",
"info": "500ms"
}
}
}
}
}
}
}

固定属性

  • **_index.creation_date_**:顾名思义索引的创建时间戳。
  • **_index.uuid_**:索引的 uuid 信息。
  • **_index.version.created_**:索引的版本号。

索引静态配置

  • **_index.number_of_shards**:索引的主分片数,默认值是 **5**。这个配置在索引创建后不能修改;在 es 层面,可以通过 **es.index.max_number_of_shards** 属性设置索引最大的分片数,默认为 **1024_**。
  • **_index.codec**:数据存储的压缩算法,默认值为 **LZ4**,可选择值还有 **best_compression_**,它比 LZ4 可以获得更好的压缩比(即占据较小的磁盘空间,但存储性能比 LZ4 低)。
  • **_index.routing_partition_size_**:路由分区数,如果设置了该参数,其路由算法为:( hash(_routing) + hash(_id) % index.routing_parttion_size ) % number_of_shards。如果该值不设置,则路由算法为 hash(_routing) % number_of_shardings_routing 默认值为 _id

静态配置里,有重要的部分是配置分析器(config analyzers)。

  • index.analysis

    :分析器最外层的配置项,内部主要分为 char_filter、tokenizer、filter 和 analyzer。

    • **_char_filter_**:定义新的字符过滤器件。
    • **_tokenizer_**:定义新的分词器。
    • **_filter_**:定义新的 token filter,如同义词 filter。
    • **_analyzer_**:配置新的分析器,一般是 char_filter、tokenizer 和一些 token filter 的组合。

索引动态配置

  • **_index.number_of_replicas**:索引主分片的副本数,默认值是 **1_**,该值必须大于等于 0,这个配置可以随时修改。
  • **_index.refresh_interval**:执行新索引数据的刷新操作频率,该操作使对索引的最新更改对搜索可见,默认为 **1s**。也可以设置为 **-1_** 以禁用刷新。更详细信息参考 Elasticsearch 动态修改 refresh_interval 刷新间隔设置

Mapping 详解

在 Elasticsearch 中,**Mapping**(映射),用来定义一个文档以及其所包含的字段如何被存储和索引,可以在映射中事先定义字段的数据类型、字段的权重、分词器等属性,就如同在关系型数据库中创建数据表时会设置字段的类型。

Mapping 会把 json 文档映射成 Lucene 所需要的扁平格式

一个 Mapping 属于一个索引的 Type

  • 每个文档都属于一个 Type
  • 一个 Type 有一个 Mapping 定义
  • 7.0 开始,不需要在 Mapping 定义中指定 type 信息

映射分类

在 Elasticsearch 中,映射可分为静态映射和动态映射。在关系型数据库中写入数据之前首先要建表,在建表语句中声明字段的属性,在 Elasticsearch 中,则不必如此,Elasticsearch 最重要的功能之一就是让你尽可能快地开始探索数据,文档写入 Elasticsearch 中,它会根据字段的类型自动识别,这种机制称为动态映射,而静态映射则是写入数据之前对字段的属性进行手工设置。

静态映射

静态映射是在创建索引时手工指定索引映射。静态映射和 SQL 中在建表语句中指定字段属性类似。相比动态映射,通过静态映射可以添加更详细、更精准的配置信息。

如何定义一个 Mapping

1
2
3
4
5
6
7
8
PUT /books
{
"mappings": {
"type_one": { ... any mappings ... },
"type_two": { ... any mappings ... },
...
}
}

动态映射

动态映射是一种偷懒的方式,可直接创建索引并写入文档,文档中字段的类型是 Elasticsearch 自动识别的,不需要在创建索引的时候设置字段的类型。在实际项目中,如果遇到的业务在导入数据之前不确定有哪些字段,也不清楚字段的类型是什么,使用动态映射非常合适。当 Elasticsearch 在文档中碰到一个以前没见过的字段时,它会利用动态映射来决定该字段的类型,并自动把该字段添加到映射中,根据字段的取值自动推测字段类型的规则见下表:

JSON 格式的数据 自动推测的字段类型
null 没有字段被添加
true or false boolean 类型
浮点类型数字 float 类型
数字 long 类型
JSON 对象 object 类型
数组 由数组中第一个非空值决定
string 有可能是 date 类型(若开启日期检测)、double 或 long 类型、text 类型、keyword 类型

下面举一个例子认识动态 mapping,在 Elasticsearch 中创建一个新的索引并查看它的 mapping,命令如下:

1
2
PUT books
GET books/_mapping

此时 books 索引的 mapping 是空的,返回结果如下:

1
2
3
4
5
{
"books": {
"mappings": {}
}
}

再往 books 索引中写入一条文档,命令如下:

1
2
3
4
5
6
PUT books/it/1
{
"id": 1,
"publish_date": "2019-11-10",
"name": "master Elasticsearch"
}

文档写入完成之后,再次查看 mapping,返回结果如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
{
"books": {
"mappings": {
"properties": {
"id": {
"type": "long"
},
"name": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"publish_date": {
"type": "date"
}
}
}
}
}

使用动态 mapping 要结合实际业务需求来综合考虑,如果将 Elasticsearch 当作主要的数据存储使用,并且希望出现未知字段时抛出异常来提醒你注意这一问题,那么开启动态 mapping 并不适用。在 mapping 中可以通过 dynamic 设置来控制是否自动新增字段,接受以下参数:

  • **true**:默认值为 true,自动添加字段。
  • **false**:忽略新的字段。
  • **strict**:严格模式,发现新的字段抛出异常。

基础类型

类型 关键字
字符串类型 string、text、keyword
数字类型 long、integer、short、byte、double、float、half_float、scaled_float
日期类型 date
布尔类型 boolean
二进制类型 binary
范围类型 range

复杂类型

类型 关键字
数组类型 array
对象类型 object
嵌套类型 nested

特殊类型

类型 关键字
地理类型 geo_point
地理图形类型 geo_shape
IP 类型 ip
范围类型 completion
令牌计数类型 token_count
附件类型 attachment
抽取类型 percolator

Mapping 属性

Elasticsearch 的 mapping 中的字段属性非常多,具体如下表格:

| 属性名 | 描述 |
| :- | :- | |
| type | 字段类型,常用的有 text、integer 等等。 |
| index | 当前字段是否被作为索引。可选值为 **_true**,默认为 true。 |
| **
store** | 是否存储指定字段,可选值为 **true** | **false**,设置 true 意味着需要开辟单独的存储空间为这个字段做存储,而且这个存储是独立于 **_source** 的存储的。 |
| **
norms** | 是否使用归一化因子,可选值为 **true** | **false**,不需要对某字段进行打分排序时,可禁用它,节省空间;_typetext 时,默认为 true_;而 typekeyword 时,默认为 false_。 |
| **
index_options
** | 索引选项控制添加到倒排索引(Inverted Index)的信息,这些信息用于搜索(Search)和高亮显示:**_docs:只索引文档编号(Doc Number);freqs:索引文档编号和词频率(term frequency);positions:索引文档编号,词频率和词位置(序号);offsets:索引文档编号,词频率,词偏移量(开始和结束位置)和词位置(序号)。默认情况下,被分析的字符串(analyzed string)字段使用 positions_,其他字段默认使用 docs_。此外,需要注意的是 index_option 是 elasticsearch 特有的设置属性;临近搜索和短语查询时,_index_option 必须设置为 offsets_,同时高亮也可使用 postings highlighter。 |
| **
term_vector
** | 索引选项控制词向量相关信息:
no:默认值,表示不存储词向量相关信息;yes:只存储词向量信息;with_positions:存储词项和词项位置;with_offsets:存储词项和字符偏移位置;with_positions_offsets**:存储词项、词项位置、字符偏移位置。_term_vector 是 lucene 层面的索引设置。 |
| similarity | 指定文档相似度算法(也可以叫评分模型):**_BM25**:ES 5 之后的默认设置。 |
| **
copy_to** | 复制到自定义 _all 字段,值是数组形式,即表明可以指定多个自定义的字段。 |
| **
analyzer** | 指定索引和搜索时的分析器,如果同时指定 search_analyzer 则搜索时会优先使用 search_analyzer_。 |
| **
search_analyzer
** | 指定搜索时的分析器,搜索时的优先级最高。 |
| null_value | 用于需要对 Null 值实现搜索的场景,只有 Keyword 类型支持此配置。 |

索引查询

多个 index、多个 type 查询

Elasticsearch 的搜索 api 支持一个索引(index)的多个类型(type)查询以及多个索引(index)的查询。

例如,我们可以搜索 twitter 索引下面所有匹配条件的所有类型中文档,如下:

1
GET /twitter/_search?q=user:shay

我们也可以搜索一个索引下面指定多个 type 下匹配条件的文档,如下:

1
GET /twitter/tweet,user/_search?q=user:banon

我们也可以搜索多个索引下匹配条件的文档,如下:

1
GET /twitter,elasticsearch/_search?q=tags:wow

此外我们也可以搜索所有索引下匹配条件的文档,用_all 表示所有索引,如下:

1
GET /_all/_search?q=tags:wow

甚至我们可以搜索所有索引及所有 type 下匹配条件的文档,如下:

1
GET /_search?q=tags:wow

URI 搜索

Elasticsearch 支持用 uri 搜索,可用 get 请求里面拼接相关的参数,并用 curl 相关的命令就可以进行测试。

如下有一个示例:

1
GET twitter/_search?q=user:kimchy

如下是上一个请求的相应实体:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
{
"timed_out": false,
"took": 62,
"_shards": {
"total": 1,
"successful": 1,
"skipped": 0,
"failed": 0
},
"hits": {
"total": 1,
"max_score": 1.3862944,
"hits": [
{
"_index": "twitter",
"_type": "_doc",
"_id": "0",
"_score": 1.3862944,
"_source": {
"user": "kimchy",
"date": "2009-11-15T14:12:12",
"message": "trying out Elasticsearch",
"likes": 0
}
}
]
}
}

URI 中允许的参数:

名称 描述
q 查询字符串,映射到 query_string 查询
df 在查询中未定义字段前缀时使用的默认字段
analyzer 查询字符串时指定的分词器
analyze_wildcard 是否允许通配符和前缀查询,默认设置为 false
batched_reduce_size 应在协调节点上一次减少的分片结果数。如果请求中潜在的分片数量很大,则应将此值用作保护机制,以减少每个搜索请求的内存开销
default_operator 默认使用的匹配运算符,可以是AND或者OR,默认是OR
lenient 如果设置为 true,将会忽略由于格式化引起的问题(如向数据字段提供文本),默认为 false
explain 对于每个 hit,包含了具体如何计算得分的解释
_source 请求文档内容的参数,默认 true;设置 false 的话,不返回_source 字段,可以使用_source_include_source_exclude参数分别指定返回字段和不返回的字段
stored_fields 指定每个匹配返回的文档中的存储字段,多个用逗号分隔。不指定任何值将导致没有字段返回
sort 排序方式,可以是fieldNamefieldName:asc或者fieldName:desc的形式。fieldName 可以是文档中的实际字段,也可以是诸如_score 字段,其表示基于分数的排序。此外可以指定多个 sort 参数(顺序很重要)
track_scores 当排序时,若设置 true,返回每个命中文档的分数
track_total_hits 是否返回匹配条件命中的总文档数,默认为 true
timeout 设置搜索的超时时间,默认无超时时间
terminate_after 在达到查询终止条件之前,指定每个分片收集的最大文档数。如果设置,则在响应中多了一个 terminated_early 的布尔字段,以指示查询执行是否实际上已终止。默认为 no terminate_after
from 从第几条(索引以 0 开始)结果开始返回,默认为 0
size 返回命中的文档数,默认为 10
search_type 搜索的方式,可以是dfs_query_then_fetchquery_then_fetch。默认为query_then_fetch
allow_partial_search_results 是否可以返回部分结果。如设置为 false,表示如果请求产生部分结果,则设置为返回整体故障;默认为 true,表示允许请求在超时或部分失败的情况下获得部分结果

查询流程

在 Elasticsearch 中,查询是一个比较复杂的执行模式,因为我们不知道那些 document 会被匹配到,任何一个 shard 上都有可能,所以一个 search 请求必须查询一个索引或多个索引里面的所有 shard 才能完整的查询到我们想要的结果。

找到所有匹配的结果是查询的第一步,来自多个 shard 上的数据集在分页返回到客户端之前会被合并到一个排序后的 list 列表,由于需要经过一步取 top N 的操作,所以 search 需要进过两个阶段才能完成,分别是 query 和 fetch。

参考资料

Elasticsearch 高亮搜索及显示

Elasticsearch 的高亮(highlight)可以让您从搜索结果中的一个或多个字段中获取突出显示的摘要,以便向用户显示查询匹配的位置。当您请求突出显示(即高亮)时,响应结果的 highlight 字段中包括高亮的字段和高亮的片段。Elasticsearch 默认会用 <em></em> 标签标记关键字。

高亮参数

ES 提供了如下高亮参数:

参数 说明
boundary_chars 包含每个边界字符的字符串。默认为,! ?\ \ n。
boundary_max_scan 扫描边界字符的距离。默认为 20。
boundary_scanner 指定如何分割突出显示的片段,支持 chars、sentence、word 三种方式。
boundary_scanner_locale 用来设置搜索和确定单词边界的本地化设置,此参数使用语言标记的形式(“en-US”, “fr-FR”, “ja-JP”)
encoder 表示代码段应该是 HTML 编码的:默认(无编码)还是 HTML (HTML-转义代码段文本,然后插入高亮标记)
fields 指定检索高亮显示的字段。可以使用通配符来指定字段。例如,可以指定 comment*来获取以 comment开头的所有文本和关键字字段的高亮显示。
force_source 根据源高亮显示。默认值为 false。
fragmenter 指定文本应如何在突出显示片段中拆分:支持参数 simple 或者 span。
fragment_offset 控制要开始突出显示的空白。仅在使用 fvh highlighter 时有效。
fragment_size 字符中突出显示的片段的大小。默认为 100。
highlight_query 突出显示搜索查询之外的其他查询的匹配项。这在使用重打分查询时特别有用,因为默认情况下高亮显示不会考虑这些问题。
matched_fields 组合多个匹配结果以突出显示单个字段,对于使用不同方式分析同一字符串的多字段。所有的 matched_fields 必须将 term_vector 设置为 with_positions_offsets,但是只有将匹配项组合到的字段才会被加载,因此只有将 store 设置为 yes 才能使该字段受益。只适用于 fvh highlighter。
no_match_size 如果没有要突出显示的匹配片段,则希望从字段开头返回的文本量。默认为 0(不返回任何内容)。
number_of_fragments 返回的片段的最大数量。如果片段的数量设置为 0,则不会返回任何片段。相反,突出显示并返回整个字段内容。当需要突出显示短文本(如标题或地址),但不需要分段时,使用此配置非常方便。如果 number_of_fragments 为 0,则忽略 fragment_size。默认为 5。
order 设置为 score 时,按分数对突出显示的片段进行排序。默认情况下,片段将按照它们在字段中出现的顺序输出(order:none)。将此选项设置为 score 将首先输出最相关的片段。每个高亮应用自己的逻辑来计算相关性得分。
phrase_limit 控制文档中所考虑的匹配短语的数量。防止 fvh highlighter 分析太多的短语和消耗太多的内存。提高限制会增加查询时间并消耗更多内存。默认为 256。
pre_tags 与 post_tags 一起使用,定义用于突出显示文本的 HTML 标记。默认情况下,突出显示的文本被包装在和标记中。指定为字符串数组。
post_tags 与 pre_tags 一起使用,定义用于突出显示文本的 HTML 标记。默认情况下,突出显示的文本被包装在和标记中。指定为字符串数组。
require_field_match 默认情况下,只突出显示包含查询匹配的字段。将 require_field_match 设置为 false 以突出显示所有字段。默认值为 true。
tags_schema 设置为使用内置标记模式的样式。
type 使用的高亮模式,可选项为**_unifiedplainfvh_**。默认为 _unified_。

自定义高亮片段

如果我们想使用自定义标签,在高亮属性中给需要高亮的字段加上 pre_tagspost_tags 即可。例如,搜索 title 字段中包含关键词 javascript 的书籍并使用自定义 HTML 标签高亮关键词,查询语句如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
GET /books/_search
{
"query": {
"match": { "title": "javascript" }
},
"highlight": {
"fields": {
"title": {
"pre_tags": ["<strong>"],
"post_tags": ["</strong>"]
}
}
}
}

多字段高亮

关于搜索高亮,还需要掌握如何设置多字段搜索高亮。比如,搜索 title 字段的时候,我们期望 description 字段中的关键字也可以高亮,这时候就需要把 require_field_match 属性的取值设置为 faslerequire_field_match 的默认值为 true,只会高亮匹配的字段。多字段高亮的查询语句如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
GET /books/_search
{
"query": {
"match": { "title": "javascript" }
},
"highlight": {
"require_field_match": false,
"fields": {
"title": {},
"description": {}
}
}
}

高亮性能分析

Elasticsearch 提供了三种高亮器,分别是默认的 highlighter 高亮器postings-highlighter 高亮器fast-vector-highlighter 高亮器

默认的 highlighter 是最基本的高亮器。highlighter 高亮器实现高亮功能需要对 _source 中保存的原始文档进行二次分析,其速度在三种高亮器里最慢,优点是不需要额外的存储空间。

postings-highlighter 高亮器实现高亮功能不需要二次分析,但是需要在字段的映射中设置 index_options 参数的取值为 offsets,即保存关键词的偏移量,速度快于默认的 highlighter 高亮器。例如,配置 comment 字段使用 postings-highlighter 高亮器,映射如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
PUT /example
{
"mappings": {
"doc": {
"properties": {
"comment": {
"type": "text",
"index_options": "offsets"
}
}
}
}
}

fast-vector-highlighter 高亮器实现高亮功能速度最快,但是需要在字段的映射中设置 term_vector 参数的取值为 with_positions_offsets,即保存关键词的位置和偏移信息,占用的存储空间最大,是典型的空间换时间的做法。例如,配置 comment 字段使用 fast-vector-highlighter 高亮器,映射如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
PUT /example
{
"mappings": {
"doc": {
"properties": {
"comment": {
"type": "text",
"term_vector": "with_positions_offsets"
}
}
}
}
}

Elasticsearch 分析器

文本分析是把全文本转换为一系列单词(term/token)的过程,也叫分词。在 Elasticsearch 中,分词是通过 analyzer(分析器)来完成的,不管是索引还是搜索,都需要使用 analyzer(分析器)。分析器,分为内置分析器自定义的分析器

分析器可以进一步细分为字符过滤器Character Filters)、分词器Tokenizer)和词元过滤器Token Filters)三部分。它的执行顺序如下:

character filters -> tokenizer -> token filters

字符过滤器(Character Filters)

character filter 的输入是原始的文本 text,如果配置了多个,它会按照配置的顺序执行,目前 ES 自带的 character filter 主要有如下 3 类:

  1. html strip character filter:从文本中剥离 HTML 元素,并用其解码值替换 HTML 实体(如,将 &amp; 替换为 **__**)。
  2. mapping character filter:自定义一个 map 映射,可以进行一些自定义的替换,如常用的大写变小写也可以在该环节设置。
  3. pattern replace character filter:使用 java 正则表达式来匹配应替换为指定替换字符串的字符,此外,替换字符串可以引用正则表达式中的捕获组。

HTML strip character filter

HTML strip 如下示例:

1
2
3
4
5
6
7
8
GET /_analyze
{
"tokenizer": "keyword",
"char_filter": [
"html_strip"
],
"text": "<p>I&apos;m so <b>happy</b>!</p>"
}

经过 html_strip 字符过滤器处理后,输出如下:

1
[ \nI'm so happy!\n ]

Mapping character filter

Mapping character filter 接收键和值映射(key => value)作为配置参数,每当在预处理过程中遇到与键值映射中的键相同的字符串时,就会使用该键对应的值去替换它。

原始文本中的字符串和键值映射中的键的匹配是贪心的,在对给定的文本进行预处理过程中如果配置的键值映射存在包含关系,会优先匹配最长键。同样也可以用空字符串进行替换。

mapping char_filter 不像 html_strip 那样拆箱即可用,必须先进行配置才能使用,它有两个属性可以配置:

参数名称 参数说明
mappings 一组映射,每个元素的格式为 _key => value_。
mappings_path 一个相对或者绝对的文件路径,指向一个每行包含一个 key =>value 映射的 UTF-8 编码文本映射文件。

mapping char_filter 示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
GET /_analyze
{
"tokenizer": "keyword",
"char_filter": [
{
"type": "mapping",
"mappings": [
"٠ => 0",
"١ => 1",
"٢ => 2",
"٣ => 3",
"٤ => 4",
"٥ => 5",
"٦ => 6",
"٧ => 7",
"٨ => 8",
"٩ => 9"
]
}
],
"text": "My license plate is ٢٥٠١٥"
}

分析结果如下:

1
[ My license plate is 25015 ]

Pattern Replace character filter

Pattern Replace character filter 支持如下三个参数:

参数名称 参数说明
pattern 必填参数,一个 java 的正则表达式。
replacement 替换字符串,可以使用 $1 ... $9 语法来引用捕获组。
flags Java 正则表达式的标志,具体参考 java 的 java.util.regex.Pattern 类的标志属性。

如将输入的 text 中大于一个的空格都转变为一个空格,在 settings 时,配置示例如下:

1
2
3
4
5
6
7
8
"char_filter": {
"multi_space_2_one": {
"pattern": "[ ]+",
"type": "pattern_replace",
"replacement": " "
},
...
}

分词器(Tokenizer)

tokenizer 即分词器,也是 analyzer 最重要的组件,它对文本进行分词;一个 analyzer 必需且只可包含一个 tokenizer

ES 自带默认的分词器是 standard tokenizer,标准分词器提供基于语法的分词(基于 Unicode 文本分割算法),并且适用于大多数语言。

此外有很多第三方的分词插件,如中文分词界最经典的 ik 分词器,它对应的 tokenizer 分为 ik_smart 和 ik_max_word,一个是智能分词(针对搜索侧),一个是全切分词(针对索引侧)。

ES 默认提供的分词器 standard 对中文分词不优化,效果差,一般会安装第三方中文分词插件,通常首先 elasticsearch-analysis-ik 插件,它其实是 ik 针对的 ES 的定制版。

elasticsearch-plugin 使用

在安装 elasticsearch-analysis-ik 第三方之前,我们首先要了解 es 的插件管理工具 elasticsearch-plugin 的使用。

现在的 elasticsearch 安装完后,在安装目录的 bin 目录下会存在 elasticsearch-plugin 命令工具,用它来对 es 插件进行管理。

1
bin/elasticsearch-plugin

其实该命令的是软连接,原始路径是:

1
libexec/bin/elasticsearch-plugin

再进一步看脚本代码,你会发现,它是通过 elasticsearch-cli 执行 libexec/lib/tools/plugin-cli/elasticsearch-plugin-cli-x.x.x.jar

但一般使用者了解 elasticsearch-plugin 命令使用就可:

1
2
3
4
5
6
7
8
#  安装指定的插件到当前 ES 节点中
elasticsearch-plugin install {plugin_url}

# 显示当前 ES 节点已经安装的插件列表
elasticsearch-plugin list

# 删除已安装的插件
elasticsearch-plugin remove {plugin_name}

在安装插件时,要保证安装的插件与 ES 版本一致。

elasticsearch-analysis-ik 安装

在确定要安装的 ik 版本之后,执行如下命令:

1
./bin/elasticsearch-plugin install https://github.com/medcl/elasticsearch-analysis-ik/releases/download/v{X.X.X}/elasticsearch-analysis-ik-{X.X.X}.zip

执行完安装命令后,我们会发现在 plugins 中多了 analysis-ik 目录,这里面主要存放的是源码 jar 包,此外,在 config 文件里也多了 analysis-ik 目录,里面主要是 ik 相关的配置,如 IKAnalyzer.cfg.xml 配置、词典文件等。

1
2
3
#  两个新增目录路径
libexec/plugins/analysis-ik/
libexec/config/analysis-ik/

elasticsearch-analysis-ik 使用

ES 5.X 版本开始安装完的 elasticsearch-analysis-ik 提供了两个分词器,分别对应名称是 ik_max_word 和 **ik_smart**,ik_max_word 是索引侧的分词器,走全切模式,ik_smart 是搜索侧的分词器,走智能分词,属于搜索模式。

索引 mapping 设置

安装完 elasticsearch-analysis-ik 后,我们可以指定索引及指定字段设置可用的分析器(analyzer),示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
{
"qa": {
"mappings": {
"qa": {
"_all": {
"enabled": false
},
"properties": {
"question": {
"type": "text",
"store": true,
"similarity": "BM25",
"analyzer": "ik_max_word",
"search_analyzer": "ik_smart"
},
"answer": {
"type": "text",
"store": false,
"similarity": "BM25",
"analyzer": "ik_max_word",
"search_analyzer": "ik_smart"
},
...
}
}
}
}
}

如上示例中,analyzer 指定 ik_max_word,即索引侧使用 ik 全切模式,search_analyzer 设置 ik_smart,即搜索侧使用 ik 智能分词模式。

查看 ik 分词结果

es 提供了查看分词结果的 api **analyze**,具体示例如下:

1
2
3
4
5
GET {index}/_analyze
{
"analyzer" : "ik_smart",
"text" : "es 中文分词器安装"
}

输出如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
{
"tokens": [
{
"token": "es",
"start_offset": 0,
"end_offset": 2,
"type": "CN_WORD",
"position": 0
},
{
"token": "中文",
"start_offset": 3,
"end_offset": 5,
"type": "CN_WORD",
"position": 1
},
{
"token": "分词器",
"start_offset": 5,
"end_offset": 8,
"type": "CN_WORD",
"position": 2
},
{
"token": "安装",
"start_offset": 8,
"end_offset": 10,
"type": "CN_WORD",
"position": 3
}
]
}

elasticsearch-analysis-ik 自定义词典

elasticsearch-analysis-ik 本质是 ik 分词器,使用者根据实际需求可以扩展自定义的词典,具体主要分为如下 2 大类,每类又分为本地配置和远程配置 2 种:

  1. 自定义扩展词典;
  2. 自定义扩展停用词典;

elasticsearch-analysis-ik 配置文件为 IKAnalyzer.cfg.xml,它位于 libexec/config/analysis-ik 目录下,具体配置结构如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE properties SYSTEM "http://java.sun.com/dtd/properties.dtd">
<properties>
<comment>IK Analyzer 扩展配置</comment>
<!--用户可以在这里配置自己的扩展字典 -->
<entry key="ext_dict"></entry>
<!--用户可以在这里配置自己的扩展停止词字典-->
<entry key="ext_stopwords"></entry>
<!--用户可以在这里配置远程扩展字典 -->
<!-- <entry key="remote_ext_dict">words_location</entry> -->
<!--用户可以在这里配置远程扩展停止词字典-->
<!-- <entry key="remote_ext_stopwords">words_location</entry> -->
</properties>

当然,如果开发者认为 ik 默认的词表有问题,也可以进行调整,文件都在 libexec/config/analysis-ik 下,如 main.dic 为主词典,stopword.dic 为停用词表。

词元过滤器(Token Filters)

token filters 叫词元过滤器,或词项过滤器,对 tokenizer 分出的词进行过滤处理。常用的有转小写、停用词处理、同义词处理等等。一个 analyzer 可包含 0 个或多个词项过滤器,按配置顺序进行过滤

以同义词过滤器的使用示例,具体如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
PUT /test_index
{
"settings": {
"index": {
"analysis": {
"analyzer": {
"synonym": {
"tokenizer": "standard",
"filter": [ "my_stop", "synonym" ]
}
},
"filter": {
"my_stop": {
"type": "stop",
"stopwords": [ "bar" ]
},
"synonym": {
"type": "synonym",
"lenient": true,
"synonyms": [ "foo, bar => baz" ]
}
}
}
}
}
}

同义词

Elasticsearch 同义词通过专有的同义词过滤器(synonym token filter)来进行工作,它允许在分析(analysis)过程中方便地处理同义词,一般是通过配置文件配置同义词。此外,同义词可以再建索引时(index-time synonyms)或者检索时(search-time synonyms)使用。

同义词(synonym)配置语法

如上例子所示,es 同义词配置的 filter 语法具体如下选项:

  • **_type_**:指定 synonym,表示同义词 filter;

  • **_synonyms_path_**:指定同义词配置文件路径;

  • **expand**:该参数决定映射行为的模式,默认为 true,表示扩展模式,具体示例如下:

    • expand == true 时,

      1
      ipod, i-pod, i pod

      等价于:

      1
      ipod, i-pod, i pod => ipod, i-pod, i pod

      expand == false 时,

      1
      ipod, i-pod, i pod

      仅映射第一个单词,等价于:

      1
      ipod, i-pod, i pod => ipod
  • **_lenient_**:如果值为 true 时,遇到那些无法解析的同义词规则时,忽略异常。默认为 false。

同义词文档格式

elasticsearch 的同义词有如下两种形式:

  • 单向同义词:

    1
    ipod, i-pod, i pod => ipod
  • 双向同义词:

    1
    马铃薯, 土豆, potato

单向同义词不管索引还是检索时,箭头左侧的词都会映射成箭头右侧的词;

双向同义词是索引时,都建立同义词的倒排索引,检索时,同义词之间都会进行倒排索引的匹配。

同义词的文档化时,需要注意的是,同一个词在不同的同义词关系中出现时,其它同义词之间不具有传递性,这点需要注意。

假设如上示例中,如果“马铃薯”和其它两个同义词分成两行写:

1
2
马铃薯,土豆
马铃薯,potato

此时,elasticsearch 中不会将“土豆”和“potato”视为同义词关系,所以多个同义词要写在一起,这往往是开发中经常容易疏忽的点。

参考资料

Flink 运维

(1)使用 docker 命令拉取镜像

1
docker pull flink

(2)编写 docker-compose.yml,内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
version: '2.1'
services:
jobmanager:
image: flink
expose:
- '6123'
ports:
- '8081:8081'
command: jobmanager
environment:
- JOB_MANAGER_RPC_ADDRESS=jobmanager

taskmanager:
image: flink
expose:
- '6121'
- '6122'
depends_on:
- jobmanager
command: taskmanager
links:
- 'jobmanager:jobmanager'
environment:
- JOB_MANAGER_RPC_ADDRESS=jobmanager

(3)执行 docker-compose,命令如下:

1
docker-compose up -d

(4)打开浏览器,访问 http://127.0.0.1:8081

基础配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# jobManager 的IP地址
jobmanager.rpc.address: localhost

# JobManager 的端口号
jobmanager.rpc.port: 6123

# JobManager JVM heap 内存大小
jobmanager.heap.size: 1024m

# TaskManager JVM heap 内存大小
taskmanager.heap.size: 1024m

# 每个 TaskManager 提供的任务 slots 数量大小
taskmanager.numberOfTaskSlots: 1

# 程序默认并行计算的个数
parallelism.default: 1
# 文件系统来源
# fs.default-scheme

高可用配置

1
2
3
4
5
6
7
8
9
10
11
# 可以选择 'NONE' 或者 'zookeeper'.
# high-availability: zookeeper

# 文件系统路径,让 Flink 在高可用性设置中持久保存元数据
# high-availability.storageDir: hdfs:///flink/ha/

# zookeeper 集群中仲裁者的机器 ip 和 port 端口号
# high-availability.zookeeper.quorum: localhost:2181

# 默认是 open,如果 zookeeper security 启用了该值会更改成 creator
# high-availability.zookeeper.client.acl: open

容错和 checkpoint 配置

1
2
3
4
5
6
7
8
9
10
11
# 用于存储和检查点状态
# state.backend: filesystem

# 存储检查点的数据文件和元数据的默认目录
# state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints

# savepoints 的默认目标目录(可选)
# state.savepoints.dir: hdfs://namenode-host:port/flink-checkpoints

# 用于启用/禁用增量 checkpoints 的标志
# state.backend.incremental: false

Web UI 配置

1
2
3
4
5
6
7
# 基于 Web 的运行时监视器侦听的地址.
#jobmanager.web.address: 0.0.0.0

# Web 的运行时监视器端口
rest.port: 8081
# 是否从基于 Web 的 jobmanager 启用作业提交
# jobmanager.web.submit.enable: false

高级配置

1
2
3
4
5
6
7
8
9
10
11
12
13
# io.tmp.dirs: /tmp

# 是否应在 TaskManager 启动时预先分配 TaskManager 管理的内存
# taskmanager.memory.preallocate: false

# 类加载解析顺序,是先检查用户代码 jar(“child-first”)还是应用程序类路径(“parent-first”)。 默认设置指示首先从用户代码 jar 加载类
# classloader.resolve-order: child-first

# 用于网络缓冲区的 JVM 内存的分数。 这决定了 TaskManager 可以同时拥有多少流数据交换通道以及通道缓冲的程度。 如果作业被拒绝或者您收到系统没有足够缓冲区的警告,请增加此值或下面的最小/最大值。 另请注意,“taskmanager.network.memory.min”和“taskmanager.network.memory.max”可能会覆盖此分数

# taskmanager.network.memory.fraction: 0.1
# taskmanager.network.memory.min: 67108864
# taskmanager.network.memory.max: 1073741824
1
2
3
4
5
6
7
8
9
10
11
# 指示是否从 Kerberos ticket 缓存中读取
# security.kerberos.login.use-ticket-cache: true

# 包含用户凭据的 Kerberos 密钥表文件的绝对路径
# security.kerberos.login.keytab: /path/to/kerberos/keytab

# 与 keytab 关联的 Kerberos 主体名称
# security.kerberos.login.principal: flink-user

# 以逗号分隔的登录上下文列表,用于提供 Kerberos 凭据(例如,`Client,KafkaClient`使用凭证进行 ZooKeeper 身份验证和 Kafka 身份验证)
# security.kerberos.login.contexts: Client,KafkaClient

Zookeeper 安全配置

1
2
3
4
5
# 覆盖以下配置以提供自定义 ZK 服务名称
# zookeeper.sasl.service-name: zookeeper

# 该配置必须匹配 "security.kerberos.login.contexts" 中的列表(含有一个)
# zookeeper.sasl.login-context-name: Client

参考资料

ZooKeeper Java Api

ZooKeeper 是 Apache 的顶级项目。ZooKeeper 为分布式应用提供了高效且可靠的分布式协调服务,提供了诸如统一命名服务、配置管理和分布式锁等分布式的基础服务。在解决分布式数据一致性方面,ZooKeeper 并没有直接采用 Paxos 算法,而是采用了名为 ZAB 的一致性协议

ZooKeeper 主要用来解决分布式集群中应用系统的一致性问题,它能提供基于类似于文件系统的目录节点树方式的数据存储。但是 ZooKeeper 并不是用来专门存储数据的,它的作用主要是用来维护和监控存储数据的状态变化。通过监控这些数据状态的变化,从而可以达到基于数据的集群管理

很多大名鼎鼎的框架都基于 ZooKeeper 来实现分布式高可用,如:Dubbo、Kafka 等。

ZooKeeper 官方支持 Java 和 C 的 Client API。ZooKeeper 社区为大多数语言(.NET,python 等)提供非官方 API。

ZooKeeper 官方客户端

ZooKeeper 客户端简介

客户端和服务端交互遵循以下基本步骤:

  1. 客户端连接 ZooKeeper 服务端集群任意工作节点,该节点为客户端分配会话 ID。
  2. 为了保持通信,客户端需要和服务端保持心跳(实质上就是 ping )。否则,ZooKeeper 服务会话超时时间内未收到客户端请求,会将会话视为过期。这种情况下,客户端如果要通信,就需要重新连接。
  3. 只要会话 ID 处于活动状态,就可以执行读写 znode 操作。
  4. 所有任务完成后,客户端断开与 ZooKeeper 服务端集群的连接。如果客户端长时间不活动,则 ZooKeeper 集合将自动断开客户端。

ZooKeeper 官方客户端的核心是 ZooKeeper。它在其构造函数中提供了连接 ZooKeeper 服务的配置选项,并提供了访问 ZooKeeper 数据的方法。

其主要操作如下:

  • connect - 连接 ZooKeeper 服务
  • create - 创建 znode
  • exists - 检查 znode 是否存在及其信息
  • getACL / setACL- 获取/设置一个 znode 的 ACL
  • getData / setData- 获取/设置一个 znode 的数据
  • getChildren - 获取特定 znode 中的所有子节点
  • delete - 删除特定的 znode 及其所有子项
  • close - 关闭连接

ZooKeeper 官方客户端的使用方法是在 maven 项目的 pom.xml 中添加:

1
2
3
4
5
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.7.0</version>
</dependency>

创建连接

ZooKeeper 类通过其构造函数提供连接 ZooKeeper 服务的功能。其构造函数的定义如下:

1
ZooKeeper(String connectionString, int sessionTimeout, Watcher watcher)

参数说明:

  • connectionString - ZooKeeper 集群的主机列表。
  • sessionTimeout - 会话超时时间(以毫秒为单位)。
  • watcher - 实现监视机制的回调。当被监控的 znode 状态发生变化时,ZooKeeper 服务端的 WatcherManager 会主动调用传入的 Watcher ,推送状态变化给客户端。

【示例】连接 ZooKeeper

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.junit.jupiter.api.*;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

/**
* ZooKeeper 官方客户端测试例
*
* @author <a href="mailto:forbreak@163.com">Zhang Peng</a>
* @since 2022-02-19
*/
@DisplayName("ZooKeeper 官方客户端测试例")
public class ZooKeeperTest {

/**
* ZooKeeper 连接实例
*/
private static ZooKeeper zk;

/**
* 创建 ZooKeeper 连接
*/
@BeforeAll
public static void init() throws IOException, InterruptedException {
final String HOST = "localhost:2181";
CountDownLatch latch = new CountDownLatch(1);
zk = new ZooKeeper(HOST, 5000, watcher -> {
if (watcher.getState() == Watcher.Event.KeeperState.SyncConnected) {
latch.countDown();
}
});
latch.await();
}

/**
* 关闭 ZooKeeper 连接
*/
@AfterAll
public static void destroy() throws InterruptedException {
if (zk != null) {
zk.close();
}
}

/**
* 建立连接
*/
@Test
public void getState() {
ZooKeeper.States state = zk.getState();
Assertions.assertTrue(state.isAlive());
}

}

说明:

添加一个 connect 方法,用于创建一个 ZooKeeper 对象,用于连接到 ZooKeeper 服务。

这里 CountDownLatch 用于停止(等待)主进程,直到客户端与 ZooKeeper 集合连接。

ZooKeeper 对象通过监听器回调来监听连接状态。一旦客户端与 ZooKeeper 建立连接,监听器回调就会被调用;并且监听器回调函数调用 CountDownLatchcountDown 方法来释放锁,在主进程中 await

节点增删改查

判断节点是否存在

ZooKeeper 类提供了 exists 方法来检查 znode 的存在。如果指定的 znode 存在,则返回一个 znode 的元数据。

exists 方法的签名如下:

1
exists(String path, boolean watcher)
  • path- Znode 路径
  • watcher - 布尔值,用于指定是否监视指定的 znode

【示例】

1
2
Stat stat = zk.exists("/", true);
Assertions.assertNotNull(stat);

创建节点

ZooKeeper 类的 create 方法用于在 ZooKeeper 中创建一个新节点(znode)。

create 方法的签名如下:

1
create(String path, byte[] data, List<ACL> acl, CreateMode createMode)
  • path - Znode 路径。例如,/myapp1,/myapp2,/myapp1/mydata1,myapp2/mydata1/myanothersubdata
  • data - 要存储在指定 znode 路径中的数据
  • acl - 要创建的节点的访问控制列表。ZooKeeper API 提供了一个静态接口 ZooDefs.Ids 来获取一些基本的 acl 列表。例如,ZooDefs.Ids.OPEN_ACL_UNSAFE 返回打开 znode 的 acl 列表。
  • createMode - 节点的类型,即临时,顺序或两者。这是一个枚举

【示例】

1
2
3
4
5
6
private static final String path = "/mytest";

String text = "My first zookeeper app";
zk.create(path, text.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
Stat stat = zk.exists(path, true);
Assertions.assertNotNull(stat);

删除节点

ZooKeeper 类提供了 delete 方法来删除指定的 znode。

delete 方法的签名如下:

1
delete(String path, int version)
  • path - Znode 路径。
  • version - znode 的当前版本。

让我们创建一个新的 Java 应用程序来了解 ZooKeeper API 的 delete 功能。创建文件 ZKDelete.java 。在 main 方法中,使用 ZooKeeperConnection 对象创建一个 ZooKeeper 对象 zk 。然后,使用指定的路径和版本号调用 zk 对象的 delete 方法。

删除 znode 的完整程序代码如下:

【示例】

1
2
3
zk.delete(path, zk.exists(path, true).getVersion());
Stat stat = zk.exists(path, true);
Assertions.assertNull(stat);

获取节点数据

ZooKeeper 类提供 getData 方法来获取附加在指定 znode 中的数据及其状态。 getData 方法的签名如下:

1
getData(String path, Watcher watcher, Stat stat)
  • path - Znode 路径。
  • watcher - 监听器类型的回调函数。当指定的 znode 的数据改变时,ZooKeeper 集合将通过监听器回调进行通知。这是一次性通知。
  • stat - 返回 znode 的元数据。

【示例】

1
2
3
4
byte[] data = zk.getData(path, false, null);
String text1 = new String(data);
Assertions.assertEquals(text, text1);
System.out.println(text1);

设置节点数据

ZooKeeper 类提供 setData 方法来修改指定 znode 中附加的数据。 setData 方法的签名如下:

1
setData(String path, byte[] data, int version)
  • path- Znode 路径
  • data - 要存储在指定 znode 路径中的数据。
  • version- znode 的当前版本。每当数据更改时,ZooKeeper 会更新 znode 的版本号。

【示例】

1
2
3
4
5
6
7
8
String text = "含子节点的节点";
zk.create(path, text.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
zk.create(path + "/1", "1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
zk.create(path + "/2", "1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
List<String> actualList = zk.getChildren(path, false);
for (String child : actualList) {
System.out.println(child);
}

获取子节点

ZooKeeper 类提供 getChildren 方法来获取特定 znode 的所有子节点。 getChildren 方法的签名如下:

1
getChildren(String path, Watcher watcher)
  • path - Znode 路径。
  • watcher - 监听器类型的回调函数。当指定的 znode 被删除或 znode 下的子节点被创建/删除时,ZooKeeper 集合将进行通知。这是一次性通知。

【示例】

1
2
3
4
5
6
7
8
9
10
11
12
13
@Test
public void getChildren() throws InterruptedException, KeeperException {
byte[] data = "My first zookeeper app".getBytes();
zk.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
zk.create(path + "/1", "1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
zk.create(path + "/2", "1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
List<String> actualList = zk.getChildren(path, false);
List<String> expectedList = CollectionUtil.newArrayList("1", "2");
Assertions.assertTrue(CollectionUtil.containsAll(expectedList, actualList));
for (String child : actualList) {
System.out.println(child);
}
}

Curator 客户端

Curator 客户端简介

Curator 客户端的使用方法是在 maven 项目的 pom.xml 中添加:

1
2
3
4
5
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>5.1.0</version>
</dependency>

创建连接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.retry.RetryNTimes;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.junit.jupiter.api.*;

import java.nio.charset.StandardCharsets;

public class CuratorTest {

/**
* Curator ZooKeeper 连接实例
*/
private static CuratorFramework client = null;
private static final String path = "/mytest";

/**
* 创建连接
*/
@BeforeAll
public static void init() {
// 重试策略
RetryPolicy retryPolicy = new RetryNTimes(3, 5000);
client = CuratorFrameworkFactory.builder()
.connectString("localhost:2181")
.sessionTimeoutMs(10000).retryPolicy(retryPolicy)
.namespace("workspace").build(); //指定命名空间后,client 的所有路径操作都会以 /workspace 开头
client.start();
}

/**
* 关闭连接
*/
@AfterAll
public static void destroy() {
if (client != null) {
client.close();
}
}

}

节点增删改查

判断节点是否存在

1
2
Stat stat = client.checkExists().forPath(path);
Assertions.assertNull(stat);

判读服务状态

1
2
CuratorFrameworkState state = client.getState();
Assertions.assertEquals(CuratorFrameworkState.STARTED, state);

创建节点

1
2
3
4
5
6
// 创建节点
String text = "Hello World";
client.create().creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT) //节点类型
.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
.forPath(path, text.getBytes(StandardCharsets.UTF_8));

删除节点

1
2
3
4
5
client.delete()
.guaranteed() // 如果删除失败,会继续执行,直到成功
.deletingChildrenIfNeeded() // 如果有子节点,则递归删除
.withVersion(stat.getVersion()) // 传入版本号,如果版本号错误则拒绝删除操作,并抛出 BadVersion 异常
.forPath(path);

获取节点数据

1
2
3
byte[] data = client.getData().forPath(path);
Assertions.assertEquals(text, new String(data));
System.out.println("修改前的节点数据:" + new String(data));

设置节点数据

1
2
3
4
String text2 = "try again";
client.setData()
.withVersion(client.checkExists().forPath(path).getVersion())
.forPath(path, text2.getBytes(StandardCharsets.UTF_8));

获取子节点

1
2
3
4
5
6
List<String> children = client.getChildren().forPath(path);
for (String s : children) {
System.out.println(s);
}
List<String> expectedList = CollectionUtil.newArrayList("1", "2");
Assertions.assertTrue(CollectionUtil.containsAll(expectedList, children));

监听事件

创建一次性监听

和 Zookeeper 原生监听一样,使用 usingWatcher 注册的监听是一次性的,即监听只会触发一次,触发后就销毁。

【示例】

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 设置监听器
client.getData().usingWatcher(new CuratorWatcher() {
public void process(WatchedEvent event) {
System.out.println("节点 " + event.getPath() + " 发生了事件:" + event.getType());
}
}).forPath(path);

// 第一次修改
client.setData()
.withVersion(client.checkExists().forPath(path).getVersion())
.forPath(path, "第一次修改".getBytes(StandardCharsets.UTF_8));

// 第二次修改
client.setData()
.withVersion(client.checkExists().forPath(path).getVersion())
.forPath(path, "第二次修改".getBytes(StandardCharsets.UTF_8));

输出

1
节点 /mytest 发生了事件:NodeDataChanged

说明

修改两次数据,但是监听器只会监听第一次修改。

创建永久监听

Curator 还提供了创建永久监听的 API,其使用方式如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// 设置监听器
CuratorCache curatorCache = CuratorCache.builder(client, path).build();
PathChildrenCacheListener pathChildrenCacheListener = new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework framework, PathChildrenCacheEvent event) throws Exception {
System.out.println("节点 " + event.getData().getPath() + " 发生了事件:" + event.getType());
}
};
CuratorCacheListener listener = CuratorCacheListener.builder()
.forPathChildrenCache(path, client,
pathChildrenCacheListener)
.build();
curatorCache.listenable().addListener(listener);
curatorCache.start();

// 第一次修改
client.setData()
.withVersion(client.checkExists().forPath(path).getVersion())
.forPath(path, "第一次修改".getBytes(StandardCharsets.UTF_8));

// 第二次修改
client.setData()
.withVersion(client.checkExists().forPath(path).getVersion())
.forPath(path, "第二次修改".getBytes(StandardCharsets.UTF_8));

监听子节点

这里以监听 /hadoop 下所有子节点为例,实现方式如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
// 创建节点
String text = "Hello World";
client.create().creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT) //节点类型
.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
.forPath(path, text.getBytes(StandardCharsets.UTF_8));
client.create().creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT) //节点类型
.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
.forPath(path + "/1", text.getBytes(StandardCharsets.UTF_8));
client.create().creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT) //节点类型
.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
.forPath(path + "/2", text.getBytes(StandardCharsets.UTF_8));

// 设置监听器
// 第三个参数代表除了节点状态外,是否还缓存节点内容
PathChildrenCache childrenCache = new PathChildrenCache(client, path, true);
/*
* StartMode 代表初始化方式:
* NORMAL: 异步初始化
* BUILD_INITIAL_CACHE: 同步初始化
* POST_INITIALIZED_EVENT: 异步并通知,初始化之后会触发 INITIALIZED 事件
*/
childrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);

List<ChildData> childDataList = childrenCache.getCurrentData();
System.out.println("当前数据节点的子节点列表:");
childDataList.forEach(x -> System.out.println(x.getPath()));

childrenCache.getListenable().addListener(new PathChildrenCacheListener() {
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) {
switch (event.getType()) {
case INITIALIZED:
System.out.println("childrenCache 初始化完成");
break;
case CHILD_ADDED:
// 需要注意的是: 即使是之前已经存在的子节点,也会触发该监听,因为会把该子节点加入 childrenCache 缓存中
System.out.println("增加子节点:" + event.getData().getPath());
break;
case CHILD_REMOVED:
System.out.println("删除子节点:" + event.getData().getPath());
break;
case CHILD_UPDATED:
System.out.println("被修改的子节点的路径:" + event.getData().getPath());
System.out.println("修改后的数据:" + new String(event.getData().getData()));
break;
}
}
});

// 第一次修改
client.setData()
.forPath(path + "/1", "第一次修改".getBytes(StandardCharsets.UTF_8));

// 第二次修改
client.setData()
.forPath(path + "/1", "第二次修改".getBytes(StandardCharsets.UTF_8));

ACL 权限管理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
public class AclOperation {

private CuratorFramework client = null;
private static final String zkServerPath = "192.168.0.226:2181";
private static final String nodePath = "/mytest/hdfs";

@Before
public void prepare() {
RetryPolicy retryPolicy = new RetryNTimes(3, 5000);
client = CuratorFrameworkFactory.builder()
.authorization("digest", "heibai:123456".getBytes()) //等价于 addauth 命令
.connectString(zkServerPath)
.sessionTimeoutMs(10000).retryPolicy(retryPolicy)
.namespace("workspace").build();
client.start();
}

/**
* 新建节点并赋予权限
*/
@Test
public void createNodesWithAcl() throws Exception {
List<ACL> aclList = new ArrayList<>();
// 对密码进行加密
String digest1 = DigestAuthenticationProvider.generateDigest("heibai:123456");
String digest2 = DigestAuthenticationProvider.generateDigest("ying:123456");
Id user01 = new Id("digest", digest1);
Id user02 = new Id("digest", digest2);
// 指定所有权限
aclList.add(new ACL(Perms.ALL, user01));
// 如果想要指定权限的组合,中间需要使用 | ,这里的|代表的是位运算中的 按位或
aclList.add(new ACL(Perms.DELETE | Perms.CREATE, user02));

// 创建节点
byte[] data = "abc".getBytes();
client.create().creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.withACL(aclList, true)
.forPath(nodePath, data);
}


/**
* 给已有节点设置权限,注意这会删除所有原来节点上已有的权限设置
*/
@Test
public void SetAcl() throws Exception {
String digest = DigestAuthenticationProvider.generateDigest("admin:admin");
Id user = new Id("digest", digest);
client.setACL()
.withACL(Collections.singletonList(new ACL(Perms.READ | Perms.DELETE, user)))
.forPath(nodePath);
}

/**
* 获取权限
*/
@Test
public void getAcl() throws Exception {
List<ACL> aclList = client.getACL().forPath(nodePath);
ACL acl = aclList.get(0);
System.out.println(acl.getId().getId()
+ "是否有删读权限:" + (acl.getPerms() == (Perms.READ | Perms.DELETE)));
}

@After
public void destroy() {
if (client != null) {
client.close();
}
}
}

参考资料

ZooKeeper 命令

ZooKeeper 命令用于在 ZooKeeper 服务上执行操作。

启动服务和启动命令行

1
2
3
4
5
# 启动服务
bin/zkServer.sh start

# 启动命令行,不指定服务地址则默认连接到localhost:2181
bin/zkCli.sh -server hadoop001:2181

查看节点列表

ls 命令

ls 命令用于查看某个路径下目录列表。

【语法】

1
ls path

说明:

  • path:代表路径。

【示例】

1
2
[zk: localhost:2181(CONNECTED) 0] ls /
[cluster, controller_epoch, brokers, storm, zookeeper, admin, ...]

ls2 命令

ls2 命令用于查看某个路径下目录列表,它比 ls 命令列出更多的详细信息。

【语法】

1
ls2 path

说明:

  • path:代表路径。

【示例】

1
2
3
4
5
6
7
8
9
10
11
12
13
[zk: localhost:2181(CONNECTED) 1] ls2 /
[cluster, controller_epoch, brokers, storm, zookeeper, admin, ....]
cZxid = 0x0
ctime = Thu Jan 01 08:00:00 CST 1970
mZxid = 0x0
mtime = Thu Jan 01 08:00:00 CST 1970
pZxid = 0x130
cversion = 19
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 11

节点的增删改查

get 命令

get 命令用于获取节点数据和状态信息。

【语法】

1
get path [watch]

说明:

  • path:代表路径。
  • **[watch]**:对节点进行事件监听。

【示例】

1
2
3
4
5
6
7
8
9
10
11
12
13
[zk: localhost:2181(CONNECTED) 31] get /hadoop
123456 #节点数据
cZxid = 0x14b
ctime = Fri May 24 17:03:06 CST 2019
mZxid = 0x14b
mtime = Fri May 24 17:03:06 CST 2019
pZxid = 0x14b
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 6
numChildren = 0

说明:

节点各个属性如下表。其中一个重要的概念是 Zxid(ZooKeeper Transaction Id),ZooKeeper 节点的每一次更改都具有唯一的 Zxid,如果 Zxid1 小于 Zxid2,则 Zxid1 的更改发生在 Zxid2 更改之前。

状态属性 说明
cZxid 数据节点创建时的事务 ID
ctime 数据节点创建时的时间
mZxid 数据节点最后一次更新时的事务 ID
mtime 数据节点最后一次更新时的时间
pZxid 数据节点的子节点最后一次被修改时的事务 ID
cversion 子节点的更改次数
dataVersion 节点数据的更改次数
aclVersion 节点的 ACL 的更改次数
ephemeralOwner 如果节点是临时节点,则表示创建该节点的会话的 SessionID;如果节点是持久节点,则该属性值为 0
dataLength 数据内容的长度
numChildren 数据节点当前的子节点个数

stat 命令

stat 命令用于查看节点状态信息。它的返回值和 get 命令类似,但不会返回节点数据。

【语法】

1
stat path [watch]
  • path:代表路径。
  • **[watch]**:对节点进行事件监听。

【示例】

1
2
3
4
5
6
7
8
9
10
11
12
[zk: localhost:2181(CONNECTED) 32] stat /hadoop
cZxid = 0x14b
ctime = Fri May 24 17:03:06 CST 2019
mZxid = 0x14b
mtime = Fri May 24 17:03:06 CST 2019
pZxid = 0x14b
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 6
numChildren = 0

create 命令

create 命令用于创建节点并赋值。

【语法】

1
create [-s] [-e] path data acl

说明:

  • **[-s][-e]**:-s 和 -e 都是可选的,-s 代表顺序节点,-e 代表临时节点,注意其中 -s 和 -e 可以同时使用的,并且临时节点不能再创建子节点。
    • 默认情况下,所有 znode 都是持久的。
    • 顺序节点保证 znode 路径将是唯一的。
    • 临时节点会在会话过期或客户端断开连接时被自动删除。
  • path:指定要创建节点的路径,比如 /hadoop
  • data:要在此节点存储的数据。
  • acl:访问权限相关,默认是 world,相当于全世界都能访问。

【示例】创建持久节点

1
2
[zk: localhost:2181(CONNECTED) 4] create /hadoop 123456
Created /hadoop

【示例】创建有序节点,此时创建的节点名为指定节点名 + 自增序号:

1
2
3
4
5
6
[zk: localhost:2181(CONNECTED) 23] create -s /a  "aaa"
Created /a0000000022
[zk: localhost:2181(CONNECTED) 24] create -s /b "bbb"
Created /b0000000023
[zk: localhost:2181(CONNECTED) 25] create -s /c "ccc"
Created /c0000000024

【示例】创建临时节点:

1
2
[zk: localhost:2181(CONNECTED) 26] create -e /tmp  "tmp"
Created /tmp

set 命令

set 命令用于修改节点存储的数据。

【语法】

1
set path data [version]

说明:

  • path:节点路径。
  • data:需要存储的数据。
  • **[version]**:可选项,版本号(可用作乐观锁)。

【示例】

1
2
3
4
5
6
7
8
9
10
11
12
[zk: localhost:2181(CONNECTED) 33] set /hadoop 345
cZxid = 0x14b
ctime = Fri May 24 17:03:06 CST 2019
mZxid = 0x14c
mtime = Fri May 24 17:13:05 CST 2019
pZxid = 0x14b
cversion = 0
dataVersion = 1 # 注意更改后此时版本号为 1,默认创建时为 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 3
numChildren = 0

也可以基于版本号进行更改,此时类似于乐观锁机制,当你传入的数据版本号 (dataVersion) 和当前节点的数据版本号不符合时,zookeeper 会拒绝本次修改:

1
2
[zk: localhost:2181(CONNECTED) 34] set /hadoop 678 0
version No is not valid : /hadoop #无效的版本号

delete 命令

delete 命令用于删除某节点。

【语法】

1
delete path [version]

说明:

  • path:节点路径。
  • **[version]**:可选项,版本号(同 set 命令)。和更新节点数据一样,也可以传入版本号,当你传入的数据版本号 (dataVersion) 和当前节点的数据版本号不符合时,zookeeper 不会执行删除操作。

【示例】

1
2
3
4
[zk: localhost:2181(CONNECTED) 36] delete /hadoop 0
version No is not valid : /hadoop #无效的版本号
[zk: localhost:2181(CONNECTED) 37] delete /hadoop 1
[zk: localhost:2181(CONNECTED) 38]

delete 命令不能删除带有子节点的节点。如果想要删除节点及其子节点,可以使用 deleteall path

监听器

针对每个节点的操作,都会有一个监听者(watcher)。

  • 当监听的某个对象(znode)发生了变化,则触发监听事件。
  • zookeeper 中的监听事件是一次性的,触发后立即销毁。
  • 父节点,子节点的增删改都能够触发其监听者(watcher)
  • 针对不同类型的操作,触发的 watcher 事件也不同:
    • 父节点 Watcher 事件
      • 创建父节点触发:NodeCreated
      • 修改节点数据触发:NodeDatachanged
      • 删除节点数据触发:NodeDeleted
    • 子节点 Watcher 事件
      • 创建子节点触发:NodeChildrenChanged
      • 删除子节点触发:NodeChildrenChanged
      • 修改子节点不触发事件

get path

使用 get path -w 注册的监听器能够在节点内容发生改变的时候,向客户端发出通知。需要注意的是 zookeeper 的触发器是一次性的 (One-time trigger),即触发一次后就会立即失效。

1
2
3
4
[zk: localhost:2181(CONNECTED) 4] get /hadoop -w
[zk: localhost:2181(CONNECTED) 5] set /hadoop 45678
WATCHER::
WatchedEvent state:SyncConnected type:NodeDataChanged path:/hadoop #节点值改变

get path [watch] 在当前版本已废弃

stat path

使用 stat path -w 注册的监听器能够在节点状态发生改变的时候,向客户端发出通知。

1
2
3
4
[zk: localhost:2181(CONNECTED) 7] stat path -w
[zk: localhost:2181(CONNECTED) 8] set /hadoop 112233
WATCHER::
WatchedEvent state:SyncConnected type:NodeDataChanged path:/hadoop #节点值改变

stat path [watch] 在当前版本已废弃

ls\ls2 path

使用 ls path -wls2 path -w 注册的监听器能够监听该节点下所有子节点的增加和删除操作。

1
2
3
4
5
[zk: localhost:2181(CONNECTED) 9] ls /hadoop -w
[]
[zk: localhost:2181(CONNECTED) 10] create /hadoop/yarn "aaa"
WATCHER::
WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/hadoop

ls path [watch] 和 ls2 path [watch] 在当前版本已废弃

辅助命令

使用 help 可以查看所有命令帮助信息。

使用 history 可以查看最近 10 条历史记录。

zookeeper 四字命令

命令 功能描述
conf 打印服务配置的详细信息。
cons 列出连接到此服务器的所有客户端的完整连接/会话详细信息。包括接收/发送的数据包数量,会话 ID,操作延迟,上次执行的操作等信息。
dump 列出未完成的会话和临时节点。这只适用于 Leader 节点。
envi 打印服务环境的详细信息。
ruok 测试服务是否处于正确状态。如果正确则返回“imok”,否则不做任何相应。
stat 列出服务器和连接客户端的简要详细信息。
wchs 列出所有 watch 的简单信息。
wchc 按会话列出服务器 watch 的详细信息。
wchp 按路径列出服务器 watch 的详细信息。

更多四字命令可以参阅官方文档:https://zookeeper.apache.org/doc/current/zookeeperAdmin.html

使用前需要使用 yum install nc 安装 nc 命令,使用示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
[root@hadoop001 bin]# echo stat | nc localhost 2181
Zookeeper version: 3.4.13-2d71af4dbe22557fda74f9a9b4309b15a7487f03,
built on 06/29/2018 04:05 GMT
Clients:
/0:0:0:0:0:0:0:1:50584[1](queued=0,recved=371,sent=371)
/0:0:0:0:0:0:0:1:50656[0](queued=0,recved=1,sent=0)
Latency min/avg/max: 0/0/19
Received: 372
Sent: 371
Connections: 2
Outstanding: 0
Zxid: 0x150
Mode: standalone
Node count: 167

参考资料

ZooKeeper ACL

为了避免存储在 Zookeeper 上的数据被其他程序或者人为误修改,Zookeeper 提供了 ACL(Access Control Lists) 进行权限控制。

ACL 权限可以针对节点设置相关读写等权限,保障数据安全性。

ZooKeeper ACL 提供了以下几种命令行:

  • getAcl 命令:获取某个节点的 acl 权限信息。
  • setAcl 命令:设置某个节点的 acl 权限信息。
  • addauth 命令:输入认证授权信息,注册时输入明文密码,加密形式保存。

ACL 组成

Zookeeper 的 acl 通过 [scheme:id:permissions] 来构成权限列表。

  • scheme:代表采用的某种权限机制,包括 world、auth、digest、ip、super 几种。
    • world:默认模式,所有客户端都拥有指定的权限。world 下只有一个 id 选项,就是 anyone,通常组合写法为 world:anyone:[permissons]
    • auth:只有经过认证的用户才拥有指定的权限。通常组合写法为 auth:user:password:[permissons],使用这种模式时,你需要先进行登录,之后采用 auth 模式设置权限时,userpassword 都将使用登录的用户名和密码;
    • digest:只有经过认证的用户才拥有指定的权限。通常组合写法为 auth:user:BASE64(SHA1(password)):[permissons],这种形式下的密码必须通过 SHA1 和 BASE64 进行双重加密;
    • ip:限制只有特定 IP 的客户端才拥有指定的权限。通常组成写法为 ip:182.168.0.168:[permissions]
    • super:代表超级管理员,拥有所有的权限,需要修改 Zookeeper 启动脚本进行配置。
  • id:代表允许访问的用户。
  • permissions:权限组合字符串,由 cdrwa 组成,其中每个字母代表支持不同权限。可选项如下:
    • CREATE:允许创建子节点;
    • READ:允许从节点获取数据并列出其子节点;
    • WRITE:允许为节点设置数据;
    • DELETE:允许删除子节点;
    • ADMIN:允许为节点设置权限。

设置与查看权限

想要给某个节点设置权限 (ACL),有以下两个可选的命令:

1
2
3
4
5
# 1.给已有节点赋予权限
setAcl path acl

# 2.在创建节点时候指定权限
create [-s] [-e] path data acl

查看指定节点的权限命令如下:

1
getAcl path

添加认证信息

可以使用如下所示的命令为当前 Session 添加用户认证信息,等价于登录操作。

1
2
3
4
5
# 格式
addauth scheme auth

#示例:添加用户名为test,密码为root的用户认证信息
addauth digest test:root

权限设置示例

world 模式

world 是一种默认的模式,即创建时如果不指定权限,则默认的权限就是 world。

1
2
3
4
5
6
7
8
9
[zk: localhost:2181(CONNECTED) 32] create /mytest abc
Created /mytest
[zk: localhost:2181(CONNECTED) 4] getAcl /mytest
'world,'anyone # 默认的权限
: cdrwa
[zk: localhost:2181(CONNECTED) 34] setAcl /mytest world:anyone:cwda # 修改节点,不允许所有客户端读
....
[zk: localhost:2181(CONNECTED) 6] get /mytest
org.apache.zookeeper.KeeperException$NoAuthException: KeeperErrorCode = NoAuth for /mytest # 无权访问

auth 模式

1
2
3
4
5
6
7
8
9
10
11
[zk: localhost:2181(CONNECTED) 36] addauth digest test:root # 登录
[zk: localhost:2181(CONNECTED) 37] setAcl /mytest auth::cdrwa # 设置权限
[zk: localhost:2181(CONNECTED) 38] getAcl /mytest # 查看权限信息
'digest,'heibai:sCxtVJ1gPG8UW/jzFHR0A1ZKY5s= # 用户名和密码 (密码经过加密处理),注意返回的权限类型是 digest
: cdrwa

# 用户名和密码都是使用登录的用户名和密码,即使你在创建权限时候进行指定也是无效的
[zk: localhost:2181(CONNECTED) 39] setAcl /mytest auth:root:root:cdrwa #指定用户名和密码为 root
[zk: localhost:2181(CONNECTED) 40] getAcl /mytest
'digest,'heibai:sCxtVJ1gPG8UW/jzFHR0A1ZKY5s= #无效,使用的用户名和密码依然还是 test
: cdrwa

digest 模式

1
2
3
4
[zk:44] create /spark "spark" digest:heibai:sCxtVJ1gPG8UW/jzFHR0A1ZKY5s=:cdrwa  #指定用户名和加密后的密码
[zk:45] getAcl /spark #获取权限
'digest,'heibai:sCxtVJ1gPG8UW/jzFHR0A1ZKY5s= # 返回的权限类型是 digest
: cdrwa

到这里你可以发现使用 auth 模式设置的权限和使用 digest 模式设置的权限,在最终结果上,得到的权限模式都是 digest。某种程度上,你可以把 auth 模式理解成是 digest 模式的一种简便实现。因为在 digest 模式下,每次设置都需要书写用户名和加密后的密码,这是比较繁琐的,采用 auth 模式就可以避免这种麻烦。

ip 模式

限定只有特定的 ip 才能访问。

1
2
3
[zk: localhost:2181(CONNECTED) 46] create /hive "hive" ip:192.168.0.108:cdrwa
[zk: localhost:2181(CONNECTED) 47] get /hive
Authentication is not valid : /hive # 当前主机已经不能访问

这里可以看到当前主机已经不能访问,想要能够再次访问,可以使用对应 IP 的客户端,或使用下面介绍的 super 模式。

super 模式

需要修改启动脚本 zkServer.sh,并在指定位置添加超级管理员账户和密码信息:

1
"-Dzookeeper.DigestAuthenticationProvider.superDigest=heibai:sCxtVJ1gPG8UW/jzFHR0A1ZKY5s="

修改完成后需要使用 zkServer.sh restart 重启服务,此时再次访问限制 IP 的节点:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
[zk: localhost:2181(CONNECTED) 0] get /hive  #访问受限
Authentication is not valid : /hive
[zk: localhost:2181(CONNECTED) 1] addauth digest heibai:heibai # 登录 (添加认证信息)
[zk: localhost:2181(CONNECTED) 2] get /hive #成功访问
hive
cZxid = 0x158
ctime = Sat May 25 09:11:29 CST 2019
mZxid = 0x158
mtime = Sat May 25 09:11:29 CST 2019
pZxid = 0x158
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 4
numChildren = 0

参考资料

Memcached 快速入门

Memcached 简介

Memcached 是一个自由开源的,高性能,分布式内存对象缓存系统。

Memcached 是一种基于内存的 key-value 存储,用来存储小块的任意数据(字符串、对象)。这些数据可以是数据库调用、API 调用或者是页面渲染的结果。

Memcached 简洁而强大。它的简洁设计便于快速开发,减轻开发难度,解决了大数据量缓存的很多问题。它的 API 兼容大部分流行的开发语言。本质上,它是一个简洁的 key-value 存储系统。

Memcached 特性

memcached 作为高速运行的分布式缓存服务器,具有以下的特点。

  • 协议简单
  • 基于 libevent 的事件处理
  • 内置内存存储方式
  • memcached 不互相通信的分布式

Memcached 命令

可以通过 telnet 命令并指定主机 ip 和端口来连接 Memcached 服务。

1
2
3
4
5
6
7
8
9
10
11
12
13
telnet 127.0.0.1 11211

Trying 127.0.0.1...
Connected to 127.0.0.1.
Escape character is '^]'.
set foo 0 0 3 保存命令
bar 数据
STORED 结果
get foo 取得命令
VALUE foo 0 3 数据
bar 数据
END 结束行
quit 退出

Java 连接 Memcached

使用 Java 程序连接 Memcached,需要在你的 classpath 中添加 Memcached jar 包。

本站 jar 包下载地址:spymemcached-2.10.3.jar

Google Code jar 包下载地址:spymemcached-2.10.3.jar(需要科学上网)。

以下程序假定 Memcached 服务的主机为 127.0.0.1,端口为 11211。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import net.spy.memcached.MemcachedClient;
import java.net.*;


public class MemcachedJava {
public static void main(String[] args) {
try{
// 本地连接 Memcached 服务
MemcachedClient mcc = new MemcachedClient(new InetSocketAddress("127.0.0.1", 11211));
System.out.println("Connection to server sucessful.");

// 关闭连接
mcc.shutdown();

}catch(Exception ex){
System.out.println( ex.getMessage() );
}
}
}

参考资料

Jetty 快速入门

Jetty 简介

jetty 是什么?

jetty 是轻量级的 web 服务器和 servlet 引擎。

它的最大特点是:可以很方便的作为嵌入式服务器

它是 eclipse 的一个开源项目。不用怀疑,就是你常用的那个 eclipse。

它是使用 Java 开发的,所以天然对 Java 支持良好。

官方网址

github 源码地址

什么是嵌入式服务器?

以 jetty 来说明,就是只要引入 jetty 的 jar 包,可以通过直接调用其 API 的方式来启动 web 服务。

用过 Tomcat、Resin 等服务器的朋友想必不会陌生那一套安装、配置、部署的流程吧,还是挺繁琐的。使用 jetty,就不需要这些过程了。

jetty 非常适用于项目的开发、测试,因为非常快捷。如果想用于生产环境,则需要谨慎考虑,它不一定能像成熟的 Tomcat、Resin 等服务器一样支持企业级 Java EE 的需要。

Jetty 的使用

我觉得嵌入式启动方式的一个好处在于:可以直接运行项目,无需每次部署都得再配置服务器。

jetty 的嵌入式启动使用有两种方式:

API 方式

maven 插件方式

API 方式

添加 maven 依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-webapp</artifactId>
<version>9.3.2.v20150730</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-annotations</artifactId>
<version>9.3.2.v20150730</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>apache-jsp</artifactId>
<version>9.3.2.v20150730</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>apache-jstl</artifactId>
<version>9.3.2.v20150730</version>
<scope>test</scope>
</dependency>

官方的启动代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class SplitFileServer
{
public static void main( String[] args ) throws Exception
{
// 创建Server对象,并绑定端口
Server server = new Server();
ServerConnector connector = new ServerConnector(server);
connector.setPort(8090);
server.setConnectors(new Connector[] { connector });

// 创建上下文句柄,绑定上下文路径。这样启动后的url就会是:http://host:port/context
ResourceHandler rh0 = new ResourceHandler();
ContextHandler context0 = new ContextHandler();
context0.setContextPath("/");

// 绑定测试资源目录(在本例的配置目录dir0的路径是src/test/resources/dir0)
File dir0 = MavenTestingUtils.getTestResourceDir("dir0");
context0.setBaseResource(Resource.newResource(dir0));
context0.setHandler(rh0);

// 和上面的例子一样
ResourceHandler rh1 = new ResourceHandler();
ContextHandler context1 = new ContextHandler();
context1.setContextPath("/");
File dir1 = MavenTestingUtils.getTestResourceDir("dir1");
context1.setBaseResource(Resource.newResource(dir1));
context1.setHandler(rh1);

// 绑定两个资源句柄
ContextHandlerCollection contexts = new ContextHandlerCollection();
contexts.setHandlers(new Handler[] { context0, context1 });
server.setHandler(contexts);

// 启动
server.start();

// 打印dump时的信息
System.out.println(server.dump());

// join当前线程
server.join();
}
}

直接运行 Main 方法,就可以启动 web 服务。

注:以上代码在 eclipse 中运行没有问题,如果想在 Intellij 中运行还需要为它指定配置文件。

如果想了解在 Eclipse 和 Intellij 都能运行的通用方法可以参考我的 github 代码示例。

我的实现也是参考 springside 的方式。

代码行数有点多,不在这里贴代码了。

完整参考代码

Maven 插件方式

如果你熟悉 maven,那么实在太简单了

注: Maven 版本必须在 3.3 及以上版本。

(1) 添加 maven 插件

1
2
3
4
5
<plugin>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-maven-plugin</artifactId>
<version>9.3.12.v20160915</version>
</plugin>

(2) 执行 maven 命令:

1
mvn jetty:run

讲真,就是这么简单。jetty 默认会为你创建一个 web 服务,地址为 127.0.0.1:8080。

当然,你也可以在插件中配置你的 webapp 环境

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
<plugin>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-maven-plugin</artifactId>
<version>9.3.12.v20160915</version>

<configuration>
<webAppSourceDirectory>${project.basedir}/src/staticfiles</webAppSourceDirectory>

<!-- 配置webapp -->
<webApp>
<contextPath>/</contextPath>
<descriptor>${project.basedir}/src/over/here/web.xml</descriptor>
<jettyEnvXml>${project.basedir}/src/over/here/jetty-env.xml</jettyEnvXml>
</webApp>

<!-- 配置classes -->
<classesDirectory>${project.basedir}/somewhere/else</classesDirectory>
<scanClassesPattern>
<excludes>
<exclude>**/Foo.class</exclude>
</excludes>
</scanClassesPattern>
<scanTargets>
<scanTarget>src/mydir</scanTarget>
<scanTarget>src/myfile.txt</scanTarget>
</scanTargets>

<!-- 扫描target目录下的资源文件 -->
<scanTargetPatterns>
<scanTargetPattern>
<directory>src/other-resources</directory>
<includes>
<include>**/*.xml</include>
<include>**/*.properties</include>
</includes>
<excludes>
<exclude>**/myspecial.xml</exclude>
<exclude>**/myspecial.properties</exclude>
</excludes>
</scanTargetPattern>
</scanTargetPatterns>
</configuration>
</plugin>

官方给的 jetty-env.xml 范例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
<?xml version="1.0"?>
<!DOCTYPE Configure PUBLIC "-//Mort Bay Consulting//DTD Configure//EN" "http://jetty.mortbay.org/configure.dtd">

<Configure class="org.eclipse.jetty.webapp.WebAppContext">

<!-- Add an EnvEntry only valid for this webapp -->
<New id="gargle" class="org.eclipse.jetty.plus.jndi.EnvEntry">
<Arg>gargle</Arg>
<Arg type="java.lang.Double">100</Arg>
<Arg type="boolean">true</Arg>
</New>

<!-- Add an override for a global EnvEntry -->
<New id="wiggle" class="org.eclipse.jetty.plus.jndi.EnvEntry">
<Arg>wiggle</Arg>
<Arg type="java.lang.Double">55.0</Arg>
<Arg type="boolean">true</Arg>
</New>

<!-- an XADataSource -->
<New id="mydatasource99" class="org.eclipse.jetty.plus.jndi.Resource">
<Arg>jdbc/mydatasource99</Arg>
<Arg>
<New class="com.atomikos.jdbc.SimpleDataSourceBean">
<Set name="xaDataSourceClassName">org.apache.derby.jdbc.EmbeddedXADataSource</Set>
<Set name="xaDataSourceProperties">databaseName=testdb99;createDatabase=create</Set>
<Set name="UniqueResourceName">mydatasource99</Set>
</New>
</Arg>
</New>

</Configure>

Jetty 的架构

Jetty 架构简介

img

Jetty Server 就是由多个 Connector(连接器)、多个 Handler(处理器),以及一个线程池组成。

跟 Tomcat 一样,Jetty 也有 HTTP 服务器和 Servlet 容器的功能,因此 Jetty 中的 Connector 组件和 Handler 组件分别来实现这两个功能,而这两个组件工作时所需要的线程资源都直接从一个全局线程池 ThreadPool 中获取。

Jetty Server 可以有多个 Connector 在不同的端口上监听客户请求,而对于请求处理的 Handler 组件,也可以根据具体场景使用不同的 Handler。这样的设计提高了 Jetty 的灵活性,需要支持 Servlet,则可以使用 ServletHandler;需要支持 Session,则再增加一个 SessionHandler。也就是说我们可以不使用 Servlet 或者 Session,只要不配置这个 Handler 就行了。

为了启动和协调上面的核心组件工作,Jetty 提供了一个 Server 类来做这个事情,它负责创建并初始化 Connector、Handler、ThreadPool 组件,然后调用 start 方法启动它们。

Jetty 和 Tomcat 架构区别

对比一下 Tomcat 的整体架构图,你会发现 Tomcat 在整体上跟 Jetty 很相似,它们的第一个区别是 Jetty 中没有 Service 的概念,Tomcat 中的 Service 包装了多个连接器和一个容器组件,一个 Tomcat 实例可以配置多个 Service,不同的 Service 通过不同的连接器监听不同的端口;而 Jetty 中 Connector 是被所有 Handler 共享的。

第二个区别是,在 Tomcat 中每个连接器都有自己的线程池,而在 Jetty 中所有的 Connector 共享一个全局的线程池。

Connector 组件

跟 Tomcat 一样,Connector 的主要功能是对 I/O 模型和应用层协议的封装。I/O 模型方面,最新的 Jetty 9 版本只支持 NIO,因此 Jetty 的 Connector 设计有明显的 Java NIO 通信模型的痕迹。至于应用层协议方面,跟 Tomcat 的 Processor 一样,Jetty 抽象出了 Connection 组件来封装应用层协议的差异。

服务端在 NIO 通信上主要完成了三件事情:监听连接、I/O 事件查询以及数据读写。因此 Jetty 设计了Acceptor、SelectorManager 和 Connection 来分别做这三件事情

Acceptor

Acceptor 用于接受请求。跟 Tomcat 一样,Jetty 也有独立的 Acceptor 线程组用于处理连接请求。在 Connector 的实现类 ServerConnector 中,有一个 _acceptors 的数组,在 Connector 启动的时候, 会根据 _acceptors 数组的长度创建对应数量的 Acceptor,而 Acceptor 的个数可以配置。

1
2
3
4
5
for (int i = 0; i < _acceptors.length; i++)
{
Acceptor a = new Acceptor(i);
getExecutor().execute(a);
}

AcceptorServerConnector 中的一个内部类,同时也是一个 RunnableAcceptor 线程是通过 getExecutor() 得到的线程池来执行的,前面提到这是一个全局的线程池。

Acceptor 通过阻塞的方式来接受连接,这一点跟 Tomcat 也是一样的。

1
2
3
4
5
6
7
8
9
10
11
public void accept(int acceptorID) throws IOException
{
ServerSocketChannel serverChannel = _acceptChannel;
if (serverChannel != null && serverChannel.isOpen())
{
// 这里是阻塞的
SocketChannel channel = serverChannel.accept();
// 执行到这里时说明有请求进来了
accepted(channel);
}
}

接受连接成功后会调用 accepted() 函数,accepted() 函数中会将 SocketChannel 设置为非阻塞模式,然后交给 Selector 去处理,因此这也就到了 Selector 的地界了。

1
2
3
4
5
6
7
8
private void accepted(SocketChannel channel) throws IOException
{
channel.configureBlocking(false);
Socket socket = channel.socket();
configure(socket);
// _manager 是 SelectorManager 实例,里面管理了所有的 Selector 实例
_manager.accept(channel);
}

SelectorManager

Jetty 的 SelectorSelectorManager 类管理,而被管理的 Selector 叫作 ManagedSelectorSelectorManager 内部有一个 ManagedSelector 数组,真正干活的是 ManagedSelector。咱们接着上面分析,看看在 SelectorManageraccept 方法里做了什么。

1
2
3
4
5
6
7
public void accept(SelectableChannel channel, Object attachment)
{
// 选择一个 ManagedSelector 来处理 Channel
final ManagedSelector selector = chooseSelector();
// 提交一个任务 Accept 给 ManagedSelector
selector.submit(selector.new Accept(channel, attachment));
}

SelectorManager 从本身的 Selector 数组中选择一个 Selector 来处理这个 Channel,并创建一个任务 Accept 交给 ManagedSelector,ManagedSelector 在处理这个任务主要做了两步:

第一步,调用 Selector 的 register 方法把 Channel 注册到 Selector 上,拿到一个 SelectionKey。

1
_key = _channel.register(selector, SelectionKey.OP_ACCEPT, this);

第二步,创建一个 EndPoint 和 Connection,并跟这个 SelectionKey(Channel)绑在一起:

1
2
3
4
5
6
7
8
9
10
11
12
13
private void createEndPoint(SelectableChannel channel, SelectionKey selectionKey) throws IOException
{
//1. 创建 Endpoint
EndPoint endPoint = _selectorManager.newEndPoint(channel, this, selectionKey);

//2. 创建 Connection
Connection connection = _selectorManager.newConnection(channel, endPoint, selectionKey.attachment());

//3. 把 Endpoint、Connection 和 SelectionKey 绑在一起
endPoint.setConnection(connection);
selectionKey.attach(endPoint);

}

这里需要你特别注意的是,ManagedSelector 并没有直接调用 EndPoint 的方法去处理数据,而是通过调用 EndPoint 的方法返回一个 Runnable,然后把这个 Runnable 扔给线程池执行,所以你能猜到,这个 Runnable 才会去真正读数据和处理请求。

Connection

这个 Runnable 是 EndPoint 的一个内部类,它会调用 Connection 的回调方法来处理请求。Jetty 的 Connection 组件类比就是 Tomcat 的 Processor,负责具体协议的解析,得到 Request 对象,并调用 Handler 容器进行处理。下面我简单介绍一下它的具体实现类 HttpConnection 对请求和响应的处理过程。

请求处理:HttpConnection 并不会主动向 EndPoint 读取数据,而是向在 EndPoint 中注册一堆回调方法:

1
getEndPoint().fillInterested(_readCallback);

这段代码就是告诉 EndPoint,数据到了你就调我这些回调方法 _readCallback 吧,有点异步 I/O 的感觉,也就是说 Jetty 在应用层面模拟了异步 I/O 模型。

而在回调方法 _readCallback 里,会调用 EndPoint 的接口去读数据,读完后让 HTTP 解析器去解析字节流,HTTP 解析器会将解析后的数据,包括请求行、请求头相关信息存到 Request 对象里。

响应处理:Connection 调用 Handler 进行业务处理,Handler 会通过 Response 对象来操作响应流,向流里面写入数据,HttpConnection 再通过 EndPoint 把数据写到 Channel,这样一次响应就完成了。

到此你应该了解了 Connector 的工作原理,下面我画张图再来回顾一下 Connector 的工作流程。

img

  1. Acceptor 监听连接请求,当有连接请求到达时就接受连接,一个连接对应一个 Channel,Acceptor 将 Channel 交给 ManagedSelector 来处理。

  2. ManagedSelector 把 Channel 注册到 Selector 上,并创建一个 EndPoint 和 Connection 跟这个 Channel 绑定,接着就不断地检测 I/O 事件。

  3. I/O 事件到了就调用 EndPoint 的方法拿到一个 Runnable,并扔给线程池执行。

  4. 线程池中调度某个线程执行 Runnable。

  5. Runnable 执行时,调用回调函数,这个回调函数是 Connection 注册到 EndPoint 中的。

  6. 回调函数内部实现,其实就是调用 EndPoint 的接口方法来读数据。

  7. Connection 解析读到的数据,生成请求对象并交给 Handler 组件去处理。

Handler 组件

Jetty 的 Handler 设计是它的一大特色,Jetty 本质就是一个 Handler 管理器,Jetty 本身就提供了一些默认 Handler 来实现 Servlet 容器的功能,你也可以定义自己的 Handler 来添加到 Jetty 中,这体现了“微内核 + 插件”的设计思想。

Handler 就是一个接口,它有一堆实现类,Jetty 的 Connector 组件调用这些接口来处理 Servlet 请求。

1
2
3
4
5
6
7
8
9
10
11
12
13
public interface Handler extends LifeCycle, Destroyable
{
// 处理请求的方法
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response)
throws IOException, ServletException;

// 每个 Handler 都关联一个 Server 组件,被 Server 管理
public void setServer(Server server);
public Server getServer();

// 销毁方法相关的资源
public void destroy();
}

方法说明:

  • Handlerhandle 方法跟 Tomcat 容器组件的 service 方法一样,它有 ServletRequestServeletResponse 两个参数。
  • 因为任何一个 Handler 都需要关联一个 Server 组件,Handler 需要被 Server 组件来管理。Handler 通过 setServergetServer 方法绑定 Server
  • Handler 会加载一些资源到内存,因此通过设置 destroy 方法来销毁。

Handler 继承关系

Handler 只是一个接口,完成具体功能的还是它的子类。那么 Handler 有哪些子类呢?它们的继承关系又是怎样的?这些子类是如何实现 Servlet 容器功能的呢?

img

在 AbstractHandler 之下有 AbstractHandlerContainer,为什么需要这个类呢?这其实是个过渡,为了实现链式调用,一个 Handler 内部必然要有其他 Handler 的引用,所以这个类的名字里才有 Container,意思就是这样的 Handler 里包含了其他 Handler 的引用。

HandlerWrapper 和 HandlerCollection 都是 Handler,但是这些 Handler 里还包括其他 Handler 的引用。不同的是,HandlerWrapper 只包含一个其他 Handler 的引用,而 HandlerCollection 中有一个 Handler 数组的引用。

HandlerWrapper 有两个子类:Server 和 ScopedHandler。

  • Server 比较好理解,它本身是 Handler 模块的入口,必然要将请求传递给其他 Handler 来处理,为了触发其他 Handler 的调用,所以它是一个 HandlerWrapper。
  • ScopedHandler 也是一个比较重要的 Handler,实现了“具有上下文信息”的责任链调用。为什么我要强调“具有上下文信息”呢?那是因为 Servlet 规范规定 Servlet 在执行过程中是有上下文的。那么这些 Handler 在执行过程中如何访问这个上下文呢?这个上下文又存在什么地方呢?答案就是通过 ScopedHandler 来实现的。

HandlerCollection 其实维护了一个 Handler 数组。这是为了同时支持多个 Web 应用,如果每个 Web 应用有一个 Handler 入口,那么多个 Web 应用的 Handler 就成了一个数组,比如 Server 中就有一个 HandlerCollection,Server 会根据用户请求的 URL 从数组中选取相应的 Handler 来处理,就是选择特定的 Web 应用来处理请求。

Handler 可以分成三种类型:

  • 第一种是协调 Handler,这种 Handler 负责将请求路由到一组 Handler 中去,比如 HandlerCollection,它内部持有一个 Handler 数组,当请求到来时,它负责将请求转发到数组中的某一个 Handler。
  • 第二种是过滤器 Handler,这种 Handler 自己会处理请求,处理完了后再把请求转发到下一个 Handler,比如图上的 HandlerWrapper,它内部持有下一个 Handler 的引用。需要注意的是,所有继承了 HandlerWrapper 的 Handler 都具有了过滤器 Handler 的特征,比如 ContextHandler、SessionHandler 和 WebAppContext 等。
  • 第三种是内容 Handler,说白了就是这些 Handler 会真正调用 Servlet 来处理请求,生成响应的内容,比如 ServletHandler。如果浏览器请求的是一个静态资源,也有相应的 ResourceHandler 来处理这个请求,返回静态页面。

实现 Servlet 规范

ServletHandler、ContextHandler 以及 WebAppContext 等,它们实现了 Servlet 规范。

Servlet 规范中有 Context、Servlet、Filter、Listener 和 Session 等,Jetty 要支持 Servlet 规范,就需要有相应的 Handler 来分别实现这些功能。因此,Jetty 设计了 3 个组件:ContextHandler、ServletHandler 和 SessionHandler 来实现 Servle 规范中规定的功能,而WebAppContext 本身就是一个 ContextHandler,另外它还负责管理 ServletHandler 和 SessionHandler。

ContextHandler 会创建并初始化 Servlet 规范里的 ServletContext 对象,同时 ContextHandler 还包含了一组能够让你的 Web 应用运行起来的 Handler,可以这样理解,Context 本身也是一种 Handler,它里面包含了其他的 Handler,这些 Handler 能处理某个特定 URL 下的请求。比如,ContextHandler 包含了一个或者多个 ServletHandler。

ServletHandler 实现了 Servlet 规范中的 Servlet、Filter 和 Listener 的功能。ServletHandler 依赖 FilterHolder、ServletHolder、ServletMapping、FilterMapping 这四大组件。FilterHolder 和 ServletHolder 分别是 Filter 和 Servlet 的包装类,每一个 Servlet 与路径的映射会被封装成 ServletMapping,而 Filter 与拦截 URL 的映射会被封装成 FilterMapping。

SessionHandler 用来管理 Session。除此之外 WebAppContext 还有一些通用功能的 Handler,比如 SecurityHandler 和 GzipHandler,同样从名字可以知道这些 Handler 的功能分别是安全控制和压缩 / 解压缩。

WebAppContext 会将这些 Handler 构建成一个执行链,通过这个链会最终调用到我们的业务 Servlet。

Jetty 的线程策略

传统 Selector 编程模型

常规的 NIO 编程思路是,将 I/O 事件的侦测和请求的处理分别用不同的线程处理。具体过程是:

启动一个线程,在一个死循环里不断地调用 select 方法,检测 Channel 的 I/O 状态,一旦 I/O 事件达到,比如数据就绪,就把该 I/O 事件以及一些数据包装成一个 Runnable,将 Runnable 放到新线程中去处理。

在这个过程中按照职责划分,有两个线程在干活,一个是 I/O 事件检测线程,另一个是 I/O 事件处理线程。这样的好处是它们互不干扰和阻塞对方。

Jetty 的 Selector 编程模型

将 I/O 事件检测和业务处理这两种工作分开的思路也有缺点:当 Selector 检测读就绪事件时,数据已经被拷贝到内核中的缓存了,同时 CPU 的缓存中也有这些数据了,我们知道 CPU 本身的缓存比内存快多了,这时当应用程序去读取这些数据时,如果用另一个线程去读,很有可能这个读线程使用另一个 CPU 核,而不是之前那个检测数据就绪的 CPU 核,这样 CPU 缓存中的数据就用不上了,并且线程切换也需要开销。

因此 Jetty 的 Connector 做了一个大胆尝试,那就是把 I/O 事件的生产和消费放到同一个线程来处理,如果这两个任务由同一个线程来执行,如果执行过程中线程不阻塞,操作系统会用同一个 CPU 核来执行这两个任务,这样就能利用 CPU 缓存了。

ManagedSelector

ManagedSelector 的本质就是一个 Selector,负责 I/O 事件的检测和分发。为了方便使用,Jetty 在 Java 原生的 Selector 上做了一些扩展,就变成了 ManagedSelector,我们先来看看它有哪些成员变量:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class ManagedSelector extends ContainerLifeCycle implements Dumpable
{
// 原子变量,表明当前的 ManagedSelector 是否已经启动
private final AtomicBoolean _started = new AtomicBoolean(false);

// 表明是否阻塞在 select 调用上
private boolean _selecting = false;

// 管理器的引用,SelectorManager 管理若干 ManagedSelector 的生命周期
private final SelectorManager _selectorManager;

//ManagedSelector 不止一个,为它们每人分配一个 id
private final int _id;

// 关键的执行策略,生产者和消费者是否在同一个线程处理由它决定
private final ExecutionStrategy _strategy;

//Java 原生的 Selector
private Selector _selector;

//"Selector 更新任务 " 队列
private Deque<SelectorUpdate> _updates = new ArrayDeque<>();
private Deque<SelectorUpdate> _updateable = new ArrayDeque<>();

...
}

这些成员变量中其他的都好理解,就是“Selector 更新任务”队列_updates和执行策略_strategy可能不是很直观。

SelectorUpdate 接口

为什么需要一个“Selector 更新任务”队列呢,对于 Selector 的用户来说,我们对 Selector 的操作无非是将 Channel 注册到 Selector 或者告诉 Selector 我对什么 I/O 事件感兴趣,那么这些操作其实就是对 Selector 状态的更新,Jetty 把这些操作抽象成 SelectorUpdate 接口。

1
2
3
4
5
6
7
/**
* A selector update to be done when the selector has been woken.
*/
public interface SelectorUpdate
{
void update(Selector selector);
}

这意味着如果你不能直接操作 ManageSelector 中的 Selector,而是需要向 ManagedSelector 提交一个任务类,这个类需要实现 SelectorUpdate 接口 update 方法,在 update 方法里定义你想要对 ManagedSelector 做的操作。

比如 Connector 中 Endpoint 组件对读就绪事件感兴趣,它就向 ManagedSelector 提交了一个内部任务类 ManagedSelector.SelectorUpdate:

1
_selector.submit(_updateKeyAction);

这个_updateKeyAction就是一个 SelectorUpdate 实例,它的 update 方法实现如下:

1
2
3
4
5
6
7
8
9
private final ManagedSelector.SelectorUpdate _updateKeyAction = new ManagedSelector.SelectorUpdate()
{
@Override
public void update(Selector selector)
{
// 这里的 updateKey 其实就是调用了 SelectionKey.interestOps(OP_READ);
updateKey();
}
};

我们看到在 update 方法里,调用了 SelectionKey 类的 interestOps 方法,传入的参数是OP_READ,意思是现在我对这个 Channel 上的读就绪事件感兴趣了。

那谁来负责执行这些 update 方法呢,答案是 ManagedSelector 自己,它在一个死循环里拉取这些 SelectorUpdate 任务类逐个执行。

Selectable 接口

那 I/O 事件到达时,ManagedSelector 怎么知道应该调哪个函数来处理呢?其实也是通过一个任务类接口,这个接口就是 Selectable,它返回一个 Runnable,这个 Runnable 其实就是 I/O 事件就绪时相应的处理逻辑。

1
2
3
4
5
6
7
8
public interface Selectable
{
// 当某一个 Channel 的 I/O 事件就绪后,ManagedSelector 会调用的回调函数
Runnable onSelected();

// 当所有事件处理完了之后 ManagedSelector 会调的回调函数,我们先忽略。
void updateKey();
}

ManagedSelector 在检测到某个 Channel 上的 I/O 事件就绪时,也就是说这个 Channel 被选中了,ManagedSelector 调用这个 Channel 所绑定的附件类的 onSelected 方法来拿到一个 Runnable。

这句话有点绕,其实就是 ManagedSelector 的使用者,比如 Endpoint 组件在向 ManagedSelector 注册读就绪事件时,同时也要告诉 ManagedSelector 在事件就绪时执行什么任务,具体来说就是传入一个附件类,这个附件类需要实现 Selectable 接口。ManagedSelector 通过调用这个 onSelected 拿到一个 Runnable,然后把 Runnable 扔给线程池去执行。

那 Endpoint 的 onSelected 是如何实现的呢?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Override
public Runnable onSelected()
{
int readyOps = _key.readyOps();

boolean fillable = (readyOps & SelectionKey.OP_READ) != 0;
boolean flushable = (readyOps & SelectionKey.OP_WRITE) != 0;

// return task to complete the job
Runnable task= fillable
? (flushable
? _runCompleteWriteFillable
: _runFillable)
: (flushable
? _runCompleteWrite
: null);

return task;
}

上面的代码逻辑很简单,就是读事件到了就读,写事件到了就写。

ExecutionStrategy

铺垫了这么多,终于要上主菜了。前面我主要介绍了 ManagedSelector 的使用者如何跟 ManagedSelector 交互,也就是如何注册 Channel 以及 I/O 事件,提供什么样的处理类来处理 I/O 事件,接下来我们来看看 ManagedSelector 是如何统一管理和维护用户注册的 Channel 集合。再回到今天开始的讨论,ManagedSelector 将 I/O 事件的生产和消费看作是生产者消费者模式,为了充分利用 CPU 缓存,生产和消费尽量放到同一个线程处理,那这是如何实现的呢?Jetty 定义了 ExecutionStrategy 接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public interface ExecutionStrategy
{
// 只在 HTTP2 中用到,简单起见,我们先忽略这个方法。
public void dispatch();

// 实现具体执行策略,任务生产出来后可能由当前线程执行,也可能由新线程来执行
public void produce();

// 任务的生产委托给 Producer 内部接口,
public interface Producer
{
// 生产一个 Runnable(任务)
Runnable produce();
}
}

我们看到 ExecutionStrategy 接口比较简单,它将具体任务的生产委托内部接口 Producer,而在自己的 produce 方法里来实现具体执行逻辑,也就是生产出来的任务要么由当前线程执行,要么放到新线程中执行。Jetty 提供了一些具体策略实现类:ProduceConsume、ProduceExecuteConsume、ExecuteProduceConsume 和 EatWhatYouKill。它们的区别是:

  • ProduceConsume:任务生产者自己依次生产和执行任务,对应到 NIO 通信模型就是用一个线程来侦测和处理一个 ManagedSelector 上所有的 I/O 事件,后面的 I/O 事件要等待前面的 I/O 事件处理完,效率明显不高。通过图来理解,图中绿色表示生产一个任务,蓝色表示执行这个任务。

img

  • ProduceExecuteConsume:任务生产者开启新线程来运行任务,这是典型的 I/O 事件侦测和处理用不同的线程来处理,缺点是不能利用 CPU 缓存,并且线程切换成本高。同样我们通过一张图来理解,图中的棕色表示线程切换。

img

  • ExecuteProduceConsume:任务生产者自己运行任务,但是该策略可能会新建一个新线程以继续生产和执行任务。这种策略也被称为“吃掉你杀的猎物”,它来自狩猎伦理,认为一个人不应该杀死他不吃掉的东西,对应线程来说,不应该生成自己不打算运行的任务。它的优点是能利用 CPU 缓存,但是潜在的问题是如果处理 I/O 事件的业务代码执行时间过长,会导致线程大量阻塞和线程饥饿。

img

  • EatWhatYouKill:这是 Jetty 对 ExecuteProduceConsume 策略的改良,在线程池线程充足的情况下等同于 ExecuteProduceConsume;当系统比较忙线程不够时,切换成 ProduceExecuteConsume 策略。为什么要这么做呢,原因是 ExecuteProduceConsume 是在同一线程执行 I/O 事件的生产和消费,它使用的线程来自 Jetty 全局的线程池,这些线程有可能被业务代码阻塞,如果阻塞得多了,全局线程池中的线程自然就不够用了,最坏的情况是连 I/O 事件的侦测都没有线程可用了,会导致 Connector 拒绝浏览器请求。于是 Jetty 做了一个优化,在低线程情况下,就执行 ProduceExecuteConsume 策略,I/O 侦测用专门的线程处理,I/O 事件的处理扔给线程池处理,其实就是放到线程池的队列里慢慢处理。

分析了这几种线程策略,我们再来看看 Jetty 是如何实现 ExecutionStrategy 接口的。答案其实就是实现 produce 接口生产任务,一旦任务生产出来,ExecutionStrategy 会负责执行这个任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
private class SelectorProducer implements ExecutionStrategy.Producer
{
private Set<SelectionKey> _keys = Collections.emptySet();
private Iterator<SelectionKey> _cursor = Collections.emptyIterator();

@Override
public Runnable produce()
{
while (true)
{
// 如何 Channel 集合中有 I/O 事件就绪,调用前面提到的 Selectable 接口获取 Runnable, 直接返回给 ExecutionStrategy 去处理
Runnable task = processSelected();
if (task != null)
return task;

// 如果没有 I/O 事件就绪,就干点杂活,看看有没有客户提交了更新 Selector 的任务,就是上面提到的 SelectorUpdate 任务类。
processUpdates();
updateKeys();

// 继续执行 select 方法,侦测 I/O 就绪事件
if (!select())
return null;
}
}
}

SelectorProducer 是 ManagedSelector 的内部类,SelectorProducer 实现了 ExecutionStrategy 中的 Producer 接口中的 produce 方法,需要向 ExecutionStrategy 返回一个 Runnable。在这个方法里 SelectorProducer 主要干了三件事情

  1. 如果 Channel 集合中有 I/O 事件就绪,调用前面提到的 Selectable 接口获取 Runnable,直接返回给 ExecutionStrategy 去处理。
  2. 如果没有 I/O 事件就绪,就干点杂活,看看有没有客户提交了更新 Selector 上事件注册的任务,也就是上面提到的 SelectorUpdate 任务类。
  3. 干完杂活继续执行 select 方法,侦测 I/O 就绪事件。

参考资料