Dunwu Blog

大道至简,知易行难

MQ 面试

消息队列(Message Queue,简称 MQ)技术是应用间交换信息的一种技术。

消息队列主要解决异步处理、应用间耦合,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。是大型分布式系统不可缺少的中间件。

目前主流的 MQ 有:Kafka、RabbitMQ、RocketMQ、ActiveMQ,而部分数据库如 Redis、MySQL 以及 phxsql 也可实现消息队列的功能。

注意:_为了简便,下文中除了文章标题,一律使用 MQ 简称_。

MQ 简介

【基础】什么是 MQ?

:::details 要点

消息队列(Message Queue,简称 MQ)技术是应用间交换信息的一种技术。

消息队列主要解决应用耦合,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。是大型分布式系统不可缺少的中间件。

MQ 是消费-生产者模型的一个典型的代表,一端往消息队列中不断写入消息,而另一端则可以读取队列中的消息。消息发布者只管把消息发布到 MQ 中而不用管谁来取,消息使用者只管从 MQ 中取消息而不管是谁发布的。这样发布者和使用者都不用知道对方的存在。

MQ 的数据可驻留在内存或磁盘上,直到它们被应用程序读取。通过 MQ,应用程序可独立地执行,它们不需要知道彼此的位置,不需要等待接收程序接收此消息。在分布式计算环境中,为了集成分布式应用,开发者需要对异构网络环境下的分布式应用提供有效的通信手段。为了管理需要共享的信息,对应用提供公共的信息交换机制是重要的。

目前主流的 MQ 有:Kafka、RabbitMQ、RocketMQ、ActiveMQ。

:::

【基础】为什么需要 MQ?

:::details 要点

MQ 的常见应用场景有:

  • 异步处理
  • 系统解耦
  • 流量削峰
  • 系统间通信
  • 传输缓冲
  • 最终一致性

异步处理

MQ 可以将系统间的处理流程异步化,减少等待响应的时间,从而提高整体并发吞吐量。一般,MQ 异步处理应用于非核心流程,例如:短信/邮件通知、数据推送、上报数据到监控中心、日志中心等。

假设这样一个场景,用户向系统 A 发起请求,系统 A 处理计算只需要 10ms,然后通知系统 BCD 写库,系统 BCD 写库耗时分别为:100ms200ms300ms。最终总耗时为: 10ms+100ms+200ms+300ms=610ms。此外,加上请求和响应的网络传输时间,从用户角度看,可能要等待将近 1s 才能得到结果。

如果使用 MQ,系统 A 接到请求后,耗时 10ms 处理计算,然后向系统 BCD 连续发送消息,假设耗时 5ms。那么 这一过程的总耗时为 3ms + 5ms = 8ms,这相比于 610 ms,大大缩短了响应时间。至于系统 BCD 的写库操作,只要自行消费 MQ 后处理即可,用户无需关注。

系统解耦

通过 MQ,可以消除系统间的强耦合。它的好处在于:

  • 消息的消费者系统可以随意增加,无需修改生产者系统的代码。
  • 生产者系统、消费者系统彼此不会影响对方的流程。
    • 如果生产者系统宕机,消费者系统收不到消息,就不会有下一步的动作。
    • 如果消费者系统宕机,生产者系统让然可以正常发送消息,不影响流程。

不同系统如果要建立通信,传统的做法是:调用接口。

如果需要和新的系统建立通信或删除已建立的通信,都需要修改代码,这种方案显然耦合度很高。

如果使用 MQ,系统间的通信只需要通过发布/订阅(Pub/Sub)模型即可,彼此没有直接联系,也就不需要相互感知,从而达到 解耦

流量削峰

上下游系统 处理能力存在差距的时候,利用 MQ 做一个 “漏斗” 模型,进行 流控。把 MQ 当成可靠的 消息缓冲池,进行一定程度的 消息堆积;在下游有能力处理的时候,再发送消息。

MQ 的流量削峰常用于高并发场景(例如:秒杀、团抢等业务场景),它是缓解瞬时暴增流量的核心手段之一。

如果没有 MQ,两个系统之间通过 协商滑动窗口限流/降级/熔断 等复杂的方案也能实现 流控。但 系统复杂性 指数级增长,势必在上游或者下游做存储,并且要处理 定时拥塞 等一系列问题。而且每当有 处理能力有差距 的时候,都需要 单独 开发一套逻辑来维护这套逻辑。

假设某个系统读写数据库的稳定性能为每秒处理 1000 条数据。平常情况下,远远达不到这么大的处理量。假设,因为因为做活动,系统的瞬时请求量剧增,达到每秒 10000 个并发请求,数据库根本承受不了,可能直接就把数据库给整崩溃了,这样系统服务就不可用了。

如果使用 MQ,每秒写入 10000 条请求,但是系统 A 每秒只从 MQ 中消费 1000 条请求,然后写入数据库。这样,就不会超过数据库的承受能力,而是把请求积压在 MQ 中。只要高峰期一过,系统 A 就会很快把积压的消息给处理掉。

系统间通信

消息队列一般都内置了 高效的通信机制,因此也可以用于单纯的 消息通讯,比如实现 点对点消息队列 或者 聊天室 等。

生产者/消费者 模式,只需要关心消息是否 送达队列,至于谁希望订阅和需要消费,是 下游 的事情,无疑极大地减少了开发和联调的工作量。

传输缓冲

(1)MQ 常被用于做海量数据的传输缓冲。

例如,Kafka 常被用于做为各种日志数据、采集数据的数据中转。然后,Kafka 将数据转发给 Logstash、Elasticsearch 中,然后基于 Elasticsearch 来做日志中心,提供检索、聚合、分析日志的能力。开发者可以通过 Kibana 集成 Elasticsearch 数据进行可视化展示,或自行进行定制化开发。

img

(2)MQ 也可以被用于流式处理。

例如,Kafka 几乎已经是流计算的数据采集端的标准组件。而流计算通过实时数据处理能力,提供了更为快捷的聚合计算能力,被大量应用于链路监控、实时监控、实时数仓、实时大屏、风控、推荐等应用领域。

最终一致性

最终一致性 不是 消息队列 的必备特性,但确实可以依靠 消息队列 来做 最终一致性 的事情。

  • 先写消息再操作,确保操作完成后再修改消息状态。定时任务补偿机制 实现消息 可靠发送接收、业务操作的可靠执行,要注意 消息重复幂等设计
  • 所有不保证 100% 不丢消息 的消息队列,理论上无法实现 最终一致性

Kafka 一类的设计,在设计层面上就有 丢消息 的可能(比如 定时刷盘,如果掉电就会丢消息)。哪怕只丢千分之一的消息,业务也必须用其他的手段来保证结果正确。

:::

【基础】MQ 有哪些通信模型?

:::details 要点

MQ 通信模型大致有以下类型:

  • 点对点 - 点对点方式是最为传统和常见的通讯方式,它支持一对一、一对多、多对多、多对一等多种配置方式,支持树状、网状等多种拓扑结构。
  • 多点广播 - MQ 适用于不同类型的应用。其中重要的,也是正在发展中的是”多点广播”应用,即能够将消息发送到多个目标站点 (Destination List)。可以使用一条 MQ 指令将单一消息发送到多个目标站点,并确保为每一站点可靠地提供信息。MQ 不仅提供了多点广播的功能,而且还拥有智能消息分发功能,在将一条消息发送到同一系统上的多个用户时,MQ 将消息的一个复制版本和该系统上接收者的名单发送到目标 MQ 系统。目标 MQ 系统在本地复制这些消息,并将它们发送到名单上的队列,从而尽可能减少网络的传输量。
  • 发布/订阅 (Publish/Subscribe) - 发布/订阅模式使消息的分发可以突破目的队列地理位置的限制,使消息按照特定的主题甚至内容进行分发,用户或应用程序可以根据主题或内容接收到所需要的消息。发布/订阅模式使得发送者和接收者之间的耦合关系变得更为松散,发送者不必关心接收者的目的地址,而接收者也不必关心消息的发送地址,而只是根据消息的主题进行消息的收发。
  • 集群 (Cluster) - 为了简化点对点通讯模式中的系统配置,MQ 提供 Cluster(集群) 的解决方案。集群类似于一个域 (Domain),集群内部的队列管理器之间通讯时,不需要两两之间建立消息通道,而是采用集群 (Cluster) 通道与其它成员通讯,从而大大简化了系统配置。此外,集群中的队列管理器之间能够自动进行负载均衡,当某一队列管理器出现故障时,其它队列管理器可以接管它的工作,从而大大提高系统的高可靠性。

:::

【基础】获取 MQ 消息有哪些模式?

:::details 要点

消息引擎获取消息有两种模式:

  • push 模式 - MQ 推送数据给消费者
  • pull 模式 - 消费者主动向 MQ 请求数据

Kafka 消费者(Consumer)以 pull 方式从 Broker 拉取消息。相比于 push 方式,pull 方式灵活度和扩展性更好,因为消费的主动性由消费者自身控制。

push 模式的优缺点:

  • 缺点:由 broker 决定消息推送的速率,对于不同消费速率的 consumer 就不太好处理了。push 模式下,当 broker 推送的速率远大于 consumer 消费的速率时,consumer 恐怕就要崩溃了。

push 模式的优缺点:

  • 优点:consumer 可以根据自己的消费能力自主的决定消费策略
  • 缺点:如果 broker 没有可供消费的消息,将导致 consumer 不断在循环中轮询,直到新消息到达。为了避免这点,Kafka 有个参数可以让 consumer 阻塞直到新消息到达

:::

【中级】引入 MQ 带来哪些问题?

:::details 要点

任何技术都会有利有弊,MQ 给整体系统架构带来很多好处,但也会付出一定的代价。

MQ 主要引入了以下问题:

  • 系统可用性降低:引入了 MQ 后,通信需要基于 MQ 完成,如果 MQ 宕机,则服务不可用。因此,MQ 要保证是高可用的。
  • 系统复杂度提高:使用 MQ,需要关注一些新的问题:
    • 如何保证消息没有 重复消费
    • 如何处理 消息丢失 的问题?
    • 如何保证传递 消息的顺序性
    • 如何处理大量 消息积压 的问题?
  • 一致性问题:假设系统 A 处理完直接返回成功的结果给用户,用户认为请求成功。但如果此时,系统 BCD 中只要有任意一个写库失败,那么数据就不一致了。这种情况如何处理?

:::

重复消费

【中级】MQ 为什么会存在重复消费问题?

:::details 要点

重复消费问题通常不是 MQ 来处理,而是由开发来处理的。

以 Kafka 举例,Kafka 每个 Partition 都是一个有序的、不可变的记录序列,不断追加到结构化的提交日志中。Partition 中为每条记录分配一个连续的 id 号,称为偏移量(Offset),用于唯一标识 Partition 内的记录。

Kafka 的客户端和 Broker 都会保存 Offset。客户端消费消息后,每隔一段时间,就把已消费的 Offset 提交给 Kafka Broker,表示已消费。

在这个过程中,如果客户端应用消费消息后,因为宕机、重启等情况而没有提交已消费的 Offset 。当系统恢复后,会继续消费消息,由于 Offset 未提交,就会出现重复消费的问题。

:::

【中级】如何保证消息不被重复消费?

:::details 要点

应对重复消费问题,需要在业务层面,通过 幂等性设计 来解决。

幂等(idempotent、idempotence)是一个数学与计算机学概念,指的是:一个幂等操作的特点是其任意多次执行所产生的影响均与一次执行的影响相同。

MQ 重复消费不可怕,可怕的是没有应对机制,可以借鉴的思路有:

  • 如果是写关系型数据库,可以先根据主键查询,判断数据是否已存在,存在则更新,不存在则插入;
  • 如果是写 Redis,由于 set 操作天然具有幂等性,所以什么都不用做;
  • 如果是根据消息做较复杂的逻辑处理,可以在消息中加入全局唯一 ID,例如:订单 ID 等。在客户端存储中(Mysql、Redis 等)保存已消费消息的 ID。一旦接受到新消息,先判断消息中的 ID 是否在已消费消息 ID 表中存在,存在则不再处理,不存在则处理。

在实际开发中,可以参考上面的例子,结合现实场景,设计合理的幂等性方案。

:::

消息丢失

【高级】如何保证消息不丢失?

:::details 要点

要保证消息不丢失,首先要弄清楚 MQ 消息在哪些环节可能出现丢失的情况,才能对症下药。

实际上,MQ 消息在以下场景都可能会出现丢失:

  • 生产方丢失数据
  • MQ Server 丢失数据
  • 消费方丢失数据

下面以 Kafka 为例,讲解在传输的不同场景下如何保证消息不丢失。

消费方丢失数据

唯一可能导致消费方丢失数据的情况是:消费方设置了自动提交 Offset。一旦设置了自动提交 Offset,接受到消息后就会自动提交 Offset 给 Kafka ,Kafka 就认为消息已被消费。如果此时,消费方尚未来得及处理消息就挂了,那么消息就丢了。

解决方法就是:消费方关闭自动提交 Offset,处理完消息后手动提交 Offset。但这种情况下可能会出现重复消费的情形,需要自行保证幂等性。

MQ Server 丢失数据

当 Kafka 某个 Broker 宕机,需要重新选举 Partition 的 Leader。若此时其他的 Follower 尚未同步 Leader 的数据,那么新选某个 Follower 为 Leader 后,就丢失了部分数据。

为此,一般要求至少设置 4 个参数:

  • 冗余 - 通过副本机制保证冗余。
    • 给 Topic 设置 replication.factor 参数 - 这个值必须大于 1,要求每个 Partition 必须有至少 2 个副本。
    • 在 Kafka 服务端设置 min.insync.replicas 参数 - 这个值必须大于 1,这是要求一个 Leader 需要和至少一个 Follower 保持通信,这样才能确保 Leader 挂了还有替补。
  • 强一致性 - 在 Producer 端设置 acks=all,这意味着:要求每条数据,必须是写入所有 replica 之后,才能认为写入成功了。保证强一致性需要付出一定的代价,通常只有业务场景真的需要保证万无一失才会这么设置。
  • 失败重试 - 在 Producer 端设置 retries=MAX(很大很大很大的一个值,无限次重试的意思),这意味着要求一旦写入失败,就无限重试,卡在这里了。

生产方丢失数据

如果按照上述的思路设置了 acks=all,生产方一定不会丢数据。

要求是,你的 Leader 接收到消息,所有的 Follower 都同步到了消息之后,才认为本生产消息成功了。如果未满足这个条件,生产者会自动不断的重试,重试无限次。

:::

消息的顺序性

【高级】如何保证消息的顺序性?

:::details 要点

要保证 MQ 的顺序性,势必要付出一定的代价,所以实施方案前,要先明确业务场景是不是有必要保证消息的顺序性。只有那些明确对消息处理顺序有要求的业务场景才值得去保证消息顺序性。

方案一

一个 Topic,一个 Partition,一个 Consumer,内部单线程消费,单线程吞吐量太低,一般不会用这个。

方案二

  • 写入数据到 Partition 时指定一个全局唯一的 ID,例如订单 ID。发送方保证相同 ID 的消息有序的发送到同一个 Partition。
  • 基于上一点,消费方从 Kafka Partition 中消费消息时,此刻一定是顺序的。但如果消费方式以并发方式消费消息,顺序就可能会被打乱。为此,还有做到以下几点:
    • 消费方维护 N 个缓存队列,具有相同 ID 的数据都写入同一个队列中;
    • 创建 N 个线程,每个线程只负责从指定的一个队列中取数据。

:::

消息积压

【高级】如何解决消息积压?

:::details 要点

假设一个 MQ 消费者可以一秒处理 1000 条消息,三个 MQ 消费者可以一秒处理 3000 条消息,那么一分钟的处理量是 18 万条。如果 MQ 中积压了几百万到上千万的数据,即使消费者恢复了,也需要大概很长的时间才能恢复过来。

对于产线环境来说,漫长的等待是不可接受的,所以面临这种窘境时,只能临时紧急扩容以应对了,具体操作步骤和思路如下:

  • 先修复 Consumer 的问题,确保其恢复消费速度,然后将现有 Consumer 都停掉。
  • 新建一个 Topic,Partition 是原来的 10 倍,临时建立好原先 10 倍的 Queue 数量。
  • 然后写一个临时的分发数据的 Consumer 程序,这个程序部署上去消费积压的数据,消费之后不做耗时的处理,直接均匀轮询写入临时建立好的 10 倍数量的 Queue。
  • 接着临时征用 10 倍的机器来部署 Consumer ,每一批 Consumer 消费一个临时 Queue 的数据。这种做法相当于是临时将 Queue 资源和 Consumer 资源扩大 10 倍,以正常的 10 倍速度来消费数据。
  • 等快速消费完积压数据之后,得恢复原先部署的架构重新用原先的 consumer 机器来消费消息。

:::

MQ 高可用

【高级】如何保证 MQ 的高可用?

:::details 要点

不同 MQ 实现高可用的原理各不相同。因为 Kafka 比较具有代表性,所以这里以 Kafka 为例。

Kafka 的核心概念

了解 Kafka,必须先了解 Kafka 的核心概念:

  • Broker - Kafka 集群包含一个或多个节点,这种节点被称为 Broker。

  • Topic - 每条发布到 Kafka 集群的消息都有一个类别,这个类别被称为 Topic。(不同 Topic 的消息是物理隔离的;同一个 Topic 的消息保存在一个或多个 Broker 上,但用户只需指定消息的 Topic 即可生产或消费数据而不必关心数据存于何处)。对于每一个 Topic, Kafka 集群都会维持一个分区日志。

  • Partition - 了提高 Kafka 的吞吐率,每个 Topic 包含一个或多个 Partition,每个 Partition 在物理上对应一个文件夹,该文件夹下存储这个 Partition 的所有消息和索引文件。

    • Kafka 日志的分区(Partition)分布在 Kafka 集群的节点上。每个节点在处理数据和请求时,共享这些分区。每一个分区都会在已配置的节点上进行备份,确保容错性。

img

Kafka 的副本机制

Kafka 是如何实现高可用的呢?

Kafka 在 0.8 以前的版本中,如果一个 Broker 宕机了,其上面的 Partition 都不能用了,这自然不是高可用的。

为了实现高可用,Kafka 引入了复制功能,简单来说,就是副本机制( Replicate ):

每个 Partition 都有一个 Leader,零个或多个 Follower。Leader 和 Follower 都是 Broker,每个 Broker 都会成为某些分区的 Leader 和某些分区的 Follower,因此集群的负载是平衡的。

  • Leader 处理一切对 Partition (分区)的读写请求
  • 而 Follower 只需被动的同步 Leader 上的数据

同一个 Topic 的不同 Partition 会分布在多个 Broker 上,而且一个 Partition 还会在其他的 Broker 上面进行备份,Producer 在发布消息到某个 Partition 时,先找到该 Partition 的 Leader,然后向这个 Leader 推送消息;每个 Follower 都从 Leader 拉取消息,拉取消息成功之后,向 Leader 发送一个 ACK 确认。

img

FAQ

问:为什么让 Leader 处理一切对对 Partition (分区)的读写请求?

答:因为如果允许所有 Broker 都可以处理读写请求,就可能产生数据一致性问题。

Kafka 选举 Leader

由上文可知,Partition 在多个 Broker 上存在副本。

如果某个 Follower 宕机,啥事儿没有,正常工作。

如果 Leader 宕机了,会从 Follower 中重新选举一个新的 Leader。

:::

MQ 架构

【高级】Kafka、ActiveMQ、RabbitMQ、RocketMQ 有什么优缺点?

:::details 要点

ActiveMQ

ActiveMQ 是由 Apache 出品,ActiveMQ 是一个完全支持JMS1.1J2EE 1.4 规范的 JMS Provider 实现。它非常快速,支持 多种语言的客户端协议,而且可以非常容易的嵌入到企业的应用环境中,并有许多高级功能。

(a) 主要特性

  1. 服从 JMS 规范JMS 规范提供了良好的标准和保证,包括:同步异步 的消息分发,一次和仅一次的消息分发,消息接收订阅 等等。遵从 JMS 规范的好处在于,不论使用什么 JMS 实现提供者,这些基础特性都是可用的;
  2. 连接灵活性ActiveMQ 提供了广泛的 连接协议,支持的协议有:HTTP/SIP 多播SSLTCPUDP 等等。对众多协议的支持让 ActiveMQ 拥有了很好的灵活性;
  3. 支持的协议种类多OpenWireSTOMPRESTXMPPAMQP
  4. 持久化插件和安全插件ActiveMQ 提供了 多种持久化 选择。而且,ActiveMQ 的安全性也可以完全依据用户需求进行 自定义鉴权授权
  5. 支持的客户端语言种类多:除了 Java 之外,还有:C/C++.NETPerlPHPPythonRuby
  6. 代理集群:多个 ActiveMQ 代理 可以组成一个 集群 来提供服务;
  7. 异常简单的管理ActiveMQ 是以开发者思维被设计的。所以,它并不需要专门的管理员,因为它提供了简单又使用的管理特性。有很多中方法可以 监控 ActiveMQ 不同层面的数据,包括使用在 JConsole 或者在 ActiveMQWeb Console 中使用 JMX。通过处理 JMX 的告警消息,通过使用 命令行脚本,甚至可以通过监控各种类型的 日志

(b) 部署环境

ActiveMQ 可以运行在 Java 语言所支持的平台之上。使用 ActiveMQ 需要:

  • Java JDK
  • ActiveMQ 安装包

(c) 优点

  1. 跨平台 (JAVA 编写与平台无关,ActiveMQ 几乎可以运行在任何的 JVM 上);
  2. 可以用 JDBC:可以将 数据持久化 到数据库。虽然使用 JDBC 会降低 ActiveMQ 的性能,但是数据库一直都是开发人员最熟悉的存储介质;
  3. 支持 JMS 规范:支持 JMS 规范提供的 统一接口;
  4. 支持 自动重连错误重试机制
  5. 有安全机制:支持基于 shirojaas 等多种 安全配置机制,可以对 Queue/Topic 进行 认证和授权
  6. 监控完善:拥有完善的 监控,包括 Web ConsoleJMXShell 命令行,JolokiaRESTful API
  7. 界面友善:提供的 Web Console 可以满足大部分情况,还有很多 第三方的组件 可以使用,比如 hawtio

(d) 缺点

  1. 社区活跃度不及 RabbitMQ 高;
  2. 根据其他用户反馈,会出莫名其妙的问题,会 丢失消息
  3. 目前重心放到 activemq 6.0 产品 Apollo,对 5.x 的维护较少;
  4. 不适合用于 上千个队列 的应用场景;

RabbitMQ

RabbitMQ2007 年发布,是一个在 AMQP (高级消息队列协议)基础上完成的,可复用的企业消息系统,是当前最主流的消息中间件之一。

(a) 主要特性

  1. 可靠性:提供了多种技术可以让你在 性能可靠性 之间进行 权衡。这些技术包括 持久性机制投递确认发布者证实高可用性机制
  2. 灵活的路由:消息在到达队列前是通过 交换机 进行 路由 的。RabbitMQ 为典型的路由逻辑提供了 多种内置交换机 类型。如果你有更复杂的路由需求,可以将这些交换机组合起来使用,你甚至可以实现自己的交换机类型,并且当做 RabbitMQ插件 来使用;
  3. 消息集群:在相同局域网中的多个 RabbitMQ 服务器可以 聚合 在一起,作为一个独立的逻辑代理来使用;
  4. 队列高可用:队列可以在集群中的机器上 进行镜像,以确保在硬件问题下还保证 消息安全
  5. 支持多种协议:支持 多种消息队列协议
  6. 支持多种语言:用 Erlang 语言编写,支持只要是你能想到的 所有编程语言
  7. 管理界面RabbitMQ 有一个易用的 用户界面,使得用户可以 监控管理 消息 Broker 的许多方面;
  8. 跟踪机制:如果 消息异常RabbitMQ 提供消息跟踪机制,使用者可以找出发生了什么;
  9. 插件机制:提供了许多 插件,来从多方面进行扩展,也可以编写自己的插件。

(b) 部署环境

RabbitMQ 可以运行在 Erlang 语言所支持的平台之上,包括 SolarisBSDLinuxMacOSXTRU64Windows 等。使用 RabbitMQ 需要:

  • ErLang 语言包
  • RabbitMQ 安装包

(c) 优点

  1. 由于 Erlang 语言的特性,消息队列性能较好,支持 高并发
  2. 健壮、稳定、易用、跨平台、支持 多种语言、文档齐全;
  3. 有消息 确认机制持久化机制,可靠性高;
  4. 高度可定制的 路由
  5. 管理界面 较丰富,在互联网公司也有较大规模的应用,社区活跃度高。

(d) 缺点

  1. 尽管结合 Erlang 语言本身的并发优势,性能较好,但是不利于做 二次开发和维护
  2. 实现了 代理架构,意味着消息在发送到客户端之前可以在 中央节点 上排队。此特性使得 RabbitMQ 易于使用和部署,但是使得其 运行速度较慢,因为中央节点 增加了延迟消息封装后 也比较大;
  3. 需要学习 比较复杂接口和协议,学习和维护成本较高。

RocketMQ

RocketMQ 出自 阿里 的开源产品,用 Java 语言实现,在设计时参考了 Kafka,并做出了自己的一些改进,消息可靠性上Kafka 更好。RocketMQ 在阿里内部  被广泛应用在 订单交易充值流计算消息推送日志流式处理binglog 分发 等场景。

(a) 主要特性

  1. 基于 队列模型:具有 高性能高可靠高实时分布式 等特点;
  2. ProducerConsumer队列 都支持 分布式
  3. Producer 向一些队列轮流发送消息,队列集合 称为 TopicConsumer 如果做 广播消费,则一个 Consumer 实例消费这个 Topic 对应的 所有队列;如果做 集群消费,则 多个 Consumer 实例 平均消费 这个 Topic 对应的队列集合;
  4. 能够保证 严格的消息顺序
  5. 提供丰富的 消息拉取模式
  6. 高效的订阅者 水平扩展能力;
  7. 实时消息订阅机制
  8. 亿级 消息堆积 能力;
  9. 较少的外部依赖。

(b) 部署环境

RocketMQ 可以运行在 Java 语言所支持的平台之上。使用 RocketMQ 需要:

  • Java JDK
  • 安装 gitMaven
  • RocketMQ 安装包

(c) 优点

  1. 单机 支持 1 万以上 持久化队列
  2. RocketMQ 的所有消息都是 持久化的,先写入系统 PAGECACHE,然后 刷盘,可以保证 内存磁盘 都有一份数据,而 访问 时,直接 从内存读取
  3. 模型简单,接口易用(JMS 的接口很多场合并不太实用);
  4. 性能非常好,可以允许 大量堆积消息Broker 中;
  5. 支持 多种消费模式,包括 集群消费广播消费等;
  6. 各个环节 分布式扩展设计,支持 主从高可用
  7. 开发度较活跃,版本更新很快。

(d) 缺点

  1. 支持的 客户端语言 不多,目前是 JavaC++,其中 C++ 还不成熟;
  2. RocketMQ 社区关注度及成熟度也不及前两者;
  3. 没有 Web 管理界面,提供了一个 CLI (命令行界面) 管理工具带来 查询管理诊断各种问题
  4. 没有在 MQ 核心里实现 JMS 等接口;

Kafka

Apache Kafka 是一个 分布式消息发布订阅 系统。它最初由 LinkedIn 公司基于独特的设计实现为一个 分布式的日志提交系统 (a distributed commit log),之后成为 Apache 项目的一部分。Kafka 性能高效可扩展良好 并且 可持久化。它的 分区特性可复制可容错 都是其不错的特性。

(a) 主要特性

  1. 快速持久化:可以在 O(1) 的系统开销下进行 消息持久化
  2. 高吞吐:在一台普通的服务器上既可以达到 10W/s吞吐速率
  3. 完全的分布式系统BrokerProducerConsumer 都原生自动支持 分布式,自动实现 负载均衡
  4. 支持 同步异步 复制两种 高可用机制
  5. 支持 数据批量发送拉取
  6. **零拷贝技术(zero-copy)**:减少 IO 操作步骤,提高 系统吞吐量
  7. 数据迁移扩容 对用户透明;
  8. 无需停机 即可扩展机器;
  9. 其他特性:丰富的 消息拉取模型、高效 订阅者水平扩展、实时的 消息订阅、亿级的 消息堆积能力、定期删除机制;

(b) 部署环境

使用 Kafka 需要:

  • Java JDK
  • Kafka 安装包

(c) 优点

  1. 客户端语言丰富:支持 Java.NetPHPRubyPythonGo 等多种语言;
  2. 高性能:单机写入 TPS 约在 100 万条/秒,消息大小 10 个字节;
  3. 提供 完全分布式架构,并有 replica 机制,拥有较高的 可用性可靠性,理论上支持 消息无限堆积
  4. 支持批量操作;
  5. 消费者 采用 Pull 方式获取消息。消息有序通过控制 能够保证所有消息被消费且仅被消费 一次
  6. 有优秀的第三方 Kafka Web 管理界面 Kafka-Manager
  7. 日志领域 比较成熟,被多家公司和多个开源项目使用。

(d) 缺点

  1. Kafka 单机超过 64队列/分区 时,Load 时会发生明显的飙高现象。队列 越多,负载 越高,发送消息 响应时间变长
  2. 使用 短轮询方式实时性 取决于 轮询间隔时间
  3. 消费失败 不支持重试
  4. 支持 消息顺序,但是 一台代理宕机 后,就会产生 消息乱序
  5. 社区更新较慢。

技术选型

特性 ActiveMQ RabbitMQ RocketMQ Kafka
单机吞吐量 万级,比 RocketMQ、Kafka 低一个数量级 同 ActiveMQ 10 万级,支撑高吞吐 10 万级,高吞吐,一般配合大数据类的系统来进行实时数据计算、日志采集等场景
topic 数量对吞吐量的影响 topic 可以达到几百/几千的级别,吞吐量会有较小幅度的下降,这是 RocketMQ 的一大优势,在同等机器下,可以支撑大量的 topic topic 从几十到几百个时候,吞吐量会大幅度下降,在同等机器下,Kafka 尽量保证 topic 数量不要过多,如果要支撑大规模的 topic,需要增加更多的机器资源
时效性 ms 级 微秒级,这是 RabbitMQ 的一大特点,延迟最低 ms 级 延迟在 ms 级以内
可用性 高,基于主从架构实现高可用 同 ActiveMQ 非常高,分布式架构 非常高,分布式,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用
消息可靠性 有较低的概率丢失数据 基本不丢 经过参数优化配置,可以做到 0 丢失 同 RocketMQ
功能支持 MQ 领域的功能极其完备 基于 erlang 开发,并发能力很强,性能极好,延时很低 MQ 功能较为完善,还是分布式的,扩展性好 功能较为简单,主要支持简单的 MQ 功能,在大数据领域的实时计算以及日志采集被大规模使用

综上,各种对比之后,有如下建议:

  • 一般的业务系统要引入 MQ,最早大家都用 ActiveMQ,但是现在确实大家用的不多了,没经过大规模吞吐量场景的验证,社区也不是很活跃,所以大家还是算了吧,我个人不推荐用这个了;
  • 后来大家开始用 RabbitMQ,但是确实 erlang 语言阻止了大量的 Java 工程师去深入研究和掌控它,对公司而言,几乎处于不可控的状态,但是确实人家是开源的,比较稳定的支持,活跃度也高;
  • 不过现在确实越来越多的公司会去用 RocketMQ,确实很不错,毕竟是阿里出品,但社区可能有突然黄掉的风险(目前 RocketMQ 已捐给 Apache,但 GitHub 上的活跃度其实不算高)对自己公司技术实力有绝对自信的,推荐用 RocketMQ,否则回去老老实实用 RabbitMQ 吧,人家有活跃的开源社区,绝对不会黄。
  • 所以中小型公司,技术实力较为一般,技术挑战不是特别高,用 RabbitMQ 是不错的选择;大型公司,基础架构研发实力较强,用 RocketMQ 是很好的选择。
  • 如果是大数据领域的实时计算、日志采集等场景,用 Kafka 是业内标准的,绝对没问题,社区活跃度很高,绝对不会黄,何况几乎是全世界这个领域的事实性规范。

:::

【高级】什么是 JMS?

:::details 要点

提到 MQ,就顺便提一下 JMS 。

JMS(JAVA Message Service,java 消息服务)API 是一个消息服务的标准/规范,允许应用程序组件基于 JavaEE 平台创建、发送、接收和读取消息。它使分布式通信耦合度更低,消息服务更加可靠以及异步性。

在 EJB 架构中,有消息 bean 可以无缝的与 JMS 消息服务集成。在 J2EE 架构模式中,有消息服务者模式,用于实现消息与应用直接的解耦。

JMS 消息模型

在 JMS 标准中,有两种消息模型:

  • P2P(Point to Point)
  • Pub/Sub(Publish/Subscribe)
P2P 模式

P2P 模式包含三个角色:MQ(Queue),发送者(Sender),接收者(Receiver)。每个消息都被发送到一个特定的队列,接收者从队列中获取消息。队列保留着消息,直到他们被消费或超时。

P2P 的特点

  • 每个消息只有一个消费者(Consumer)(即一旦被消费,消息就不再在 MQ 中)
  • 发送者和接收者之间在时间上没有依赖性,也就是说当发送者发送了消息之后,不管接收者有没有正在运行,它不会影响到消息被发送到队列
  • 接收者在成功接收消息之后需向队列应答成功

如果希望发送的每个消息都会被成功处理的话,那么需要 P2P 模式。

Pub/sub 模式

包含三个角色主题(Topic),发布者(Publisher),订阅者(Subscriber) 。多个发布者将消息发送到 Topic,系统将这些消息传递给多个订阅者。

Pub/Sub 的特点

  • 每个消息可以有多个消费者
  • 发布者和订阅者之间有时间上的依赖性。针对某个主题(Topic)的订阅者,它必须创建一个订阅者之后,才能消费发布者的消息。
  • 为了消费消息,订阅者必须保持运行的状态。

为了缓和这样严格的时间相关性,JMS 允许订阅者创建一个可持久化的订阅。这样,即使订阅者没有被激活(运行),它也能接收到发布者的消息。

如果希望发送的消息可以不被做任何处理、或者只被一个消息者处理、或者可以被多个消费者处理的话,那么可以采用 Pub/Sub 模型。

JMS 消息消费

在 JMS 中,消息的产生和消费都是异步的。对于消费来说,JMS 的消息者可以通过两种方式来消费消息。

  • 同步 - 订阅者或接收者通过 receive 方法来接收消息,receive 方法在接收到消息之前(或超时之前)将一直阻塞;
  • 异步 - 订阅者或接收者可以注册为一个消息监听器。当消息到达之后,系统自动调用监听器的 onMessage 方法。

JNDI - Java 命名和目录接口,是一种标准的 Java 命名系统接口。可以在网络上查找和访问服务。通过指定一个资源名称,该名称对应于数据库或命名服务中的一个记录,同时返回资源连接建立所必须的信息。

JNDI 在 JMS 中起到查找和访问发送目标或消息来源的作用。

:::

参考资料

RocketMQ 快速入门

Apache RocketMQ 是一个分布式 MQ 和流处理平台,具有低延迟、高性能和可靠性、万亿级容量和灵活的可扩展性。

RocketMQ 由阿里巴巴孵化,被捐赠给 Apache,成为 Apache 的顶级项目。

RocketMQ 概念

img

消息模型(Message Model)

RocketMQ 主要由 Producer、Broker、Consumer 三部分组成,其中 Producer 负责生产消息,Consumer 负责消费消息,Broker 负责存储消息。Broker 在实际部署过程中对应一台服务器,每个 Broker 可以存储多个 Topic 的消息,每个 Topic 的消息也可以分片存储于不同的 Broker。Message Queue 用于存储消息的物理地址,每个 Topic 中的消息地址存储于多个 Message Queue 中。ConsumerGroup 由多个 Consumer 实例构成。

消息(Message)

消息系统所传输信息的物理载体,生产和消费数据的最小单位,每条消息必须属于一个主题。RocketMQ 中每个消息拥有唯一的 Message ID,且可以携带具有业务标识的 Key。系统提供了通过 Message ID 和 Key 查询消息的功能。

消息还可以具有可选 Tag 和额外的键值对。例如,您可以为消息设置业务密钥,并在代理服务器上查找消息以诊断开发期间的问题。

标签(Tag)

为消息设置的标志,用于同一主题下区分不同类型的消息。来自同一业务单元的消息,可以根据不同业务目的在同一主题下设置不同标签。标签能够有效地保持代码的清晰度和连贯性,并优化 RocketMQ 提供的查询系统。消费者可以根据 Tag 实现对不同子主题的不同消费逻辑,实现更好的扩展性。

Tag 相当于子主题,为用户提供了额外的灵活性。对于 Tag,来自同一业务模块的具有不同目的的消息可以具有相同的主题和不同的 Tag。

主题(Topic)

表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是 RocketMQ 进行消息订阅的基本单位。

代理服务器(Broker Server)

消息中转角色,负责存储消息、转发消息。代理服务器在 RocketMQ 系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。代理服务器也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。

名称服务(Name Server)

名称服务充当路由消息的提供者。生产者或消费者能够通过名称服务查找各主题相应的 Broker IP 列表。多个 Namesrv 实例组成集群,但相互独立,没有信息交换。

消息生产者(Producer)

负责生产消息,一般由业务系统负责生产消息。一个消息生产者会把业务应用系统里产生的消息发送到 broker 服务器。RocketMQ 提供多种发送方式,同步发送、异步发送、顺序发送、单向发送。同步和异步方式均需要 Broker 返回确认信息,单向发送不需要。

生产者组(Producer Group)

同一类 Producer 的集合,这类 Producer 发送同一类消息且发送逻辑一致。如果发送的是事务消息且原始生产者在发送之后崩溃,则 Broker 服务器会联系同一生产者组的其他生产者实例以提交或回溯消费。

警告:考虑到提供的 Producer 在发送消息方面足够强大,每个 Producer 组只允许一个实例,以避免不必要的生成器实例初始化

消息消费者(Consumer)

负责消费消息,一般是后台系统负责异步消费。一个消息消费者会从 Broker 服务器拉取消息、并将其提供给应用程序。从用户应用的角度而言提供了两种消费形式:拉取式消费、推动式消费。

消费者组(Consumer Group)

同一类 Consumer 的集合,这类 Consumer 通常消费同一类消息且消费逻辑一致。消费者组使得在消息消费方面,实现负载均衡和容错的目标变得非常容易。要注意的是,消费者组的消费者实例必须订阅完全相同的 Topic。RocketMQ 支持两种消息模式:集群消费(Clustering)和广播消费(Broadcasting)。

拉取式消费(Pull Consumer)

Consumer 消费的一种类型,应用通常主动调用 Consumer 的拉消息方法从 Broker 服务器拉消息、主动权由应用控制。一旦获取了批量消息,应用就会启动消费过程。

推动式消费(Push Consumer)

Consumer 消费的一种类型,该模式下 Broker 收到数据后会主动推送给消费端,该消费模式一般实时性较高。

集群消费(Clustering)

集群消费模式下,相同 Consumer Group 的每个 Consumer 实例平均分摊消息。

广播消费(Broadcasting)

广播消费模式下,相同 Consumer Group 的每个 Consumer 实例都接收全量的消息。

普通顺序消息(Normal Ordered Message)

普通顺序消费模式下,消费者通过同一个消息队列( Topic 分区,称作 Message Queue) 收到的消息是有顺序的,不同消息队列收到的消息则可能是无顺序的。

严格顺序消息(Strictly Ordered Message)

严格顺序消息模式下,消费者收到的所有消息均是有顺序的。

RocketMQ 特性

订阅与发布

消息的发布是指某个生产者向某个 topic 发送消息;消息的订阅是指某个消费者关注了某个 topic 中带有某些 tag 的消息,进而从该 topic 消费数据。

消息顺序

消息有序指的是一类消息消费时,能按照发送的顺序来消费。例如:一个订单产生了三条消息分别是订单创建、订单付款、订单完成。消费时要按照这个顺序消费才能有意义,但是同时订单之间是可以并行消费的。RocketMQ 可以严格的保证消息有序。

顺序消息分为全局顺序消息与分区顺序消息,全局顺序是指某个 Topic 下的所有消息都要保证顺序;部分顺序消息只要保证每一组消息被顺序消费即可。

  • 全局顺序 对于指定的一个 Topic,所有消息按照严格的先入先出(FIFO)的顺序进行发布和消费。 适用场景:性能要求不高,所有的消息严格按照 FIFO 原则进行消息发布和消费的场景
  • 分区顺序 对于指定的一个 Topic,所有消息根据 sharding key 进行区块分区。 同一个分区内的消息按照严格的 FIFO 顺序进行发布和消费。 Sharding key 是顺序消息中用来区分不同分区的关键字段,和普通消息的 Key 是完全不同的概念。 适用场景:性能要求高,以 sharding key 作为分区字段,在同一个区块中严格的按照 FIFO 原则进行消息发布和消费的场景。

消息过滤

RocketMQ 的消费者可以根据 Tag 进行消息过滤,也支持自定义属性过滤。消息过滤目前是在 Broker 端实现的,优点是减少了对于 Consumer 无用消息的网络传输,缺点是增加了 Broker 的负担、而且实现相对复杂。

消息可靠性

RocketMQ 支持消息的高可靠,影响消息可靠性的几种情况:

  1. Broker 非正常关闭
  2. Broker 异常 Crash
  3. OS Crash
  4. 机器掉电,但是能立即恢复供电情况
  5. 机器无法开机(可能是 cpu、主板、内存等关键设备损坏)
  6. 磁盘设备损坏

1)、2)、3)、4) 四种情况都属于硬件资源可立即恢复情况,RocketMQ 在这四种情况下能保证消息不丢,或者丢失少量数据(依赖刷盘方式是同步还是异步)。

5)、6)属于单点故障,且无法恢复,一旦发生,在此单点上的消息全部丢失。RocketMQ 在这两种情况下,通过异步复制,可保证 99%的消息不丢,但是仍然会有极少量的消息可能丢失。通过同步双写技术可以完全避免单点,同步双写势必会影响性能,适合对消息可靠性要求极高的场合,例如与 Money 相关的应用。注:RocketMQ 从 3.0 版本开始支持同步双写。

至少一次

至少一次(At least Once)指每个消息必须投递一次。Consumer 先 Pull 消息到本地,消费完成后,才向服务器返回 ack,如果没有消费一定不会 ack 消息,所以 RocketMQ 可以很好的支持此特性。

回溯消费

回溯消费是指 Consumer 已经消费成功的消息,由于业务上需求需要重新消费,要支持此功能,Broker 在向 Consumer 投递成功消息后,消息仍然需要保留。并且重新消费一般是按照时间维度,例如由于 Consumer 系统故障,恢复后需要重新消费 1 小时前的数据,那么 Broker 要提供一种机制,可以按照时间维度来回退消费进度。RocketMQ 支持按照时间回溯消费,时间维度精确到毫秒。

事务消息

RocketMQ 事务消息(Transactional Message)是指应用本地事务和发送消息操作可以被定义到全局事务中,要么同时成功,要么同时失败。RocketMQ 的事务消息提供类似 X/Open XA 的分布事务功能,通过事务消息能达到分布式事务的最终一致。

定时消息

定时消息(延迟队列)是指消息发送到 broker 后,不会立即被消费,等待特定时间投递给真正的 topic。 broker 有配置项 messageDelayLevel,默认值为“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”,18 个 level。可以配置自定义 messageDelayLevel。注意,messageDelayLevel 是 broker 的属性,不属于某个 topic。发消息时,设置 delayLevel 等级即可:msg.setDelayLevel(level)。level 有以下三种情况:

  • level == 0,消息为非延迟消息
  • 1<=level<=maxLevel,消息延迟特定时间,例如 level==1,延迟 1s
  • level > maxLevel,则 level== maxLevel,例如 level==20,延迟 2h

定时消息会暂存在名为 SCHEDULE_TOPIC_XXXX 的 topic 中,并根据 delayTimeLevel 存入特定的 queue,queueId = delayTimeLevel – 1,即一个 queue 只存相同延迟的消息,保证具有相同发送延迟的消息能够顺序消费。broker 会调度地消费 SCHEDULE_TOPIC_XXXX,将消息写入真实的 topic。

需要注意的是,定时消息会在第一次写入和调度写入真实 topic 时都会计数,因此发送数量、tps 都会变高。

消息重试

Consumer 消费消息失败后,要提供一种重试机制,令消息再消费一次。Consumer 消费消息失败通常可以认为有以下几种情况:

  • 由于消息本身的原因,例如反序列化失败,消息数据本身无法处理(例如话费充值,当前消息的手机号被注销,无法充值)等。这种错误通常需要跳过这条消息,再消费其它消息,而这条失败的消息即使立刻重试消费,99%也不成功,所以最好提供一种定时重试机制,即过 10 秒后再重试。
  • 由于依赖的下游应用服务不可用,例如 db 连接不可用,外系统网络不可达等。遇到这种错误,即使跳过当前失败的消息,消费其他消息同样也会报错。这种情况建议应用 sleep 30s,再消费下一条消息,这样可以减轻 Broker 重试消息的压力。

RocketMQ 会为每个消费组都设置一个 Topic 名称为“%RETRY%+consumerGroup”的重试队列(这里需要注意的是,这个 Topic 的重试队列是针对消费组,而不是针对每个 Topic 设置的),用于暂时保存因为各种异常而导致 Consumer 端无法消费的消息。考虑到异常恢复起来需要一些时间,会为重试队列设置多个重试级别,每个重试级别都有与之对应的重新投递延时,重试次数越多投递延时就越大。RocketMQ 对于重试消息的处理是先保存至 Topic 名称为“SCHEDULE_TOPIC_XXXX”的延迟队列中,后台定时任务按照对应的时间进行 Delay 后重新保存至“%RETRY%+consumerGroup”的重试队列中。

消息重投

生产者在发送消息时,同步消息失败会重投,异步消息有重试,oneway 没有任何保证。消息重投保证消息尽可能发送成功、不丢失,但可能会造成消息重复,消息重复在 RocketMQ 中是无法避免的问题。消息重复在一般情况下不会发生,当出现消息量大、网络抖动,消息重复就会是大概率事件。另外,生产者主动重发、consumer 负载变化也会导致重复消息。如下方法可以设置消息重试策略:

  • retryTimesWhenSendFailed:同步发送失败重投次数,默认为 2,因此生产者会最多尝试发送 retryTimesWhenSendFailed + 1 次。不会选择上次失败的 broker,尝试向其他 broker 发送,最大程度保证消息不丢。超过重投次数,抛出异常,由客户端保证消息不丢。当出现 RemotingException、MQClientException 和部分 MQBrokerException 时会重投。
  • retryTimesWhenSendAsyncFailed:异步发送失败重试次数,异步重试不会选择其他 broker,仅在同一个 broker 上做重试,不保证消息不丢。
  • retryAnotherBrokerWhenNotStoreOK:消息刷盘(主或备)超时或 slave 不可用(返回状态非 SEND_OK),是否尝试发送到其他 broker,默认 false。十分重要消息可以开启。

量控制

生产者流控,因为 broker 处理能力达到瓶颈;消费者流控,因为消费能力达到瓶颈。

生产者流控:

  • commitLog 文件被锁时间超过 osPageCacheBusyTimeOutMills 时,参数默认为 1000ms,返回流控。
  • 如果开启 transientStorePoolEnable == true,且 broker 为异步刷盘的主机,且 transientStorePool 中资源不足,拒绝当前 send 请求,返回流控。
  • broker 每隔 10ms 检查 send 请求队列头部请求的等待时间,如果超过 waitTimeMillsInSendQueue,默认 200ms,拒绝当前 send 请求,返回流控。
  • broker 通过拒绝 send 请求方式实现流量控制。

注意,生产者流控,不会尝试消息重投。

消费者流控:

  • 消费者本地缓存消息数超过 pullThresholdForQueue 时,默认 1000。
  • 消费者本地缓存消息大小超过 pullThresholdSizeForQueue 时,默认 100MB。
  • 消费者本地缓存消息跨度超过 consumeConcurrentlyMaxSpan 时,默认 2000。

消费者流控的结果是降低拉取频率。

死信队列

死信队列用于处理无法被正常消费的消息。当一条消息初次消费失败,消息队列会自动进行消息重试;达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,消息队列 不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。

RocketMQ 将这种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),将存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)。在 RocketMQ 中,可以通过使用 console 控制台对死信队列中的消息进行重发来使得消费者实例再次进行消费。

RocketMQ 组件

img

RocketMQ 由四部分组成:NameServer、Broker、Producer、Consumer。其中任意一个组成都可以水平扩展为集群模式,以避免单点故障问题。

NameServer(命名服务器)

NameServer 是一个 Topic 路由注册中心,其角色类似 Kafka 中的 zookeeper,支持 Broker 的动态注册与发现。每个 NameServer 记录完整的路由信息,提供相应的读写服务,支持快速存储扩展。

NameServer 主要包括两个功能:

  • Broker 管理,NameServer 接受 Broker 集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制,检查 Broker 是否还存活;
  • 路由信息管理,每个 NameServer 将保存关于 Broker 集群的整个路由信息和用于客户端查询的队列信息。然后 Producer 和 Conumser 通过 NameServer 就可以知道整个 Broker 集群的路由信息,从而进行消息的投递和消费。

NameServer 通常也是集群的方式部署,各实例间相互不进行信息通讯。Broker 是向每一台 NameServer 注册自己的路由信息,所以每一个 NameServer 实例上面都保存一份完整的路由信息。当某个 NameServer 因某种原因下线了,Broker 仍然可以向其它 NameServer 同步其路由信息,Producer、Consumer 仍然可以动态感知 Broker 的路由的信息。

NameServer 是一个功能齐全的服务器,主要包括两个功能:

  1. Broker 管理 - NameServer 接受来自 Broker 集群的注册,并提供心跳机制来检查 Broker 节点是否存活。
  2. 路由管理 - 每个 NameServer 将保存有关 Broker 集群的完整路由信息和客户端查询的查询队列。

RocketMQ 客户端(Producer/Consumer)将从 NameServer 查询队列路由信息。

将 NameServer 地址列表提供给客户端有四种方法:

  1. 编程方式 - 类似:producer.setNamesrvAddr("ip:port")
  2. Java 选项 - 使用 rocketmq.namesrv.addr 参数
  3. 环境变量 - 设置环境变量 NAMESRV_ADDR
  4. HTTP 端点

更详细信息可以参考官方文档:here

Broker(代理)

Broker 主要负责消息的存储、投递和查询以及服务高可用保证。

Broker 同时支持推拉模型,包含容错机制(2 副本或 3 副本),并提供强大的峰值填充和按原始时间顺序累积数千亿消息的能力。此外,Broker 提供了灾难恢复、丰富的指标统计和警报机制,这些都是传统 MQ 所缺乏的。

为了实现这些功能,Broker 包含了以下几个重要子模块:

  • Remoting Module:整个 Broker 的实体,负责处理来自 clients 端的请求。
  • Client Manager:负责管理客户端(Producer/Consumer)和维护 Consumer 的 Topic 订阅信息。
  • Store Service:提供方便简单的 API 接口处理消息存储到物理硬盘和查询功能。
  • HA Service:高可用服务,提供 Master Broker 和 Slave Broker 之间的数据同步功能。
  • Index Service:根据特定的 Message key 对投递到 Broker 的消息进行索引服务,以提供消息的快速查询。

img

Producer(生产者)

Producers 支持分布式集群方式部署。Producer 通过 MQ 的负载均衡模块选择相应的 Broker 集群队列进行消息投递,投递的过程支持快速失败并且低延迟。

Consumer(消费者)

Consumer 支持分布式集群方式部署。支持以 push 推,pull 拉两种模式对消息进行消费。同时也支持集群方式和广播方式的消费,它提供实时消息订阅机制,可以满足大多数用户的需求。

RocketMQ 安装

环境要求

  • 推荐 64 位操作系统:Linux/Unix/Mac
  • 64bit JDK 1.8+
  • Maven 3.2.x
  • Git

下载解压

进入官方下载地址:https://rocketmq.apache.org/dowloading/releases/,选择合适版本

建议选择 binary 版本。

解压到本地:

1
2
> unzip rocketmq-all-4.2.0-source-release.zip
> cd rocketmq-all-4.2.0/

启动 Name Server

1
2
3
> nohup sh bin/mqnamesrv &
> tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...

启动 Broker

1
2
3
> nohup sh bin/mqbroker -n localhost:9876 -c conf/broker.conf &
> tail -f ~/logs/rocketmqlogs/broker.log
The broker[%s, 172.30.30.233:10911] boot success...

收发消息

执行收发消息操作之前,不许告诉客户端命名服务器的位置。在 RocketMQ 中有多种方法来实现这个目的。这里,我们使用最简单的方法——设置环境变量 NAMESRV_ADDR

1
2
3
4
5
6
> export NAMESRV_ADDR=localhost:9876
> sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
SendResult [sendStatus=SEND_OK, msgId= ...

> sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
ConsumeMessageThread_%d Receive New Messages: [MessageExt...

关闭服务器

1
2
3
4
5
6
7
> sh bin/mqshutdown broker
The mqbroker(36695) is running...
Send shutdown request to mqbroker(36695) OK

> sh bin/mqshutdown namesrv
The mqnamesrv(36664) is running...
Send shutdown request to mqnamesrv(36664) OK

RocketMQ 入门级示例

首先在项目中引入 maven 依赖:

1
2
3
4
5
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.2.0</version>
</dependency>

Producer

Producer 在 RocketMQ 中负责发送消息。

RocketMQ 有三种消息发送方式:

  • 可靠的同步发送
  • 可靠的异步发送
  • 单项发送

可靠的同步发送

可靠的同步传输用于广泛的场景,如重要的通知消息,短信通知,短信营销系统等。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class SyncProducer {
public static void main(String[] args) throws Exception {
//Instantiate with a producer group name.
DefaultMQProducer producer = new
DefaultMQProducer("please_rename_unique_group_name");
//Launch the instance.
producer.start();
for (int i = 0; i < 100; i++) {
//Create a message instance, specifying topic, tag and message body.
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " +
i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
//Call send message to deliver message to one of brokers.
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
//Shut down once the producer instance is not longer in use.
producer.shutdown();
}
}

可靠的异步发送

异步传输通常用于响应时间敏感的业务场景。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class AsyncProducer {
public static void main(String[] args) throws Exception {
//Instantiate with a producer group name.
DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
//Launch the instance.
producer.start();
producer.setRetryTimesWhenSendAsyncFailed(0);
for (int i = 0; i < 100; i++) {
final int index = i;
//Create a message instance, specifying topic, tag and message body.
Message msg = new Message("TopicTest",
"TagA",
"OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.printf("%-10d OK %s %n", index,
sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
System.out.printf("%-10d Exception %s %n", index, e);
e.printStackTrace();
}
});
}
//Shut down once the producer instance is not longer in use.
producer.shutdown();
}
}

单向传输

单向传输用于需要中等可靠性的情况,例如日志收集。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class OnewayProducer {
public static void main(String[] args) throws Exception{
//Instantiate with a producer group name.
DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
//Launch the instance.
producer.start();
for (int i = 0; i < 100; i++) {
//Create a message instance, specifying topic, tag and message body.
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " +
i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
//Call send message to deliver message to one of brokers.
producer.sendOneway(msg);

}
//Shut down once the producer instance is not longer in use.
producer.shutdown();
}
}

Consumer

Consumer 在 RocketMQ 中负责接收消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class OrderedConsumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name");
consumer.setNamesrvAddr(RocketConfig.HOST);

consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

consumer.subscribe("TopicTest", "TagA || TagC || TagD");

consumer.registerMessageListener(new MessageListenerOrderly() {

AtomicLong consumeTimes = new AtomicLong(0);

@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeOrderlyContext context) {
context.setAutoCommit(false);
System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n");
this.consumeTimes.incrementAndGet();
if ((this.consumeTimes.get() % 2) == 0) {
return ConsumeOrderlyStatus.SUCCESS;
} else if ((this.consumeTimes.get() % 3) == 0) {
return ConsumeOrderlyStatus.ROLLBACK;
} else if ((this.consumeTimes.get() % 4) == 0) {
return ConsumeOrderlyStatus.COMMIT;
} else if ((this.consumeTimes.get() % 5) == 0) {
context.setSuspendCurrentQueueTimeMillis(3000);
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
return ConsumeOrderlyStatus.SUCCESS;

}
});

consumer.start();

System.out.printf("Consumer Started.%n");
}
}

RocketMQ 官方示例

参考资料

ActiveMQ 快速入门

JMS 基本概念

JMSJava 消息服务(Java Message Service)API,是一个 Java 平台中关于面向消息中间件的 API。它用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java 消息服务是一个与具体平台无关的 API,绝大多数 MOM 提供商都对 JMS 提供支持。

消息模型

JMS 有两种消息模型:

  • Point-to-Point(P2P)
  • Publish/Subscribe(Pub/Sub)

P2P 的特点

img

在点对点的消息系统中,消息分发给一个单独的使用者。点对点消息往往与队列 javax.jms.Queue 相关联。

每个消息只有一个消费者(Consumer)(即一旦被消费,消息就不再在消息队列中)。

发送者和接收者之间在时间上没有依赖性,也就是说当发送者发送了消息之后,不管接收者有没有正在运行,它不会影响到消息被发送到队列。

接收者在成功接收消息之后需向队列应答成功。

如果你希望发送的每个消息都应该被成功处理的话,那么你需要 P2P 模式。

Pub/Sub 的特点

img

发布/订阅消息系统支持一个事件驱动模型,消息生产者和消费者都参与消息的传递。生产者发布事件,而使用者订阅感兴趣的事件,并使用事件。该类型消息一般与特定的主题 javax.jms.Topic 关联。

每个消息可以有多个消费者。

发布者和订阅者之间有时间上的依赖性。针对某个主题(Topic)的订阅者,它必须创建一个订阅者之后,才能消费发布者的消息,而且为了消费消息,订阅者必须保持运行的状态。

为了缓和这样严格的时间相关性,JMS 允许订阅者创建一个可持久化的订阅。这样,即使订阅者没有被激活(运行),它也能接收到发布者的消息。

如果你希望发送的消息可以不被做任何处理、或者被一个消息者处理、或者可以被多个消费者处理的话,那么可以采用 Pub/Sub 模型。

JMS 编程模型

img

ConnectionFactory

创建 Connection 对象的工厂,针对两种不同的 jms 消息模型,分别有 QueueConnectionFactoryTopicConnectionFactory 两种。可以通过 JNDI 来查找 ConnectionFactory 对象。

Connection

Connection 表示在客户端和 JMS 系统之间建立的链接(对 TCP/IP socket 的包装)。Connection 可以产生一个或多个Session。跟 ConnectionFactory 一样,Connection 也有两种类型:QueueConnectionTopicConnection

Destination

Destination 是一个包装了消息目标标识符的被管对象。消息目标是指消息发布和接收的地点,或者是队列 Queue ,或者是主题 Topic 。JMS 管理员创建这些对象,然后用户通过 JNDI 发现它们。和连接工厂一样,管理员可以创建两种类型的目标,点对点模型的 Queue,以及发布者/订阅者模型的 Topic

Session

Session 表示一个单线程的上下文,用于发送和接收消息。由于会话是单线程的,所以消息是连续的,就是说消息是按照发送的顺序一个一个接收的。会话的好处是它支持事务。如果用户选择了事务支持,会话上下文将保存一组消息,直到事务被提交才发送这些消息。在提交事务之前,用户可以使用回滚操作取消这些消息。一个会话允许用户创建消息,生产者来发送消息,消费者来接收消息。同样,Session 也分 QueueSessionTopicSession

MessageConsumer

MessageConsumerSession 创建,并用于将消息发送到 Destination。消费者可以同步地(阻塞模式),或(非阻塞)接收 QueueTopic 类型的消息。同样,消息生产者分两种类型:QueueSenderTopicPublisher

MessageProducer

MessageProducerSession 创建,用于接收被发送到 Destination 的消息。MessageProducer 有两种类型:QueueReceiverTopicSubscriber。可分别通过 sessioncreateReceiver(Queue)createSubscriber(Topic) 来创建。当然,也可以 sessioncreatDurableSubscriber 方法来创建持久化的订阅者。

Message

是在消费者和生产者之间传送的对象,也就是说从一个应用程序传送到另一个应用程序。一个消息有三个主要部分:

  • 消息头(必须):包含用于识别和为消息寻找路由的操作设置。
  • 一组消息属性(可选):包含额外的属性,支持其他提供者和用户的兼容。可以创建定制的字段和过滤器(消息选择器)。
  • 一个消息体(可选):允许用户创建五种类型的消息(文本消息,映射消息,字节消息,流消息和对象消息)。

消息接口非常灵活,并提供了许多方式来定制消息的内容。

Common Point-to-Point Publish-Subscribe
ConnectionFactory QueueConnectionFactory TopicConnectionFactory
Connection QueueConnection TopicConnection
Destination Queue Topic
Session QueueSession TopicSession
MessageProducer QueueSender TopicPublisher
MessageSender QueueReceiver, QueueBrowser TopicSubscriber

安装

安装条件

JDK1.7 及以上版本

本地配置了 JAVA_HOME 环境变量。

下载

支持 Windows/Unix/Linux/Cygwin

ActiveMQ 官方下载地址

Windows 下运行

(1)解压压缩包到本地;

(2)打开控制台,进入安装目录的 bin 目录下;

1
cd [activemq_install_dir]

(3)执行 activemq start 来启动 ActiveMQ

1
bin\activemq start

测试安装是否成功

(1)ActiveMQ 默认监听端口为 61616

1
netstat -an|find “61616”

(2)访问 http://127.0.0.1:8161/admin/

(3)输入用户名、密码

1
2
Login: admin
Passwort: admin

(4)点击导航栏的 Queues 菜单

(5)添加一个队列(queue)

项目中的应用

引入依赖

1
2
3
4
5
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.14.1</version>
</dependency>

Sender.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
public class Sender {
private static final int SEND_NUMBER = 4;

public static void main(String[] args) {
// ConnectionFactory :连接工厂,JMS 用它创建连接
ConnectionFactory connectionFactory;
// Connection :JMS 客户端到JMS Provider 的连接
Connection connection = null;
// Session: 一个发送或接收消息的线程
Session session;
// Destination :消息的目的地;消息发送给谁.
Destination destination;
// MessageProducer:消息发送者
MessageProducer producer;
// TextMessage message;
// 构造ConnectionFactory实例对象,此处采用ActiveMq的实现jar
connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD,
"tcp://localhost:61616");
try {
// 构造从工厂得到连接对象
connection = connectionFactory.createConnection();
// 启动
connection.start();
// 获取操作连接
session = connection.createSession(Boolean.TRUE,
Session.AUTO_ACKNOWLEDGE);
// 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置
destination = session.createQueue("FirstQueue");
// 得到消息生成者【发送者】
producer = session.createProducer(destination);
// 设置不持久化,此处学习,实际根据项目决定
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
// 构造消息,此处写死,项目就是参数,或者方法获取
sendMessage(session, producer);
session.commit();
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (null != connection)
connection.close();
} catch (Throwable ignore) {
}
}
}

public static void sendMessage(Session session, MessageProducer producer)
throws Exception {
for (int i = 1; i <= SEND_NUMBER; i++) {
TextMessage message = session
.createTextMessage("ActiveMq 发送的消息" + i);
// 发送消息到目的地方
System.out.println("发送消息:" + "ActiveMq 发送的消息" + i);
producer.send(message);
}
}
}

Receiver.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public class Receiver {
public static void main(String[] args) {
// ConnectionFactory :连接工厂,JMS 用它创建连接
ConnectionFactory connectionFactory;
// Connection :JMS 客户端到JMS Provider 的连接
Connection connection = null;
// Session: 一个发送或接收消息的线程
Session session;
// Destination :消息的目的地;消息发送给谁.
Destination destination;
// 消费者,消息接收者
MessageConsumer consumer;
connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD,
"tcp://localhost:61616");
try {
// 构造从工厂得到连接对象
connection = connectionFactory.createConnection();
// 启动
connection.start();
// 获取操作连接
session = connection.createSession(Boolean.FALSE,
Session.AUTO_ACKNOWLEDGE);
// 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置
destination = session.createQueue("FirstQueue");
consumer = session.createConsumer(destination);
while (true) {
//设置接收者接收消息的时间,为了便于测试,这里谁定为100s
TextMessage message = (TextMessage) consumer.receive(100000);
if (null != message) {
System.out.println("收到消息" + message.getText());
} else {
break;
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (null != connection)
connection.close();
} catch (Throwable ignore) {
}
}
}
}

运行

先运行 Receiver.java 进行消息监听,再运行 Send.java 发送消息。

输出

Send 的输出内容

1
2
3
4
发送消息:Activemq 发送消息0
发送消息:Activemq 发送消息1
发送消息:Activemq 发送消息2
发送消息:Activemq 发送消息3

Receiver 的输出内容

1
2
3
4
收到消息ActiveMQ 发送消息0
收到消息ActiveMQ 发送消息1
收到消息ActiveMQ 发送消息2
收到消息ActiveMQ 发送消息3

资源

Flink ETL

Apache Flink 的一种常见应用场景是 ETL(抽取、转换、加载)管道任务。从一个或多个数据源获取数据,进行一些转换操作和信息补充,将结果存储起来。在这个教程中,我们将介绍如何使用 Flink 的 DataStream API 实现这类应用。

这里注意,Flink 的 Table 和 SQL API 完全可以满足很多 ETL 使用场景。但无论你最终是否直接使用 DataStream API,对这里介绍的基本知识有扎实的理解都是有价值的。

无状态的转换

本节涵盖了 map()flatmap(),这两种算子可以用来实现无状态转换的基本操作。

map()

在第一个练习中,你将过滤出租车行程数据中的事件。在同一代码仓库中,有一个 GeoUtils 类,提供了一个静态方法 GeoUtils.mapToGridCell(float lon, float lat),它可以将位置坐标(经度,维度)映射到 100x100 米的对应不同区域的网格单元。

现在让我们为每个出租车行程时间的数据对象增加 startCellendCell 字段。你可以创建一个继承 TaxiRideEnrichedRide 类,添加这些字段:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public static class EnrichedRide extends TaxiRide {
public int startCell;
public int endCell;

public EnrichedRide() {}

public EnrichedRide(TaxiRide ride) {
this.rideId = ride.rideId;
this.isStart = ride.isStart;
...
this.startCell = GeoUtils.mapToGridCell(ride.startLon, ride.startLat);
this.endCell = GeoUtils.mapToGridCell(ride.endLon, ride.endLat);
}

public String toString() {
return super.toString() + "," +
Integer.toString(this.startCell) + "," +
Integer.toString(this.endCell);
}
}

然后你可以创建一个应用来转换这个流

1
2
3
4
5
6
7
DataStream<TaxiRide> rides = env.addSource(new TaxiRideSource(...));

DataStream<EnrichedRide> enrichedNYCRides = rides
.filter(new RideCleansingSolution.NYCFilter())
.map(new Enrichment());

enrichedNYCRides.print();

使用这个 MapFunction:

1
2
3
4
5
6
7
public static class Enrichment implements MapFunction<TaxiRide, EnrichedRide> {

@Override
public EnrichedRide map(TaxiRide taxiRide) throws Exception {
return new EnrichedRide(taxiRide);
}
}

flatmap()

MapFunction 只适用于一对一的转换:对每个进入算子的流元素,map() 将仅输出一个转换后的元素。对于除此以外的场景,你将要使用 flatmap()

1
2
3
4
5
6
DataStream<TaxiRide> rides = env.addSource(new TaxiRideSource(...));

DataStream<EnrichedRide> enrichedNYCRides = rides
.flatMap(new NYCEnrichment());

enrichedNYCRides.print();

其中用到的 FlatMapFunction :

1
2
3
4
5
6
7
8
9
10
public static class NYCEnrichment implements FlatMapFunction<TaxiRide, EnrichedRide> {

@Override
public void flatMap(TaxiRide taxiRide, Collector<EnrichedRide> out) throws Exception {
FilterFunction<TaxiRide> valid = new RideCleansing.NYCFilter();
if (valid.filter(taxiRide)) {
out.collect(new EnrichedRide(taxiRide));
}
}
}

使用接口中提供的 Collectorflatmap() 可以输出你想要的任意数量的元素,也可以一个都不发。

Keyed Streams

keyBy()

将一个流根据其中的一些属性来进行分区是十分有用的,这样我们可以使所有具有相同属性的事件分到相同的组里。例如,如果你想找到从每个网格单元出发的最远的出租车行程。按 SQL 查询的方式来考虑,这意味着要对 startCell 进行 GROUP BY 再排序,在 Flink 中这部分可以用 keyBy(KeySelector) 实现。

1
2
3
rides
.flatMap(new NYCEnrichment())
.keyBy(enrichedRide -> enrichedRide.startCell)

每个 keyBy 会通过 shuffle 来为数据流进行重新分区。总体来说这个开销是很大的,它涉及网络通信、序列化和反序列化。

keyBy and network shuffle

通过计算得到键

KeySelector 不仅限于从事件中抽取键。你也可以按想要的方式计算得到键值,只要最终结果是确定的,并且实现了 hashCode()equals()。这些限制条件不包括产生随机数或者返回 Arrays 或 Enums 的 KeySelector,但你可以用元组和 POJO 来组成键,只要他们的元素遵循上述条件。

键必须按确定的方式产生,因为它们会在需要的时候被重新计算,而不是一直被带在流记录中。

例如,比起创建一个新的带有 startCell 字段的 EnrichedRide 类,用这个字段作为 key:

1
keyBy(enrichedRide -> enrichedRide.startCell)

我们更倾向于这样做:

1
keyBy(ride -> GeoUtils.mapToGridCell(ride.startLon, ride.startLat))

Keyed Stream 的聚合

以下代码为每个行程结束事件创建了一个新的包含 startCell 和时长(分钟)的元组流:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import org.joda.time.Interval;

DataStream<Tuple2<Integer, Minutes>> minutesByStartCell = enrichedNYCRides
.flatMap(new FlatMapFunction<EnrichedRide, Tuple2<Integer, Minutes>>() {

@Override
public void flatMap(EnrichedRide ride,
Collector<Tuple2<Integer, Minutes>> out) throws Exception {
if (!ride.isStart) {
Interval rideInterval = new Interval(ride.startTime, ride.endTime);
Minutes duration = rideInterval.toDuration().toStandardMinutes();
out.collect(new Tuple2<>(ride.startCell, duration));
}
}
});

现在就可以产生一个流,对每个 startCell 仅包含那些最长行程的数据。

有很多种方法表示使用哪个字段作为键。前面使用 EnrichedRide POJO 的例子,用字段名来指定键。而这个使用 Tuple2 对象的例子中,用字段在元组中的序号(从 0 开始)来指定键。

1
2
3
4
minutesByStartCell
.keyBy(value -> value.f0) // .keyBy(value -> value.startCell)
.maxBy(1) // duration
.print();

现在每次行程时长达到新的最大值,都会输出一条新记录,例如下面这个对应 50797 网格单元的数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
...
4> (64549,5M)
4> (46298,18M)
1> (51549,14M)
1> (53043,13M)
1> (56031,22M)
1> (50797,6M)
...
1> (50797,8M)
...
1> (50797,11M)
...
1> (50797,12M)

(隐式的)状态

这是培训中第一个涉及到有状态流的例子。尽管状态的处理是透明的,Flink 必须跟踪每个不同的键的最大时长。

只要应用中有状态,你就应该考虑状态的大小。如果键值的数量是无限的,那 Flink 的状态需要的空间也同样是无限的。

在流处理场景中,考虑有限窗口的聚合往往比整个流聚合更有意义。

reduce() 和其他聚合算子

上面用到的 maxBy() 只是 Flink 中 KeyedStream 上众多聚合函数中的一个。还有一个更通用的 reduce() 函数可以用来实现你的自定义聚合。

有状态的转换

在 Flink 不参与管理状态的情况下,你的应用也可以使用状态,但 Flink 为其管理状态提供了一些引人注目的特性:

  • 本地性: Flink 状态是存储在使用它的机器本地的,并且可以以内存访问速度来获取
  • 持久性: Flink 状态是容错的,例如,它可以自动按一定的时间间隔产生 checkpoint,并且在任务失败后进行恢复
  • 纵向可扩展性: Flink 状态可以存储在集成的 RocksDB 实例中,这种方式下可以通过增加本地磁盘来扩展空间
  • 横向可扩展性: Flink 状态可以随着集群的扩缩容重新分布
  • 可查询性: Flink 状态可以通过使用 状态查询 API 从外部进行查询。

在本节中你将学习如何使用 Flink 的 API 来管理 keyed state。

Rich Functions

至此,你已经看到了 Flink 的几种函数接口,包括 FilterFunctionMapFunction,和 FlatMapFunction。这些都是单一抽象方法模式。

对其中的每一个接口,Flink 同样提供了一个所谓 “rich” 的变体,如 RichFlatMapFunction,其中增加了以下方法,包括:

  • open(Configuration c)
  • close()
  • getRuntimeContext()

open() 仅在算子初始化时调用一次。可以用来加载一些静态数据,或者建立外部服务的链接等。

getRuntimeContext() 为整套潜在有趣的东西提供了一个访问途径,最明显的,它是你创建和访问 Flink 状态的途径。

一个使用 Keyed State 的例子

在这个例子里,想象你有一个要去重的事件数据流,对每个键只保留第一个事件。下面是完成这个功能的应用,使用一个名为 DeduplicatorRichFlatMapFunction

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private static class Event {
public final String key;
public final long timestamp;
...
}

public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.addSource(new EventSource())
.keyBy(e -> e.key)
.flatMap(new Deduplicator())
.print();

env.execute();
}

为了实现这个功能,Deduplicator 需要记录每个键是否已经有了相应的记录。它将通过使用 Flink 的 keyed state 接口来做这件事。

当你使用像这样的 keyed stream 的时候,Flink 会为每个状态中管理的条目维护一个键值存储。

Flink 支持几种不同方式的 keyed state,这个例子使用的是最简单的一个,叫做 ValueState。意思是对于 每个键 ,Flink 将存储一个单一的对象 —— 在这个例子中,存储的是一个 Boolean 类型的对象。

我们的 Deduplicator 类有两个方法:open()flatMap()open() 方法通过定义 ValueStateDescriptor<Boolean> 建立了管理状态的使用。构造器的参数定义了这个状态的名字(“keyHasBeenSeen”),并且为如何序列化这些对象提供了信息(在这个例子中的 Types.BOOLEAN)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public static class Deduplicator extends RichFlatMapFunction<Event, Event> {
ValueState<Boolean> keyHasBeenSeen;

@Override
public void open(Configuration conf) {
ValueStateDescriptor<Boolean> desc = new ValueStateDescriptor<>("keyHasBeenSeen", Types.BOOLEAN);
keyHasBeenSeen = getRuntimeContext().getState(desc);
}

@Override
public void flatMap(Event event, Collector<Event> out) throws Exception {
if (keyHasBeenSeen.value() == null) {
out.collect(event);
keyHasBeenSeen.update(true);
}
}
}

当 flatMap 方法调用 keyHasBeenSeen.value() 时,Flink 会在 当前键的上下文 中检索状态值,只有当状态为 null 时,才会输出当前事件。这种情况下,它同时也将更新 keyHasBeenSeentrue

这种访问和更新按键分区的状态的机制也许看上去很神奇,因为在 Deduplicator 的实现中,键不是明确可见的。当 Flink 运行时调用 RichFlatMapFunctionopen 方法时, 是没有事件的,所以这个时候上下文中不含有任何键。但当它调用 flatMap 方法,被处理的事件的键在运行时中就是可用的了,并且被用来确定操作哪个 Flink 状态后端的入口。

部署在分布式集群时,将会有很多 Deduplicator 的实例,每一个实例将负责整个键空间的互斥子集中的一个。所以,当你看到一个单独的 ValueState,比如

1
ValueState<Boolean> keyHasBeenSeen;

要理解这个代表的不仅仅是一个单独的布尔类型变量,而是一个分布式的共享键值存储。

清理状态

上面例子有一个潜在的问题:当键空间是无界的时候将发生什么?Flink 会对每个使用过的键都存储一个 Boolean 类型的实例。如果是键是有限的集合还好,但在键无限增长的应用中,清除再也不会使用的状态是很必要的。这通过在状态对象上调用 clear() 来实现,如下:

1
keyHasBeenSeen.clear()

对一个给定的键值,你也许想在它一段时间不使用后来做这件事。当学习 ProcessFunction 的相关章节时,你将看到在事件驱动的应用中怎么用定时器来做这个。

也可以选择使用 状态的过期时间(TTL),为状态描述符配置你想要旧状态自动被清除的时间。

Non-keyed State

在没有键的上下文中我们也可以使用 Flink 管理的状态。这也被称作 算子的状态。它包含的接口是很不一样的,由于对用户定义的函数来说使用 non-keyed state 是不太常见的,所以这里就不多介绍了。这个特性最常用于 source 和 sink 的实现。

Connected Streams

相比于下面这种预先定义的转换:

simple transformation

有时你想要更灵活地调整转换的某些功能,比如数据流的阈值、规则或者其他参数。Flink 支持这种需求的模式称为 connected streams ,一个单独的算子有两个输入流。

connected streams

connected stream 也可以被用来实现流的关联。

示例

在这个例子中,一个控制流是用来指定哪些词需要从 streamOfWords 里过滤掉的。 一个称为 ControlFunctionRichCoFlatMapFunction 作用于连接的流来实现这个功能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> control = env
.fromElements("DROP", "IGNORE")
.keyBy(x -> x);

DataStream<String> streamOfWords = env
.fromElements("Apache", "DROP", "Flink", "IGNORE")
.keyBy(x -> x);

control
.connect(streamOfWords)
.flatMap(new ControlFunction())
.print();

env.execute();
}

这里注意两个流只有键一致的时候才能连接。 keyBy 的作用是将流数据分区,当 keyed stream 被连接时,他们必须按相同的方式分区。这样保证了两个流中所有键相同的事件发到同一个实例上。这样也使按键关联两个流成为可能。

在这个例子中,两个流都是 DataStream<String> 类型的,并且都将字符串作为键。正如你将在下面看到的,RichCoFlatMapFunction 在状态中存了一个布尔类型的变量,这个变量被两个流共享。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public static class ControlFunction extends RichCoFlatMapFunction<String, String, String> {
private ValueState<Boolean> blocked;

@Override
public void open(Configuration config) {
blocked = getRuntimeContext()
.getState(new ValueStateDescriptor<>("blocked", Boolean.class));
}

@Override
public void flatMap1(String control_value, Collector<String> out) throws Exception {
blocked.update(Boolean.TRUE);
}

@Override
public void flatMap2(String data_value, Collector<String> out) throws Exception {
if (blocked.value() == null) {
out.collect(data_value);
}
}
}

RichCoFlatMapFunction 是一种可以被用于一对连接流的 FlatMapFunction,并且它可以调用 rich function 的接口。这意味着它可以是有状态的。

布尔变量 blocked 被用于记录在数据流 control 中出现过的键(在这个例子中是单词),并且这些单词从 streamOfWords 过滤掉。这是 keyed state,并且它是被两个流共享的,这也是为什么两个流必须有相同的键值空间。

在 Flink 运行时中,flatMap1flatMap2 在连接流有新元素到来时被调用 —— 在我们的例子中,control 流中的元素会进入 flatMap1streamOfWords 中的元素会进入 flatMap2。这是由两个流连接的顺序决定的,本例中为 control.connect(streamOfWords)

认识到你没法控制 flatMap1flatMap2 的调用顺序是很重要的。这两个输入流是相互竞争的关系,Flink 运行时将根据从一个流或另一个流中消费的事件做它要做的。对于需要保证时间和/或顺序的场景,你会发现在 Flink 的管理状态中缓存事件一直到它们能够被处理是必须的。(注意:如果你真的感到绝望,可以使用自定义的算子实现 InputSelectable 接口,在两输入算子消费它的输入流时增加一些顺序上的限制。)

参考资料

Flink API

Flink 为流式/批式处理应用程序的开发提供了不同级别的抽象。

Programming levels of abstraction

  • Flink API 最底层的抽象为有状态实时流处理。其抽象实现是 Process Function,并且 Process Function 被 Flink 框架集成到了 DataStream API 中来为我们使用。它允许用户在应用程序中自由地处理来自单流或多流的事件(数据),并提供具有全局一致性和容错保障的状态。此外,用户可以在此层抽象中注册事件时间(event time)和处理时间(processing time)回调方法,从而允许程序可以实现复杂计算。

  • Flink API 第二层抽象是 Core APIs。实际上,许多应用程序不需要使用到上述最底层抽象的 API,而是可以使用 Core APIs 进行编程:其中包含 DataStream API(应用于有界/无界数据流场景)和 DataSet API(应用于有界数据集场景)两部分。Core APIs 提供的流式 API(Fluent API)为数据处理提供了通用的模块组件,例如各种形式的用户自定义转换(transformations)、联接(joins)、聚合(aggregations)、窗口(windows)和状态(state)操作等。此层 API 中处理的数据类型在每种编程语言中都有其对应的类。

    Process Function 这类底层抽象和 DataStream API 的相互集成使得用户可以选择使用更底层的抽象 API 来实现自己的需求。DataSet API 还额外提供了一些原语,比如循环/迭代(loop/iteration)操作。

  • Flink API 第三层抽象是 Table APITable API 是以表(Table)为中心的声明式编程(DSL)API,例如在流式数据场景下,它可以表示一张正在动态改变的表。Table API 遵循(扩展)关系模型:即表拥有 schema(类似于关系型数据库中的 schema),并且 Table API 也提供了类似于关系模型中的操作,比如 select、project、join、group-by 和 aggregate 等。Table API 程序是以声明的方式定义应执行的逻辑操作,而不是确切地指定程序应该执行的代码。尽管 Table API 使用起来很简洁并且可以由各种类型的用户自定义函数扩展功能,但还是比 Core API 的表达能力差。此外,Table API 程序在执行之前还会使用优化器中的优化规则对用户编写的表达式进行优化。

    表和 DataStream/DataSet 可以进行无缝切换,Flink 允许用户在编写应用程序时将 Table APIDataStream/DataSet API 混合使用。

  • Flink API 最顶层抽象是 SQL。这层抽象在语义和程序表达式上都类似于 _Table API_,但是其程序实现都是 SQL 查询表达式。SQL 抽象与 Table API 抽象之间的关联是非常紧密的,并且 SQL 查询语句可以在 Table API 中定义的表上执行。

ProcessFunction

ProcessFunction 是 Flink 所提供的最具表达力的接口。ProcessFunction 可以处理一或两条输入数据流中的单个事件或者归入一个特定窗口内的多个事件。它提供了对于时间和状态的细粒度控制。开发者可以在其中任意地修改状态,也能够注册定时器用以在未来的某一时刻触发回调函数。因此,你可以利用 ProcessFunction 实现许多有状态的事件驱动应用所需要的基于单个事件的复杂业务逻辑。

下面的代码示例展示了如何在 KeyedStream 上利用 KeyedProcessFunction 对标记为 STARTEND 的事件进行处理。当收到 START 事件时,处理函数会记录其时间戳,并且注册一个时长 4 小时的计时器。如果在计时器结束之前收到 END 事件,处理函数会计算其与上一个 START 事件的时间间隔,清空状态并将计算结果返回。否则,计时器结束,并清空状态。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
/**

* 将相邻的 keyed START 和 END 事件相匹配并计算两者的时间间隔
* 输入数据为 Tuple2<String, String> 类型,第一个字段为 key 值,
* 第二个字段标记 START 和 END 事件。
*/
public static class StartEndDuration
extends KeyedProcessFunction<String, Tuple2<String, String>, Tuple2<String, Long>> {

private ValueState<Long> startTime;

@Override
public void open(Configuration conf) {
// obtain state handle
startTime = getRuntimeContext()
.getState(new ValueStateDescriptor<Long>("startTime", Long.class));
}

/** Called for each processed event. */
@Override
public void processElement(
Tuple2<String, String> in,
Context ctx,
Collector<Tuple2<String, Long>> out) throws Exception {

switch (in.f1) {
case "START":
// set the start time if we receive a start event.
startTime.update(ctx.timestamp());
// register a timer in four hours from the start event.
ctx.timerService()
.registerEventTimeTimer(ctx.timestamp() + 4 * 60 * 60 * 1000);
break;
case "END":
// emit the duration between start and end event
Long sTime = startTime.value();
if (sTime != null) {
out.collect(Tuple2.of(in.f0, ctx.timestamp() - sTime));
// clear the state
startTime.clear();
}
default:
// do nothing
}
}

/** Called when a timer fires. */
@Override
public void onTimer(
long timestamp,
OnTimerContext ctx,
Collector<Tuple2<String, Long>> out) {

// Timeout interval exceeded. Cleaning up the state.
startTime.clear();
}
}

这个例子充分展现了 KeyedProcessFunction 强大的表达力,也因此是一个实现相当复杂的接口。

DataStream API

DataStream API 为许多通用的流处理操作提供了处理原语。这些操作包括窗口、逐条记录的转换操作,在处理事件时进行外部数据库查询等。DataStream API 支持 Java 和 Scala 语言,预先定义了例如map()reduce()aggregate() 等函数。你可以通过扩展实现预定义接口或使用 Java、Scala 的 lambda 表达式实现自定义的函数。

下面的代码示例展示了如何捕获会话时间范围内所有的点击流事件,并对每一次会话的点击量进行计数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 网站点击 Click 的数据流
DataStream<Click> clicks = ...

DataStream<Tuple2<String, Long>> result = clicks
// 将网站点击映射为 (userId, 1) 以便计数
.map(
// 实现 MapFunction 接口定义函数
new MapFunction<Click, Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> map(Click click) {
return Tuple2.of(click.userId, 1L);
}
})
// 以 userId (field 0) 作为 key
.keyBy(0)
// 定义 30 分钟超时的会话窗口
.window(EventTimeSessionWindows.withGap(Time.minutes(30L)))
// 对每个会话窗口的点击进行计数,使用 lambda 表达式定义 reduce 函数
.reduce((a, b) -> Tuple2.of(a.f0, a.f1 + b.f1));

SQL & Table API

Flink 支持两种关系型的 API,Table API 和 SQL。这两个 API 都是批处理和流处理统一的 API,这意味着在无边界的实时数据流和有边界的历史记录数据流上,关系型 API 会以相同的语义执行查询,并产生相同的结果。Table API 和 SQL 借助了 Apache Calcite 来进行查询的解析,校验以及优化。它们可以与 DataStream 和 DataSet API 无缝集成,并支持用户自定义的标量函数,聚合函数以及表值函数。

Flink 的关系型 API 旨在简化数据分析数据流水线和 ETL 应用的定义。

下面的代码示例展示了如何使用 SQL 语句查询捕获会话时间范围内所有的点击流事件,并对每一次会话的点击量进行计数。此示例与上述 DataStream API 中的示例有着相同的逻辑。

1
2
3
SELECT userId, COUNT(*)
FROM clicks
GROUP BY SESSION(clicktime, INTERVAL '30' MINUTE), userId

Flink 具有数个适用于常见数据处理应用场景的扩展库。这些库通常嵌入在 API 中,且并不完全独立于其它 API。它们也因此可以受益于 API 的所有特性,并与其他库集成。

  • **复杂事件处理(CEP)**:模式检测是事件流处理中的一个非常常见的用例。Flink 的 CEP 库提供了 API,使用户能够以例如正则表达式或状态机的方式指定事件模式。CEP 库与 Flink 的 DataStream API 集成,以便在 DataStream 上评估模式。CEP 库的应用包括网络入侵检测,业务流程监控和欺诈检测。
  • **DataSet API*:DataSet API 是 Flink 用于批处理应用程序的核心 API。DataSet API 所提供的基础算子包括mapreduce_、(outer) join_、_co-group_、iterate*等。所有算子都有相应的算法和数据结构支持,对内存中的序列化数据进行操作。如果数据大小超过预留内存,则过量数据将存储到磁盘。Flink 的 DataSet API 的数据处理算法借鉴了传统数据库算法的实现,例如混合散列连接(hybrid hash-join)和外部归并排序(external merge-sort)。
  • Gelly: Gelly 是一个可扩展的图形处理和分析库。Gelly 是在 DataSet API 之上实现的,并与 DataSet API 集成。因此,它能够受益于其可扩展且健壮的操作符。Gelly 提供了内置算法,如 label propagation、triangle enumeration 和 page rank 算法,也提供了一个简化自定义图算法实现的 Graph API

参考资料

Flink 事件驱动

处理函数(Process Functions)

简介

ProcessFunction 将事件处理与 Timer,State 结合在一起,使其成为流处理应用的强大构建模块。 这是使用 Flink 创建事件驱动应用程序的基础。它和 RichFlatMapFunction 十分相似, 但是增加了 Timer。

示例

如果你已经体验了 流式分析训练动手实践, 你应该记得,它是采用 TumblingEventTimeWindow 来计算每个小时内每个司机的小费总和, 像下面的示例这样:

1
2
3
4
5
// 计算每个司机每小时的小费总和
DataStream<Tuple3<Long, Long, Float>> hourlyTips = fares
.keyBy((TaxiFare fare) -> fare.driverId)
.window(TumblingEventTimeWindows.of(Time.hours(1)))
.process(new AddTips());

使用 KeyedProcessFunction 去实现相同的操作更加直接且更有学习意义。 让我们开始用以下代码替换上面的代码:

1
2
3
4
// 计算每个司机每小时的小费总和
DataStream<Tuple3<Long, Long, Float>> hourlyTips = fares
.keyBy((TaxiFare fare) -> fare.driverId)
.process(new PseudoWindow(Time.hours(1)));

在这个代码片段中,一个名为 PseudoWindowKeyedProcessFunction 被应用于 KeyedStream, 其结果是一个 DataStream<Tuple3<Long, Long, Float>> (与使用 Flink 内置时间窗口的实现生成的流相同)。

PseudoWindow 的总体轮廓示意如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// 在时长跨度为一小时的窗口中计算每个司机的小费总和。
// 司机ID作为 key。
public static class PseudoWindow extends
KeyedProcessFunction<Long, TaxiFare, Tuple3<Long, Long, Float>> {

private final long durationMsec;

public PseudoWindow(Time duration) {
this.durationMsec = duration.toMilliseconds();
}

@Override
// 在初始化期间调用一次。
public void open(Configuration conf) {
. . .
}

@Override
// 每个票价事件(TaxiFare-Event)输入(到达)时调用,以处理输入的票价事件。
public void processElement(
TaxiFare fare,
Context ctx,
Collector<Tuple3<Long, Long, Float>> out) throws Exception {

. . .
}

@Override
// 当当前水印(watermark)表明窗口现在需要完成的时候调用。
public void onTimer(long timestamp,
OnTimerContext context,
Collector<Tuple3<Long, Long, Float>> out) throws Exception {

. . .
}
}

注意事项:

  • 有几种类型的 ProcessFunctions – 不仅包括 KeyedProcessFunction,还包括 CoProcessFunctionsBroadcastProcessFunctions 等.
  • KeyedProcessFunction 是一种 RichFunction。作为 RichFunction,它可以访问使用 Managed Keyed State 所需的 opengetRuntimeContext 方法。
  • 有两个回调方法须要实现: processElementonTimer。每个输入事件都会调用 processElement 方法; 当计时器触发时调用 onTimer。它们可以是基于事件时间(event time)的 timer,也可以是基于处理时间(processing time)的 timer。 除此之外,processElementonTimer 都提供了一个上下文对象,该对象可用于与 TimerService 交互。 这两个回调还传递了一个可用于发出结果的 Collector

open() 方法

1
2
3
4
5
6
7
8
9
10
11
// 每个窗口都持有托管的 Keyed state 的入口,并且根据窗口的结束时间执行 keyed 策略。
// 每个司机都有一个单独的MapState对象。
private transient MapState<Long, Float> sumOfTips;

@Override
public void open(Configuration conf) {

MapStateDescriptor<Long, Float> sumDesc =
new MapStateDescriptor<>("sumOfTips", Long.class, Float.class);
sumOfTips = getRuntimeContext().getMapState(sumDesc);
}

由于票价事件(fare-event)可能会乱序到达,有时需要在计算输出前一个小时结果前,处理下一个小时的事件。 这样能够保证“乱序造成的延迟数据”得到正确处理(放到前一个小时中)。 实际上,如果 Watermark 延迟比窗口长度长得多,则可能有多个窗口同时打开,而不仅仅是两个。 此实现通过使用 MapState 来支持处理这一点,该 MapState 将每个窗口的结束时间戳映射到该窗口的小费总和。

processElement() 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public void processElement(
TaxiFare fare,
Context ctx,
Collector<Tuple3<Long, Long, Float>> out) throws Exception {

long eventTime = fare.getEventTime();
TimerService timerService = ctx.timerService();

if (eventTime <= timerService.currentWatermark()) {
// 事件延迟;其对应的窗口已经触发。
} else {
// 将 eventTime 向上取值并将结果赋值到包含当前事件的窗口的末尾时间点。
long endOfWindow = (eventTime - (eventTime % durationMsec) + durationMsec - 1);

// 在窗口完成时将启用回调
timerService.registerEventTimeTimer(endOfWindow);

// 将此票价的小费添加到该窗口的总计中。
Float sum = sumOfTips.get(endOfWindow);
if (sum == null) {
sum = 0.0F;
}
sum += fare.tip;
sumOfTips.put(endOfWindow, sum);
}
}

需要考虑的事项:

  • 延迟的事件怎么处理?watermark 后面的事件(即延迟的)正在被删除。 如果你想做一些比这更高级的操作,可以考虑使用旁路输出(Side outputs),这将在下一节中解释。
  • 本例使用一个 MapState,其中 keys 是时间戳(timestamp),并为同一时间戳设置一个 Timer。 这是一种常见的模式;它使得在 Timer 触发时查找相关信息变得简单高效。

onTimer() 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
public void onTimer(
long timestamp,
OnTimerContext context,
Collector<Tuple3<Long, Long, Float>> out) throws Exception {

long driverId = context.getCurrentKey();
// 查找刚结束的一小时结果。
Float sumOfTips = this.sumOfTips.get(timestamp);

Tuple3<Long, Long, Float> result = Tuple3.of(driverId, timestamp, sumOfTips);
out.collect(result);
this.sumOfTips.remove(timestamp);
}

注意:

  • 传递给 onTimerOnTimerContext context 可用于确定当前 key。
  • 我们的 pseudo-windows 在当前 Watermark 到达每小时结束时触发,此时调用 onTimer。 这个 onTimer 方法从 sumOfTips 中删除相关的条目,这样做的效果是不可能容纳延迟的事件。 这相当于在使用 Flink 的时间窗口时将 allowedLateness 设置为零。

性能考虑

Flink 提供了为 RocksDB 优化的 MapStateListState 类型。 相对于 ValueState,更建议使用 MapStateListState,因为使用 RocksDBStateBackend 的情况下, MapStateListStateValueState 性能更好。 RocksDBStateBackend 可以附加到 ListState,而无需进行(反)序列化, 对于 MapState,每个 key/value 都是一个单独的 RocksDB 对象,因此可以有效地访问和更新 MapState

旁路输出(Side Outputs)

简介

有几个很好的理由希望从 Flink 算子获得多个输出流,如下报告条目:

  • 异常情况(exceptions)
  • 格式错误的事件(malformed events)
  • 延迟的事件(late events)
  • operator 告警(operational alerts),如与外部服务的连接超时

旁路输出(Side outputs)是一种方便的方法。除了错误报告之外,旁路输出也是实现流的 n 路分割的好方法。

示例

现在你可以对上一节中忽略的延迟事件执行某些操作。

Side output channel 与 OutputTag<T> 相关联。这些标记拥有自己的名称,并与对应 DataStream 类型一致。

1
private static final OutputTag<TaxiFare> lateFares = new OutputTag<TaxiFare>("lateFares") {};

上面显示的是一个静态 OutputTag<TaxiFare> ,当在 PseudoWindowprocessElement 方法中发出延迟事件时,可以引用它:

1
2
3
4
5
6
if (eventTime <= timerService.currentWatermark()) {
// 事件延迟,其对应的窗口已经触发。
ctx.output(lateFares, fare);
} else {
. . .
}

以及当在作业的 main 中从该旁路输出访问流时:

1
2
3
4
5
6
// 计算每个司机每小时的小费总和
SingleOutputStreamOperator hourlyTips = fares
.keyBy((TaxiFare fare) -> fare.driverId)
.process(new PseudoWindow(Time.hours(1)));

hourlyTips.getSideOutput(lateFares).print();

或者,可以使用两个同名的 OutputTag 来引用同一个旁路输出,但如果这样做,它们必须具有相同的类型。

结语

在本例中,你已经了解了如何使用 ProcessFunction 重新实现一个简单的时间窗口。 当然,如果 Flink 内置的窗口 API 能够满足你的开发需求,那么一定要优先使用它。 但如果你发现自己在考虑用 Flink 的窗口做些错综复杂的事情,不要害怕自己动手。

此外,ProcessFunctions 对于计算分析之外的许多其他用例也很有用。 下面的实践练习提供了一个完全不同的例子。

ProcessFunctions 的另一个常见用例是清理过时 State。如果你回想一下 Rides and Fares Exercise , 其中使用 RichCoFlatMapFunction 来计算简单 Join,那么示例方案假设 TaxiRides 和 TaxiFares 两个事件是严格匹配为一个有效 数据对(必须同时出现)并且每一组这样的有效数据对都和一个唯一的 rideId 严格对应。如果数据对中的某个 TaxiRides 事件(TaxiFares 事件) 丢失,则同一 rideId 对应的另一个出现的 TaxiFares 事件(TaxiRides 事件)对应的 State 则永远不会被清理掉。 所以这里可以使用 KeyedCoProcessFunction 的实现代替它(RichCoFlatMapFunction),并且可以使用计时器来检测和清除任何过时 的 State。

参考资料

Flink 架构

Apache Flink 是一个分布式系统,它需要计算资源来执行应用程序。Flink 集成了所有常见的集群资源管理器,例如 Hadoop YARNApache MesosKubernetes,但同时也可以作为独立集群运行。

Flink 被设计为能够很好地工作在上述每个资源管理器中,这是通过资源管理器特定(resource-manager-specific)的部署模式实现的。Flink 可以采用与当前资源管理器相适应的方式进行交互。

部署 Flink 应用程序时,Flink 会根据应用程序配置的并行性自动标识所需的资源,并从资源管理器请求这些资源。在发生故障的情况下,Flink 通过请求新资源来替换发生故障的容器。提交或控制应用程序的所有通信都是通过 REST 调用进行的,这可以简化 Flink 与各种环境中的集成。

运行任意规模应用

Flink 旨在任意规模上运行有状态流式应用。因此,应用程序被并行化为可能数千个任务,这些任务分布在集群中并发执行。所以应用程序能够充分利用无尽的 CPU、内存、磁盘和网络 IO。而且 Flink 很容易维护非常大的应用程序状态。其异步和增量的检查点算法对处理延迟产生最小的影响,同时保证精确一次状态的一致性。

Flink 用户报告了其生产环境中一些令人印象深刻的扩展性数字

  • 处理每天处理数万亿的事件,
  • 应用维护几 TB 大小的状态
  • 应用在数千个内核上运行

利用内存性能

有状态的 Flink 程序针对本地状态访问进行了优化。任务的状态始终保留在内存中,如果状态大小超过可用内存,则会保存在能高效访问的磁盘数据结构中。任务通过访问本地(通常在内存中)状态来进行所有的计算,从而产生非常低的处理延迟。Flink 通过定期和异步地对本地状态进行持久化存储来保证故障场景下精确一次的状态一致性。

img

Flink 运行时由两种类型的进程组成:一个 JobManager 和一个或者多个 **TaskManager**。

The processes involved in executing a Flink dataflow

Client 不是运行时和程序执行的一部分,而是用于准备数据流并将其发送给 JobManager。之后,客户端可以断开连接(_分离模式_),或保持连接来接收进程报告(_附加模式_)。客户端可以作为触发执行 Java/Scala 程序的一部分运行,也可以在命令行进程 ./bin/flink run ... 中运行。

可以通过多种方式启动 JobManagerTaskManager:直接在机器上作为 standalone 集群启动、在容器中启动、或者通过 YARN 等资源框架管理并启动。TaskManager 连接到 JobManager,宣布自己可用,并被分配工作。

JobManager

JobManager 具有许多与协调 Flink 应用程序的分布式执行有关的职责,它决定

  • 何时调度下一个 task(或一组 task)
  • 对完成的 task 或执行失败做出反应
  • 协调 checkpoint
  • 协调从失败中恢复
  • 等等

JobManager 由三个不同的组件组成:

  • ResourceManager:负责 Flink 集群中的资源提供、回收、分配 - 它管理 task slots,这是 Flink 集群中资源调度的单位(请参考 TaskManager。Flink 为不同的环境和资源提供者(例如 YARN、Kubernetes 和 standalone 部署)实现了对应的 ResourceManager。在 standalone 设置中,ResourceManager 只能分配可用 TaskManager 的 slots,而不能自行启动新的 TaskManager。

  • Dispatcher:提供了一个 REST 接口,用来提交 Flink 应用程序执行,并为每个提交的作业启动一个新的 JobMaster。它还运行 Flink WebUI 用来提供作业执行信息。

  • JobMaster:负责管理单个 JobGraph 的执行。Flink 集群中可以同时运行多个作业,每个作业都有自己的 JobMaster。

始终至少有一个 JobManager。高可用(HA)设置中可能有多个 JobManager,其中一个始终是 _leader_,其他的则是 _standby_(请参考 高可用(HA))。

TaskManager

_TaskManager_(也称为 _worker_)执行作业流的 task,并且缓存和交换数据流。

必须始终至少有一个 TaskManager。在 TaskManager 中资源调度的最小单位是 task _slot_。TaskManager 中 task slot 的数量表示并发处理 task 的数量。请注意一个 task slot 中可以执行多个算子(请参考Tasks 和算子链)。

Tasks 和算子链

对于分布式执行,Flink 将算子的 subtasks 链接成 _tasks_。每个 task 由一个线程执行。将算子链接成 task 是个有用的优化:它减少线程间切换、缓冲的开销,并且减少延迟的同时增加整体吞吐量。链行为是可以配置的;请参考链文档以获取详细信息。

下图中样例数据流用 5 个 subtask 执行,因此有 5 个并行线程。

Operator chaining into Tasks

Task Slots 和资源

每个 worker(TaskManager)都是一个 _JVM 进程_,可以在单独的线程中执行一个或多个 subtask。为了控制一个 TaskManager 中接受多少个 task,就有了所谓的 task slots(至少一个)。

每个 task slot 代表 TaskManager 中资源的固定子集。例如,具有 3 个 slot 的 TaskManager,会将其托管内存 1/3 用于每个 slot。分配资源意味着 subtask 不会与其他作业的 subtask 竞争托管内存,而是具有一定数量的保留托管内存。注意此处没有 CPU 隔离;当前 slot 仅分离 task 的托管内存。

通过调整 task slot 的数量,用户可以定义 subtask 如何互相隔离。每个 TaskManager 有一个 slot,这意味着每个 task 组都在单独的 JVM 中运行(例如,可以在单独的容器中启动)。具有多个 slot 意味着更多 subtask 共享同一 JVM。同一 JVM 中的 task 共享 TCP 连接(通过多路复用)和心跳信息。它们还可以共享数据集和数据结构,从而减少了每个 task 的开销。

A TaskManager with Task Slots and Tasks

默认情况下,Flink 允许 subtask 共享 slot,即便它们是不同的 task 的 subtask,只要是来自于同一作业即可。结果就是一个 slot 可以持有整个作业管道。允许 slot 共享有两个主要优点:

  • Flink 集群所需的 task slot 和作业中使用的最大并行度恰好一样。无需计算程序总共包含多少个 task(具有不同并行度)。
  • 容易获得更好的资源利用。如果没有 slot 共享,非密集 subtask(_source/map()_)将阻塞和密集型 subtask(_window_) 一样多的资源。通过 slot 共享,我们示例中的基本并行度从 2 增加到 6,可以充分利用分配的资源,同时确保繁重的 subtask 在 TaskManager 之间公平分配。

TaskManagers with shared Task Slots

Flink 应用程序 是从其 main() 方法产生的一个或多个 Flink 作业的任何用户程序。这些作业的执行可以在本地 JVM(LocalEnvironment)中进行,或具有多台机器的集群的远程设置(RemoteEnvironment)中进行。对于每个程序,ExecutionEnvironment 提供了一些方法来控制作业执行(例如设置并行度)并与外界交互(请参考 Flink 程序剖析 )。

Flink 应用程序的作业可以被提交到长期运行的 Flink Session 集群、专用的 Flink Job 集群Flink Application 集群。这些选项之间的差异主要与集群的生命周期和资源隔离保证有关。

  • 集群生命周期:在 Flink Session 集群中,客户端连接到一个预先存在的、长期运行的集群,该集群可以接受多个作业提交。即使所有作业完成后,集群(和 JobManager)仍将继续运行直到手动停止 session 为止。因此,Flink Session 集群的寿命不受任何 Flink 作业寿命的约束。
  • 资源隔离:TaskManager slot 由 ResourceManager 在提交作业时分配,并在作业完成时释放。由于所有作业都共享同一集群,因此在集群资源方面存在一些竞争 — 例如提交工作阶段的网络带宽。此共享设置的局限性在于,如果 TaskManager 崩溃,则在此 TaskManager 上运行 task 的所有作业都将失败;类似的,如果 JobManager 上发生一些致命错误,它将影响集群中正在运行的所有作业。
  • 其他注意事项:拥有一个预先存在的集群可以节省大量时间申请资源和启动 TaskManager。有种场景很重要,作业执行时间短并且启动时间长会对端到端的用户体验产生负面的影响 — 就像对简短查询的交互式分析一样,希望作业可以使用现有资源快速执行计算。

以前,Flink Session 集群也被称为 session 模式下的 Flink 集群。

  • 集群生命周期:在 Flink Job 集群中,可用的集群管理器(例如 YARN)用于为每个提交的作业启动一个集群,并且该集群仅可用于该作业。在这里,客户端首先从集群管理器请求资源启动 JobManager,然后将作业提交给在这个进程中运行的 Dispatcher。然后根据作业的资源请求惰性的分配 TaskManager。一旦作业完成,Flink Job 集群将被拆除。
  • 资源隔离:JobManager 中的致命错误仅影响在 Flink Job 集群中运行的一个作业。
  • 其他注意事项:由于 ResourceManager 必须应用并等待外部资源管理组件来启动 TaskManager 进程和分配资源,因此 Flink Job 集群更适合长期运行、具有高稳定性要求且对较长的启动时间不敏感的大型作业。

以前,Flink Job 集群也被称为 job (or per-job) 模式下的 Flink 集群。

Kubernetes 不支持 Flink Job 集群。 请参考 Standalone KubernetesNative Kubernetes

  • 集群生命周期:Flink Application 集群是专用的 Flink 集群,仅从 Flink 应用程序执行作业,并且 main()方法在集群上而不是客户端上运行。提交作业是一个单步骤过程:无需先启动 Flink 集群,然后将作业提交到现有的 session 集群;相反,将应用程序逻辑和依赖打包成一个可执行的作业 JAR 中,并且集群入口(ApplicationClusterEntryPoint)负责调用 main()方法来提取 JobGraph。例如,这允许你像在 Kubernetes 上部署任何其他应用程序一样部署 Flink 应用程序。因此,Flink Application 集群的寿命与 Flink 应用程序的寿命有关。
  • 资源隔离:在 Flink Application 集群中,ResourceManager 和 Dispatcher 作用于单个的 Flink 应用程序,相比于 Flink Session 集群,它提供了更好的隔离。

Flink Job 集群可以看做是 Flink Application 集群”客户端运行“的替代方案。

默认情况下,每个 Flink 集群只有一个 JobManager 实例。这会导致 _单点故障(SPOF)_:如果 JobManager 崩溃,则不能提交任何新程序,运行中的程序也会失败。

使用 JobManager 高可用模式,你可以从 JobManager 失败中恢复,从而消除单点故障。

如何启用集群高可用

JobManager 高可用是指,在任何时候都有一个 JobManager Leader,如果 Leader 出现故障,则有多个备用 JobManager 来接管领导。这解决了单点故障问题,只要有备用 JobManager 担任领导者,程序就可以继续运行。

如下是一个使用三个 JobManager 实例的例子:

img

Flink 的 高可用服务 封装了所需的服务,使一切可以正常工作:

  • 领导者选举:从 n 个候选者中选出一个领导者
  • 服务发现:检索当前领导者的地址
  • 状态持久化:继承程序恢复作业所需的持久化状态(JobGraphs、用户代码 jar、已完成的检查点)

Flink 提供了两种高可用服务实现:

  • ZooKeeper:每个 Flink 集群部署都可以使用 ZooKeeper HA 服务。它们需要一个运行的 ZooKeeper 复制组(quorum)。
  • Kubernetes:Kubernetes HA 服务只能运行在 Kubernetes 上。

高可用数据生命周期

为了恢复提交的作业,Flink 持久化元数据和 job 组件。高可用数据将一直保存,直到相应的作业执行成功、被取消或最终失败。当这些情况发生时,将删除所有高可用数据,包括存储在高可用服务中的元数据。

参考资料

Flink 简介

关键概念:源源不断的流式数据处理、事件时间、有状态流处理和状态快照

流处理

任何类型的数据都可以形成一种事件流。信用卡交易、传感器测量、机器日志、网站或移动应用程序上的用户交互记录,所有这些数据都形成一种流。

数据可以被作为 无界 或者 有界 流来处理。

  1. 无界流 有定义流的开始,但没有定义流的结束。它们会无休止地产生数据。无界流的数据必须持续处理,即数据被摄取后需要立刻处理。我们不能等到所有数据都到达再处理,因为输入是无限的,在任何时候输入都不会完成。处理无界数据通常要求以特定顺序摄取事件,例如事件发生的顺序,以便能够推断结果的完整性。
  2. 有界流 有定义流的开始,也有定义流的结束。有界流可以在摄取所有数据后再进行计算。有界流所有数据可以被排序,所以并不需要有序摄取。有界流处理通常被称为批处理。

Bounded and unbounded streams

Apache Flink 擅长处理无界和有界数据集 精确的时间控制和状态化使得 Flink 的运行时(runtime)能够运行任何处理无界流的应用。有界流则由一些专为固定大小数据集特殊设计的算法和数据结构进行内部处理,产生了出色的性能。

批处理是有界数据流处理的范例。在这种模式下,你可以选择在计算结果输出之前输入整个数据集,这也就意味着你可以对整个数据集的数据进行排序、统计或汇总计算后再输出结果。

流处理正相反,其涉及无界数据流。至少理论上来说,它的数据输入永远不会结束,因此程序必须持续不断地对到达的数据进行处理。

在 Flink 中,应用程序由用户自定义算子转换而来的流式 dataflows 所组成。这些流式 dataflows 形成了有向图,以一个或多个(source)开始,并以一个或多个(sink)结束。

A DataStream program, and its dataflow.

通常,程序代码中的 transformation 和 dataflow 中的算子(operator)之间是一一对应的。但有时也会出现一个 transformation 包含多个算子的情况,如上图所示。

Flink 应用程序可以消费来自消息队列或分布式日志这类流式数据源(例如 Apache Kafka 或 Kinesis)的实时数据,也可以从各种的数据源中消费有界的历史数据。同样,Flink 应用程序生成的结果流也可以发送到各种数据汇中。

Flink application with sources and sinks

并行 Dataflows

Flink 程序本质上是分布式并行程序。在程序执行期间,一个流有一个或多个流分区(Stream Partition),每个算子有一个或多个算子子任务(Operator Subtask)。每个子任务彼此独立,并在不同的线程中运行,或在不同的计算机或容器中运行。

算子子任务数就是其对应算子的并行度。在同一程序中,不同算子也可能具有不同的并行度。

A parallel dataflow

Flink 算子之间可以通过一对一(_直传_)模式或重新分发模式传输数据:

  • 一对一模式(例如上图中的 Sourcemap() 算子之间)可以保留元素的分区和顺序信息。这意味着 map() 算子的 subtask[1] 输入的数据以及其顺序与 Source 算子的 subtask[1] 输出的数据和顺序完全相同,即同一分区的数据只会进入到下游算子的同一分区。
  • 重新分发模式(例如上图中的 map()keyBy/window 之间,以及 keyBy/windowSink 之间)则会更改数据所在的流分区。当你在程序中选择使用不同的 _transformation_,每个算子子任务也会根据不同的 transformation 将数据发送到不同的目标子任务。例如以下这几种 transformation 和其对应分发数据的模式:_keyBy()_(通过散列键重新分区)、_broadcast()_(广播)或 _rebalance()(随机重新分发)。在重新分发数据的过程中,元素只有在每对输出和输入子任务之间才能保留其之间的顺序信息(例如,_keyBy/window 的 subtask[2] 接收到的 map() 的 subtask[1] 中的元素都是有序的)。因此,上图所示的 keyBy/windowSink 算子之间数据的重新分发时,不同键(key)的聚合结果到达 Sink 的顺序是不确定的。

自定义时间流处理

对于大多数流数据处理应用程序而言,能够使用处理实时数据的代码重新处理历史数据并产生确定并一致的结果非常有价值。

在处理流式数据时,我们通常更需要关注事件本身发生的顺序而不是事件被传输以及处理的顺序,因为这能够帮助我们推理出一组事件(事件集合)是何时发生以及结束的。例如电子商务交易或金融交易中涉及到的事件集合。

为了满足上述这类的实时流处理场景,我们通常会使用记录在数据流中的事件时间的时间戳,而不是处理数据的机器时钟的时间戳。

有状态流处理

Flink 中的算子可以是有状态的。这意味着如何处理一个事件可能取决于该事件之前所有事件数据的累积结果。Flink 中的状态不仅可以用于简单的场景(例如统计仪表板上每分钟显示的数据),也可以用于复杂的场景(例如训练作弊检测模型)。

Flink 应用程序可以在分布式集群上并行运行,其中每个算子的各个并行实例会在单独的线程中独立运行,并且通常情况下是会在不同的机器上运行。

有状态算子的并行实例组在存储其对应状态时通常是按照键(key)进行分片存储的。每个并行实例算子负责处理一组特定键的事件数据,并且这组键对应的状态会保存在本地。

如下图的 Flink 作业,其前三个算子的并行度为 2,最后一个 sink 算子的并行度为 1,其中第三个算子是有状态的,并且你可以看到第二个算子和第三个算子之间是全互联的(fully-connected),它们之间通过网络进行数据分发。通常情况下,实现这种类型的 Flink 程序是为了通过某些键对数据流进行分区,以便将需要一起处理的事件进行汇合,然后做统一计算处理。

State is sharded

Flink 应用程序的状态访问都在本地进行,因为这有助于其提高吞吐量和降低延迟。通常情况下 Flink 应用程序都是将状态存储在 JVM 堆上,但如果状态太大,我们也可以选择将其以结构化数据格式存储在高速磁盘中。

State is local

通过状态快照实现的容错

通过状态快照和流重放两种方式的组合,Flink 能够提供可容错的,精确一次计算的语义。这些状态快照在执行时会获取并存储分布式 pipeline 中整体的状态,它会将数据源中消费数据的偏移量记录下来,并将整个 job graph 中算子获取到该数据(记录的偏移量对应的数据)时的状态记录并存储下来。当发生故障时,Flink 作业会恢复上次存储的状态,重置数据源从状态中记录的上次消费的偏移量开始重新进行消费处理。而且状态快照在执行时会异步获取状态并存储,并不会阻塞正在进行的数据处理逻辑。

状态

只有在每一个单独的事件上进行转换操作的应用才不需要状态,换言之,每一个具有一定复杂度的流处理应用都是有状态的。任何运行基本业务逻辑的流处理应用都需要在一定时间内存储所接收的事件或中间结果,以供后续的某个时间点(例如收到下一个事件或者经过一段特定时间)进行访问并进行后续处理。

img

应用状态是 Flink 中的一等公民,Flink 提供了许多状态管理相关的特性支持,其中包括:

  • 多种状态基础类型:Flink 为多种不同的数据结构提供了相对应的状态基础类型,例如原子值(value),列表(list)以及映射(map)。开发者可以基于处理函数对状态的访问方式,选择最高效、最适合的状态基础类型。
  • 插件化的 State Backend:State Backend 负责管理应用程序状态,并在需要的时候进行 checkpoint。Flink 支持多种 state backend,可以将状态存在内存或者 RocksDB。RocksDB 是一种高效的嵌入式、持久化键值存储引擎。Flink 也支持插件式的自定义 state backend 进行状态存储。
  • 精确一次语义:Flink 的 checkpoint 和故障恢复算法保证了故障发生后应用状态的一致性。因此,Flink 能够在应用程序发生故障时,对应用程序透明,不造成正确性的影响。
  • 超大数据量状态:Flink 能够利用其异步以及增量式的 checkpoint 算法,存储数 TB 级别的应用状态。
  • 可弹性伸缩的应用:Flink 能够通过在更多或更少的工作节点上对状态进行重新分布,支持有状态应用的分布式的横向伸缩。

时间

时间语义

Flink 支持以下三种时间语义:

  • **事件时间(event time)**: 事件产生的时间,记录的是设备生产(或者存储)事件的时间
  • **摄取时间(ingestion time)**: Flink 读取事件时记录的时间
  • **处理时间(processing time)**: Flink pipeline 中具体算子处理事件的时间

为了获得可重现的结果,例如在计算过去的特定一天里第一个小时股票的最高价格时,我们应该使用事件时间。这样的话,无论什么时间去计算都不会影响输出结果。然而如果使用处理时间的话,实时应用程序的结果是由程序运行的时间所决定。多次运行基于处理时间的实时程序,可能得到的结果都不相同,也可能会导致再次分析历史数据或者测试新代码变得异常困难。

Event Time

如果想要使用事件时间,需要额外给 Flink 提供一个时间戳提取器和 Watermark 生成器,Flink 将使用它们来跟踪事件时间的进度。

Watermark

让我们通过一个简单的示例来演示为什么需要 watermarks 及其工作方式。

在此示例中,我们将看到带有混乱时间戳的事件流,如下所示。显示的数字表达的是这些事件实际发生时间的时间戳。到达的第一个事件发生在时间 4,随后发生的事件发生在更早的时间 2,依此类推:

··· 23 19 22 24 21 14 17 13 12 15 9 11 7 2 4 →

假设我们要对数据流排序,我们想要达到的目的是:应用程序应该在数据流里的事件到达时就有一个算子(我们暂且称之为排序)开始处理事件,这个算子所输出的流是按照时间戳排序好的。

让我们重新审视这些数据:

(1) 我们的排序器看到的第一个事件的时间戳是 4,但是我们不能立即将其作为已排序的流释放。因为我们并不能确定它是有序的,并且较早的事件有可能并未到达。事实上,如果站在上帝视角,我们知道,必须要等到时间戳为 2 的元素到来时,排序器才可以有事件输出。

需要一些缓冲,需要一些时间,但这都是值得的

(2) 接下来的这一步,如果我们选择的是固执的等待,我们永远不会有结果。首先,我们看到了时间戳为 4 的事件,然后看到了时间戳为 2 的事件。可是,时间戳小于 2 的事件接下来会不会到来呢?可能会,也可能不会。再次站在上帝视角,我们知道,我们永远不会看到时间戳 1。

最终,我们必须勇于承担责任,并发出指令,把带有时间戳 2 的事件作为已排序的事件流的开始

(3) 然后,我们需要一种策略,该策略定义:对于任何给定时间戳的事件,Flink 何时停止等待较早事件的到来。

这正是 watermarks 的作用 — 它们定义何时停止等待较早的事件。

Flink 中事件时间的处理取决于 _watermark 生成器_,后者将带有时间戳的特殊元素插入流中形成 _watermarks_。事件时间 t 的 watermark 代表 t 之前(很可能)都已经到达。

当 watermark 以 2 或更大的时间戳到达时,事件流的排序器应停止等待,并输出 2 作为已经排序好的流。

(4) 我们可能会思考,如何决定 watermarks 的不同生成策略

每个事件都会延迟一段时间后到达,然而这些延迟有所不同,有些事件可能比其他事件延迟得更多。一种简单的方法是假定这些延迟受某个最大延迟的限制。Flink 将此策略称为 最大无序边界 (bounded-out-of-orderness) watermark。当然,我们可以想像出更好的生成 watermark 的方法,但是对于大多数应用而言,固定延迟策略已经足够了。

延迟 VS 正确性

watermarks 给了开发者流处理的一种选择,它们使开发人员在开发应用程序时可以控制延迟和完整性之间的权衡。与批处理不同,批处理中的奢侈之处在于可以在产生任何结果之前完全了解输入,而使用流式传输,我们不被允许等待所有的时间都产生了,才输出排序好的数据,这与流相违背。

我们可以把 watermarks 的边界时间配置的相对较短,从而冒着在输入了解不完全的情况下产生结果的风险-即可能会很快产生错误结果。或者,你可以等待更长的时间,并利用对输入流的更全面的了解来产生结果。

当然也可以实施混合解决方案,先快速产生初步结果,然后在处理其他(最新)数据时向这些结果提供更新。对于有一些对延迟的容忍程度很低,但是又对结果有很严格的要求的场景下,或许是一个福音。

延迟

延迟是相对于 watermarks 定义的。Watermark(t) 表示事件流的时间已经到达了 t; watermark 之后的时间戳 ≤ t 的任何事件都被称之为延迟事件。

使用 Watermarks

如果想要使用基于带有事件时间戳的事件流,Flink 需要知道与每个事件相关的时间戳,而且流必须包含 watermark。

动手练习中使用的出租车数据源已经为我们处理了这些详细信息。但是,在您自己的应用程序中,您将必须自己进行处理,这通常是通过实现一个类来实现的,该类从事件中提取时间戳,并根据需要生成 watermarks。最简单的方法是使用 WatermarkStrategy

1
2
3
4
5
6
7
8
DataStream<Event> stream = ...

WatermarkStrategy<Event> strategy = WatermarkStrategy
.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(20))
.withTimestampAssigner((event, timestamp) -> event.timestamp);

DataStream<Event> withTimestampsAndWatermarks =
stream.assignTimestampsAndWatermarks(strategy);

窗口

我们在操作无界数据流时,经常需要应对以下问题,我们经常把无界数据流分解成有界数据流聚合分析:

  • 每分钟的浏览量
  • 每位用户每周的会话数
  • 每个传感器每分钟的最高温度

用 Flink 计算窗口分析取决于两个主要的抽象操作:_Window Assigners_,将事件分配给窗口(根据需要创建新的窗口对象),以及 _Window Functions_,处理窗口内的数据。

Flink 的窗口 API 还具有 TriggersEvictors 的概念,Triggers 确定何时调用窗口函数,而 Evictors 则可以删除在窗口中收集的元素。

举一个简单的例子,我们一般这样使用键控事件流(基于 key 分组的输入事件流):

1
2
3
4
stream.
.keyBy(<key selector>)
.window(<window assigner>)
.reduce|aggregate|process(<window function>)

您不是必须使用键控事件流(keyed stream),但是值得注意的是,如果不使用键控事件流,我们的程序就不能 并行 处理。

1
2
3
stream.
.windowAll(<window assigner>)
.reduce|aggregate|process(<window function>)

窗口分配器

Flink 有一些内置的窗口分配器,如下所示:

Window assigners

通过一些示例来展示关于这些窗口如何使用,或者如何区分它们:

  • 滚动时间窗口
    • 每分钟页面浏览量
    • TumblingEventTimeWindows.of(Time.minutes(1))
  • 滑动时间窗口
    • 每 10 秒钟计算前 1 分钟的页面浏览量
    • SlidingEventTimeWindows.of(Time.minutes(1), Time.seconds(10))
  • 会话窗口
    • 每个会话的网页浏览量,其中会话之间的间隔至少为 30 分钟
    • EventTimeSessionWindows.withGap(Time.minutes(30))

以下都是一些可以使用的间隔时间 Time.milliseconds(n), Time.seconds(n), Time.minutes(n), Time.hours(n), 和 Time.days(n)

基于时间的窗口分配器(包括会话时间)既可以处理 事件时间,也可以处理 处理时间。这两种基于时间的处理没有哪一个更好,我们必须折衷。使用 处理时间,我们必须接受以下限制:

  • 无法正确处理历史数据,
  • 无法正确处理超过最大无序边界的数据,
  • 结果将是不确定的,

但是有自己的优势,较低的延迟。

使用基于计数的窗口时,请记住,只有窗口内的事件数量到达窗口要求的数值时,这些窗口才会触发计算。尽管可以使用自定义触发器自己实现该行为,但无法应对超时和处理部分窗口。

我们可能在有些场景下,想使用全局 window assigner 将每个事件(相同的 key)都分配给某一个指定的全局窗口。 很多情况下,一个比较好的建议是使用 ProcessFunction,具体介绍在这里

窗口应用函数

我们有三种最基本的操作窗口内的事件的选项:

  1. 像批量处理,ProcessWindowFunction 会缓存 Iterable 和窗口内容,供接下来全量计算;
  2. 或者像流处理,每一次有事件被分配到窗口时,都会调用 ReduceFunction 或者 AggregateFunction 来增量计算;
  3. 或者结合两者,通过 ReduceFunction 或者 AggregateFunction 预聚合的增量计算结果在触发窗口时, 提供给 ProcessWindowFunction 做全量计算。

接下来展示一段 1 和 3 的示例,每一个实现都是计算传感器的最大值。在每一个一分钟大小的事件时间窗口内, 生成一个包含 (key,end-of-window-timestamp, max_value) 的一组结果。

ProcessWindowFunction 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
DataStream<SensorReading> input = ...

input
.keyBy(x -> x.key)
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.process(new MyWastefulMax());

public static class MyWastefulMax extends ProcessWindowFunction<
SensorReading, // 输入类型
Tuple3<String, Long, Integer>, // 输出类型
String, // 键类型
TimeWindow> { // 窗口类型

@Override
public void process(
String key,
Context context,
Iterable<SensorReading> events,
Collector<Tuple3<String, Long, Integer>> out) {

int max = 0;
for (SensorReading event : events) {
max = Math.max(event.value, max);
}
out.collect(Tuple3.of(key, context.window().getEnd(), max));
}
}

在当前实现中有一些值得关注的地方:

  • Flink 会缓存所有分配给窗口的事件流,直到触发窗口为止。这个操作可能是相当昂贵的。
  • Flink 会传递给 ProcessWindowFunction 一个 Context 对象,这个对象内包含了一些窗口信息。Context 接口 展示大致如下:
1
2
3
4
5
6
7
8
9
public abstract class Context implements java.io.Serializable {
public abstract W window();

public abstract long currentProcessingTime();
public abstract long currentWatermark();

public abstract KeyedStateStore windowState();
public abstract KeyedStateStore globalState();
}

windowStateglobalState 可以用来存储当前的窗口的 key、窗口或者当前 key 的每一个窗口信息。这在一些场景下会很有用,试想,我们在处理当前窗口的时候,可能会用到上一个窗口的信息。

增量聚合示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
DataStream<SensorReading> input = ...

input
.keyBy(x -> x.key)
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.reduce(new MyReducingMax(), new MyWindowFunction());

private static class MyReducingMax implements ReduceFunction<SensorReading> {
public SensorReading reduce(SensorReading r1, SensorReading r2) {
return r1.value() > r2.value() ? r1 : r2;
}
}

private static class MyWindowFunction extends ProcessWindowFunction<
SensorReading, Tuple3<String, Long, SensorReading>, String, TimeWindow> {

@Override
public void process(
String key,
Context context,
Iterable<SensorReading> maxReading,
Collector<Tuple3<String, Long, SensorReading>> out) {

SensorReading max = maxReading.iterator().next();
out.collect(Tuple3.of(key, context.window().getEnd(), max));
}
}

请注意 Iterable<SensorReading> 将只包含一个读数 – MyReducingMax 计算出的预先汇总的最大值。

晚到的事件

默认场景下,超过最大无序边界的事件会被删除,但是 Flink 给了我们两个选择去控制这些事件。

您可以使用一种称为旁路输出 的机制来安排将要删除的事件收集到侧输出流中,这里是一个示例:

1
2
3
4
5
6
7
8
9
OutputTag<Event> lateTag = new OutputTag<Event>("late"){};

SingleOutputStreamOperator<Event> result = stream.
.keyBy(...)
.window(...)
.sideOutputLateData(lateTag)
.process(...);

DataStream<Event> lateStream = result.getSideOutput(lateTag);

我们还可以指定 允许的延迟(allowed lateness) 的间隔,在这个间隔时间内,延迟的事件将会继续分配给窗口(同时状态会被保留),默认状态下,每个延迟事件都会导致窗口函数被再次调用(有时也称之为 late firing )。

默认情况下,允许的延迟为 0。换句话说,watermark 之后的元素将被丢弃(或发送到侧输出流)。

举例说明:

1
2
3
4
5
stream.
.keyBy(...)
.window(...)
.allowedLateness(Time.seconds(10))
.process(...);

当允许的延迟大于零时,只有那些超过最大无序边界以至于会被丢弃的事件才会被发送到侧输出流(如果已配置)。

深入了解窗口操作

Flink 的窗口 API 某些方面有一些奇怪的行为,可能和我们预期的行为不一致。 根据 Flink 用户邮件列表 和其他地方一些频繁被问起的问题, 以下是一些有关 Windows 的底层事实,这些信息可能会让您感到惊讶。

滑动窗口是通过复制来实现的

滑动窗口分配器可以创建许多窗口对象,并将每个事件复制到每个相关的窗口中。例如,如果您每隔 15 分钟就有 24 小时的滑动窗口,则每个事件将被复制到 4 * 24 = 96 个窗口中。

时间窗口会和时间对齐

仅仅因为我们使用的是一个小时的处理时间窗口并在 12:05 开始运行您的应用程序,并不意味着第一个窗口将在 1:05 关闭。第一个窗口将长 55 分钟,并在 1:00 关闭。

请注意,滑动窗口和滚动窗口分配器所采用的 offset 参数可用于改变窗口的对齐方式。有关详细的信息,请参见 滚动窗口滑动窗口

window 后面可以接 window

比如说:

1
2
3
4
5
6
stream
.keyBy(t -> t.key)
.window(<window assigner>)
.reduce(<reduce function>)
.windowAll(<same window assigner>)
.reduce(<same reduce function>)

可能我们会猜测以 Flink 的能力,想要做到这样看起来是可行的(前提是你使用的是 ReduceFunction 或 AggregateFunction ),但不是。

之所以可行,是因为时间窗口产生的事件是根据窗口结束时的时间分配时间戳的。例如,一个小时小时的窗口所产生的所有事件都将带有标记一个小时结束的时间戳。后面的窗口内的数据消费和前面的流产生的数据是一致的。

空的时间窗口不会输出结果

事件会触发窗口的创建。换句话说,如果在特定的窗口内没有事件,就不会有窗口,就不会有输出结果。

延迟时间会导致延迟聚合

会话窗口的实现是基于窗口的一个抽象能力,窗口可以 _聚合_。会话窗口中的每个数据在初始被消费时,都会被分配一个新的窗口,但是如果窗口之间的间隔足够小,多个窗口就会被聚合。延迟事件可以弥合两个先前分开的会话间隔,从而产生一个虽然有延迟但是更加准确地结果。

参考资料

Java 基础语法特性

注释

空白行,或者注释的内容,都会被 Java 编译器忽略掉。

Java 支持多种注释方式,下面的示例展示了各种注释的使用方式:

1
2
3
4
5
6
7
8
9
10
11
12
13
public class HelloWorld {
/*
* JavaDoc 注释
*/
public static void main(String[] args) {
// 单行注释
/* 多行注释:
1. 注意点a
2. 注意点b
*/
System.out.println("Hello World");
}
}

基本数据类型

img

👉 扩展阅读:深入理解 Java 基本数据类型

变量和常量

Java 支持的变量类型有:

  • 局部变量 - 类方法中的变量。
  • 成员变量(也叫实例变量) - 类方法外的变量,不过没有 static 修饰。
  • 静态变量(也叫类变量) - 类方法外的变量,用 static 修饰。

特性对比:

局部变量 实例变量(也叫成员变量) 类变量(也叫静态变量)
局部变量声明在方法、构造方法或者语句块中。 实例变量声明在方法、构造方法和语句块之外。 类变量声明在方法、构造方法和语句块之外。并且以 static 修饰。
局部变量在方法、构造方法、或者语句块被执行的时候创建,当它们执行完成后,变量将会被销毁。 实例变量在对象创建的时候创建,在对象被销毁的时候销毁。 类变量在第一次被访问时创建,在程序结束时销毁。
局部变量没有默认值,所以必须经过初始化,才可以使用。 实例变量具有默认值。数值型变量的默认值是 0,布尔型变量的默认值是 false,引用类型变量的默认值是 null。变量的值可以在声明时指定,也可以在构造方法中指定。 类变量具有默认值。数值型变量的默认值是 0,布尔型变量的默认值是 false,引用类型变量的默认值是 null。变量的值可以在声明时指定,也可以在构造方法中指定。此外,静态变量还可以在静态语句块中初始化。
对于局部变量,如果是基本类型,会把值直接存储在栈;如果是引用类型,会把其对象存储在堆,而把这个对象的引用(指针)存储在栈。 实例变量存储在堆。 类变量存储在静态存储区。
访问修饰符不能用于局部变量。 访问修饰符可以用于实例变量。 访问修饰符可以用于类变量。
局部变量只在声明它的方法、构造方法或者语句块中可见。 实例变量对于类中的方法、构造方法或者语句块是可见的。一般情况下应该把实例变量设为私有。通过使用访问修饰符可以使实例变量对子类可见。 与实例变量具有相似的可见性。但为了对类的使用者可见,大多数静态变量声明为 public 类型。
实例变量可以直接通过变量名访问。但在静态方法以及其他类中,就应该使用完全限定名:ObejectReference.VariableName。 静态变量可以通过:ClassName.VariableName 的方式访问。
无论一个类创建了多少个对象,类只拥有类变量的一份拷贝。
类变量除了被声明为常量外很少使用。

变量修饰符

  • 访问级别修饰符
    • 如果变量是实例变量或类变量,可以添加访问级别修饰符(public/protected/private)
  • 静态修饰符
    • 如果变量是类变量,需要添加 static 修饰
  • final
    • 如果变量使用 final 修饰符,就表示这是一个常量,不能被修改。

数组

img

👉 扩展阅读:深入理解 Java 数组

枚举

img

👉 扩展阅读:深入理解 Java 枚举

操作符

Java 中支持的操作符类型如下:

img

👉 扩展阅读:Java 操作符

方法

img

👉 扩展阅读:深入理解 Java 方法

控制语句

img

👉 扩展阅读:Java 控制语句

异常

img

img

👉 扩展阅读:深入理解 Java 异常

泛型

img

👉 扩展阅读:深入理解 Java 泛型

反射

img

img

👉 扩展阅读:深入理解 Java 反射和动态代理

注解

img

img

img

img

👉 扩展阅读:深入理解 Java 注解

序列化

img

👉 扩展阅读:Java 序列化

Elasticsearch 优化

Elasticsearch 是当前流行的企业级搜索引擎,设计用于云计算中,能够达到实时搜索,稳定,可靠,快速,安装使用方便。作为一个开箱即用的产品,在生产环境上线之后,我们其实不一定能确保其的性能和稳定性。如何根据实际情况提高服务的性能,其实有很多技巧。这章我们分享从实战经验中总结出来的 elasticsearch 性能优化,主要从硬件配置优化、索引优化设置、查询方面优化、数据结构优化、集群架构优化等方面讲解。

硬件配置优化

升级硬件设备配置一直都是提高服务能力最快速有效的手段,在系统层面能够影响应用性能的一般包括三个因素:CPU、内存和 IO,可以从这三方面进行 ES 的性能优化工作。

CPU 配置

一般说来,CPU 繁忙的原因有以下几个:

  1. 线程中有无限空循环、无阻塞、正则匹配或者单纯的计算;
  2. 发生了频繁的 GC;
  3. 多线程的上下文切换;

大多数 Elasticsearch 部署往往对 CPU 要求不高。因此,相对其它资源,具体配置多少个(CPU)不是那么关键。你应该选择具有多个内核的现代处理器,常见的集群使用 2 到 8 个核的机器。如果你要在更快的 CPUs 和更多的核数之间选择,选择更多的核数更好。多个内核提供的额外并发远胜过稍微快一点点的时钟频率。

内存配置

如果有一种资源是最先被耗尽的,它可能是内存。排序和聚合都很耗内存,所以有足够的堆空间来应付它们是很重要的。即使堆空间是比较小的时候,也能为操作系统文件缓存提供额外的内存。因为 Lucene 使用的许多数据结构是基于磁盘的格式,Elasticsearch 利用操作系统缓存能产生很大效果。

64 GB 内存的机器是非常理想的,但是 32 GB 和 16 GB 机器也是很常见的。少于 8 GB 会适得其反(你最终需要很多很多的小机器),大于 64 GB 的机器也会有问题。

由于 ES 构建基于 lucene,而 lucene 设计强大之处在于 lucene 能够很好的利用操作系统内存来缓存索引数据,以提供快速的查询性能。lucene 的索引文件 segements 是存储在单文件中的,并且不可变,对于 OS 来说,能够很友好地将索引文件保持在 cache 中,以便快速访问;因此,我们很有必要将一半的物理内存留给 lucene;另一半的物理内存留给 ES(JVM heap)。

内存分配

当机器内存小于 64G 时,遵循通用的原则,50% 给 ES,50% 留给 lucene。

当机器内存大于 64G 时,遵循以下原则:

  • 如果主要的使用场景是全文检索,那么建议给 ES Heap 分配 4~32G 的内存即可;其它内存留给操作系统,供 lucene 使用(segments cache),以提供更快的查询性能。
  • 如果主要的使用场景是聚合或排序,并且大多数是 numerics,dates,geo_points 以及 not_analyzed 的字符类型,建议分配给 ES Heap 分配 4~32G 的内存即可,其它内存留给操作系统,供 lucene 使用,提供快速的基于文档的聚类、排序性能。
  • 如果使用场景是聚合或排序,并且都是基于 analyzed 字符数据,这时需要更多的 heap size,建议机器上运行多 ES 实例,每个实例保持不超过 50% 的 ES heap 设置(但不超过 32 G,堆内存设置 32 G 以下时,JVM 使用对象指标压缩技巧节省空间),50% 以上留给 lucene。

禁止 swap

禁止 swap,一旦允许内存与磁盘的交换,会引起致命的性能问题。可以通过在 elasticsearch.yml 中 bootstrap.memory_lock: true,以保持 JVM 锁定内存,保证 ES 的性能。

GC 设置

保持 GC 的现有设置,默认设置为:Concurrent-Mark and Sweep(CMS),别换成 G1 GC,因为目前 G1 还有很多 BUG。

保持线程池的现有设置,目前 ES 的线程池较 1.X 有了较多优化设置,保持现状即可;默认线程池大小等于 CPU 核心数。如果一定要改,按公式 ( ( CPU 核心数 * 3 ) / 2 ) + 1 设置;不能超过 CPU 核心数的 2 倍;但是不建议修改默认配置,否则会对 CPU 造成硬伤。

磁盘

硬盘对所有的集群都很重要,对大量写入的集群更是加倍重要(例如那些存储日志数据的)。硬盘是服务器上最慢的子系统,这意味着那些写入量很大的集群很容易让硬盘饱和,使得它成为集群的瓶颈。

在经济压力能承受的范围下,尽量使用固态硬盘(SSD)。固态硬盘相比于任何旋转介质(机械硬盘,磁带等),无论随机写还是顺序写,都会对 IO 有较大的提升。

如果你正在使用 SSDs,确保你的系统 I/O 调度程序是配置正确的。当你向硬盘写数据,I/O 调度程序决定何时把数据实际发送到硬盘。大多数默认 *nix 发行版下的调度程序都叫做 cfq(完全公平队列)。

调度程序分配时间片到每个进程。并且优化这些到硬盘的众多队列的传递。但它是为旋转介质优化的:机械硬盘的固有特性意味着它写入数据到基于物理布局的硬盘会更高效。

这对 SSD 来说是低效的,尽管这里没有涉及到机械硬盘。但是,deadline 或者 noop 应该被使用。deadline 调度程序基于写入等待时间进行优化,noop 只是一个简单的 FIFO 队列。

这个简单的更改可以带来显著的影响。仅仅是使用正确的调度程序,我们看到了 500 倍的写入能力提升。

如果你使用旋转介质(如机械硬盘),尝试获取尽可能快的硬盘(高性能服务器硬盘,15k RPM 驱动器)

使用 RAID0 是提高硬盘速度的有效途径,对机械硬盘和 SSD 来说都是如此。没有必要使用镜像或其它 RAID 变体,因为 Elasticsearch 在自身层面通过副本,已经提供了备份的功能,所以不需要利用磁盘的备份功能,同时如果使用磁盘备份功能的话,对写入速度有较大的影响。

最后,避免使用网络附加存储(NAS)。人们常声称他们的 NAS 解决方案比本地驱动器更快更可靠。除却这些声称,我们从没看到 NAS 能配得上它的大肆宣传。NAS 常常很慢,显露出更大的延时和更宽的平均延时方差,而且它是单点故障的。

索引优化设置

索引优化主要是在 Elasticsearch 的插入层面优化,Elasticsearch 本身索引速度其实还是蛮快的,具体数据,我们可以参考官方的 benchmark 数据。我们可以根据不同的需求,针对索引优化。

批量提交

当有大量数据提交的时候,建议采用批量提交(Bulk 操作);此外使用 bulk 请求时,每个请求不超过几十 M,因为太大会导致内存使用过大。

比如在做 ELK 过程中,Logstash indexer 提交数据到 Elasticsearch 中,batch size 就可以作为一个优化功能点。但是优化 size 大小需要根据文档大小和服务器性能而定。

像 Logstash 中提交文档大小超过 20MB,Logstash 会将一个批量请求切分为多个批量请求。

如果在提交过程中,遇到 EsRejectedExecutionException 异常的话,则说明集群的索引性能已经达到极限了。这种情况,要么提高服务器集群的资源,要么根据业务规则,减少数据收集速度,比如只收集 Warn、Error 级别以上的日志。

增加 Refresh 时间间隔

为了提高索引性能,Elasticsearch 在写入数据的时候,采用延迟写入的策略,即数据先写到内存中,当超过默认 1 秒(index.refresh_interval)会进行一次写入操作,就是将内存中 segment 数据刷新到磁盘中,此时我们才能将数据搜索出来,所以这就是为什么 Elasticsearch 提供的是近实时搜索功能,而不是实时搜索功能。

如果我们的系统对数据延迟要求不高的话,我们可以通过延长 refresh 时间间隔,可以有效地减少 segment 合并压力,提高索引速度。比如在做全链路跟踪的过程中,我们就将 index.refresh_interval 设置为 30s,减少 refresh 次数。再如,在进行全量索引时,可以将 refresh 次数临时关闭,即 index.refresh_interval 设置为-1,数据导入成功后再打开到正常模式,比如 30s。

在加载大量数据时候可以暂时不用 refresh 和 repliccas,index.refresh_interval 设置为-1,index.number_of_replicas 设置为 0。

修改 index_buffer_size 的设置

索引缓冲的设置可以控制多少内存分配给索引进程。这是一个全局配置,会应用于一个节点上所有不同的分片上。

1
2
indices.memory.index_buffer_size: 10%
indices.memory.min_index_buffer_size: 48mb

indices.memory.index_buffer_size 接受一个百分比或者一个表示字节大小的值。默认是 10%,意味着分配给节点的总内存的 10%用来做索引缓冲的大小。这个数值被分到不同的分片(shards)上。如果设置的是百分比,还可以设置 min_index_buffer_size (默认 48mb)和 max_index_buffer_size(默认没有上限)。

修改 translog 相关的设置

一是控制数据从内存到硬盘的操作频率,以减少硬盘 IO。可将 sync_interval 的时间设置大一些。默认为 5s。

1
index.translog.sync_interval: 5s

也可以控制 tranlog 数据块的大小,达到 threshold 大小时,才会 flush 到 lucene 索引文件。默认为 512m。

1
index.translog.flush_threshold_size: 512mb

注意 _id 字段的使用

_id 字段的使用,应尽可能避免自定义 _id,以避免针对 ID 的版本管理;建议使用 ES 的默认 ID 生成策略或使用数字类型 ID 做为主键。

注意 _all 字段及 _source 字段的使用

**_**all 字段及 _source 字段的使用,应该注意场景和需要,_all 字段包含了所有的索引字段,方便做全文检索,如果无此需求,可以禁用;_source 存储了原始的 document 内容,如果没有获取原始文档数据的需求,可通过设置 includes、excludes 属性来定义放入 _source 的字段。

合理的配置使用 index 属性

合理的配置使用 index 属性,analyzed 和 not_analyzed,根据业务需求来控制字段是否分词或不分词。只有 groupby 需求的字段,配置时就设置成 not_analyzed,以提高查询或聚类的效率。

减少副本数量

Elasticsearch 默认副本数量为 3 个,虽然这样会提高集群的可用性,增加搜索的并发数,但是同时也会影响写入索引的效率。

在索引过程中,需要把更新的文档发到副本节点上,等副本节点生效后在进行返回结束。使用 Elasticsearch 做业务搜索的时候,建议副本数目还是设置为 3 个,但是像内部 ELK 日志系统、分布式跟踪系统中,完全可以将副本数目设置为 1 个。

查询方面优化

Elasticsearch 作为业务搜索的近实时查询时,查询效率的优化显得尤为重要。

路由优化

当我们查询文档的时候,Elasticsearch 如何知道一个文档应该存放到哪个分片中呢?它其实是通过下面这个公式来计算出来的。

1
shard = hash(routing) % number_of_primary_shards

routing 默认值是文档的 id,也可以采用自定义值,比如用户 ID。

不带 routing 查询

在查询的时候因为不知道要查询的数据具体在哪个分片上,所以整个过程分为 2 个步骤:

  1. 分发:请求到达协调节点后,协调节点将查询请求分发到每个分片上。
  2. 聚合:协调节点搜集到每个分片上查询结果,再将查询的结果进行排序,之后给用户返回结果。

带 routing 查询

查询的时候,可以直接根据 routing 信息定位到某个分配查询,不需要查询所有的分配,经过协调节点排序。

向上面自定义的用户查询,如果 routing 设置为 userid 的话,就可以直接查询出数据来,效率提升很多。

Filter VS Query

尽可能使用过滤器上下文(Filter)替代查询上下文(Query)

  • Query:此文档与此查询子句的匹配程度如何?
  • Filter:此文档和查询子句匹配吗?

Elasticsearch 针对 Filter 查询只需要回答“是”或者“否”,不需要像 Query 查询一样计算相关性分数,同时 Filter 结果可以缓存。

深度翻页

在使用 Elasticsearch 过程中,应尽量避免大翻页的出现。

正常翻页查询都是从 from 开始 size 条数据,这样就需要在每个分片中查询打分排名在前面的 from+size 条数据。协同节点收集每个分配的前 from+size 条数据。协同节点一共会受到 N*(from+size) 条数据,然后进行排序,再将其中 from 到 from+size 条数据返回出去。如果 from 或者 size 很大的话,导致参加排序的数量会同步扩大很多,最终会导致 CPU 资源消耗增大。

可以通过使用 Elasticsearch scroll 和 scroll-scan 高效滚动的方式来解决这样的问题。

也可以结合实际业务特点,文档 id 大小如果和文档创建时间是一致有序的,可以以文档 id 作为分页的偏移量,并将其作为分页查询的一个条件。

脚本(script)合理使用

我们知道脚本使用主要有 3 种形式,内联动态编译方式、_script 索引库中存储和文件脚本存储的形式;一般脚本的使用场景是粗排,尽量用第二种方式先将脚本存储在 _script 索引库中,起到提前编译,然后通过引用脚本 id,并结合 params 参数使用,即可以达到模型(逻辑)和数据进行了分离,同时又便于脚本模块的扩展与维护。具体 ES 脚本的深入内容请参考 Elasticsearch 脚本模块的详解

数据结构优化

基于 Elasticsearch 的使用场景,文档数据结构尽量和使用场景进行结合,去掉没用及不合理的数据。

尽量减少不需要的字段

如果 Elasticsearch 用于业务搜索服务,一些不需要用于搜索的字段最好不存到 ES 中,这样即节省空间,同时在相同的数据量下,也能提高搜索性能。

避免使用动态值作字段,动态递增的 mapping,会导致集群崩溃;同样,也需要控制字段的数量,业务中不使用的字段,就不要索引。控制索引的字段数量、mapping 深度、索引字段的类型,对于 ES 的性能优化是重中之重。

以下是 ES 关于字段数、mapping 深度的一些默认设置:

1
2
3
index.mapping.nested_objects.limit: 10000
index.mapping.total_fields.limit: 1000
index.mapping.depth.limit: 20

Nested Object vs Parent/Child

尽量避免使用 nested 或 parent/child 的字段,能不用就不用;nested query 慢,parent/child query 更慢,比 nested query 慢上百倍;因此能在 mapping 设计阶段搞定的(大宽表设计或采用比较 smart 的数据结构),就不要用父子关系的 mapping。

如果一定要使用 nested fields,保证 nested fields 字段不能过多,目前 ES 默认限制是 50。因为针对 1 个 document,每一个 nested field,都会生成一个独立的 document,这将使 doc 数量剧增,影响查询效率,尤其是 JOIN 的效率。

1
index.mapping.nested_fields.limit: 50
对比 Nested Object Parent/Child
优点 文档存储在一起,因此读取性高 父子文档可以独立更新,互不影响
缺点 更新父文档或子文档时需要更新整个文档 为了维护 join 关系,需要占用部分内存,读取性能较差
场景 子文档偶尔更新,查询频繁 子文档更新频繁

选择静态映射,非必需时,禁止动态映射

尽量避免使用动态映射,这样有可能会导致集群崩溃,此外,动态映射有可能会带来不可控制的数据类型,进而有可能导致在查询端出现相关异常,影响业务。

此外,Elasticsearch 作为搜索引擎时,主要承载 query 的匹配和排序的功能,那数据的存储类型基于这两种功能的用途分为两类,一是需要匹配的字段,用来建立倒排索引对 query 匹配用,另一类字段是用做粗排用到的特征字段,如 ctr、点击数、评论数等等。

集群架构设计

合理的部署 Elasticsearch 有助于提高服务的整体可用性。

主节点、数据节点和协调节点分离

Elasticsearch 集群在架构拓朴时,采用主节点、数据节点和负载均衡节点分离的架构,在 5.x 版本以后,又可将数据节点再细分为“Hot-Warm”的架构模式。

Elasticsearch 的配置文件中有 2 个参数,node.master 和 node.data。这两个参数搭配使用时,能够帮助提供服务器性能。

主(master)节点

配置 node.master:truenode.data:false,该 node 服务器只作为一个主节点,但不存储任何索引数据。我们推荐每个集群运行 3 个专用的 master 节点来提供最好的弹性。使用时,你还需要将 discovery.zen.minimum_master_nodes setting 参数设置为 2,以免出现脑裂(split-brain)的情况。用 3 个专用的 master 节点,专门负责处理集群的管理以及加强状态的整体稳定性。因为这 3 个 master 节点不包含数据也不会实际参与搜索以及索引操作,在 JVM 上它们不用做相同的事,例如繁重的索引或者耗时,资源耗费很大的搜索。因此不太可能会因为垃圾回收而导致停顿。因此,master 节点的 CPU,内存以及磁盘配置可以比 data 节点少很多的。

数据(data)节点

配置 node.master:falsenode.data:true,该 node 服务器只作为一个数据节点,只用于存储索引数据,使该 node 服务器功能单一,只用于数据存储和数据查询,降低其资源消耗率。

在 Elasticsearch 5.x 版本之后,data 节点又可再细分为“Hot-Warm”架构,即分为热节点(hot node)和暖节点(warm node)。

hot 节点:

hot 节点主要是索引节点(写节点),同时会保存近期的一些频繁被查询的索引。由于进行索引非常耗费 CPU 和 IO,即属于 IO 和 CPU 密集型操作,建议使用 SSD 的磁盘类型,保持良好的写性能;我们推荐部署最小化的 3 个 hot 节点来保证高可用性。根据近期需要收集以及查询的数据量,可以增加服务器数量来获得想要的性能。

将节点设置为 hot 类型需要 elasticsearch.yml 如下配置:

1
node.attr.box_type: hot

如果是针对指定的 index 操作,可以通过 settings 设置 index.routing.allocation.require.box_type: hot 将索引写入 hot 节点。

warm 节点:

这种类型的节点是为了处理大量的,而且不经常访问的只读索引而设计的。由于这些索引是只读的,warm 节点倾向于挂载大量磁盘(普通磁盘)来替代 SSD。内存、CPU 的配置跟 hot 节点保持一致即可;节点数量一般也是大于等于 3 个。

将节点设置为 warm 类型需要 elasticsearch.yml 如下配置:

1
node.attr.box_type: warm

同时,也可以在 elasticsearch.yml 中设置 index.codec:best_compression 保证 warm 节点的压缩配置。

当索引不再被频繁查询时,可通过 index.routing.allocation.require.box_type:warm,将索引标记为 warm,从而保证索引不写入 hot 节点,以便将 SSD 磁盘资源用在刀刃上。一旦设置这个属性,ES 会自动将索引合并到 warm 节点。

协调(coordinating)节点

协调节点用于做分布式里的协调,将各分片或节点返回的数据整合后返回。该节点不会被选作主节点,也不会存储任何索引数据。该服务器主要用于查询负载均衡。在查询的时候,通常会涉及到从多个 node 服务器上查询数据,并将请求分发到多个指定的 node 服务器,并对各个 node 服务器返回的结果进行一个汇总处理,最终返回给客户端。在 ES 集群中,所有的节点都有可能是协调节点,但是,可以通过设置 node.masternode.datanode.ingest 都为 false 来设置专门的协调节点。需要较好的 CPU 和较高的内存。

  • node.master:false 和 node.data:true,该 node 服务器只作为一个数据节点,只用于存储索引数据,使该 node 服务器功能单一,只用于数据存储和数据查询,降低其资源消耗率。
  • node.master:true 和 node.data:false,该 node 服务器只作为一个主节点,但不存储任何索引数据,该 node 服务器将使用自身空闲的资源,来协调各种创建索引请求或者查询请求,并将这些请求合理分发到相关的 node 服务器上。
  • node.master:false 和 node.data:false,该 node 服务器即不会被选作主节点,也不会存储任何索引数据。该服务器主要用于查询负载均衡。在查询的时候,通常会涉及到从多个 node 服务器上查询数据,并将请求分发到多个指定的 node 服务器,并对各个 node 服务器返回的结果进行一个汇总处理,最终返回给客户端。

关闭 data 节点服务器中的 http 功能

针对 Elasticsearch 集群中的所有数据节点,不用开启 http 服务。将其中的配置参数这样设置,http.enabled:false,同时也不要安装 head, bigdesk, marvel 等监控插件,这样保证 data 节点服务器只需处理创建/更新/删除/查询索引数据等操作。

http 功能可以在非数据节点服务器上开启,上述相关的监控插件也安装到这些服务器上,用于监控 Elasticsearch 集群状态等数据信息。这样做一来出于数据安全考虑,二来出于服务性能考虑。

一台服务器上最好只部署一个 node

一台物理服务器上可以启动多个 node 服务器节点(通过设置不同的启动 port),但一台服务器上的 CPU、内存、硬盘等资源毕竟有限,从服务器性能考虑,不建议一台服务器上启动多个 node 节点。

集群分片设置

ES 一旦创建好索引后,就无法调整分片的设置,而在 ES 中,一个分片实际上对应一个 lucene 索引,而 lucene 索引的读写会占用很多的系统资源,因此,分片数不能设置过大;所以,在创建索引时,合理配置分片数是非常重要的。一般来说,我们遵循一些原则:

  1. 控制每个分片占用的硬盘容量不超过 ES 的最大 JVM 的堆空间设置(一般设置不超过 32 G,参考上面的 JVM 内存设置原则),因此,如果索引的总容量在 500 G 左右,那分片大小在 16 个左右即可;当然,最好同时考虑原则 2。
  2. 考虑一下 node 数量,一般一个节点有时候就是一台物理机,如果分片数过多,大大超过了节点数,很可能会导致一个节点上存在多个分片,一旦该节点故障,即使保持了 1 个以上的副本,同样有可能会导致数据丢失,集群无法恢复。所以,一般都设置分片数不超过节点数的 3 倍

参考资料