Dunwu Blog

大道至简,知易行难

《RocketMQ 技术内幕》笔记

读源代码前的准备

RocketMQ 源代码的目录结构

  • broker:broker 模块(broker 启动进程) 。
  • client:消息客户端,包含生产者、消息消费者相关类。
  • common:公共包。
  • dev:开发者信息(非源代码) 。
  • distribution:部署实例文件夹(非源代码) 。
  • example:RocketMQ 示例代码。
  • filter:消息过滤相关基础类。
  • filter:消息过滤服务器实现相关类(Filter 启动进程) 。
  • logappender:日志实现相关类。
  • namesrv:N ameServer 实现相关类(Names 巳 rver 启动进程) 。
  • openmessaging:消息开放标准,正在制定中。
  • remoting:远程通信模块,基于 Netty 。
  • srvutil:服务器工具类。
  • store:消息存储实现相关类。
  • style:checkstyle 相关实现。
  • test:测试相关类。
  • tools:工具类,监控命令相关实现类。

RocketMQ 的设计理念和目标

设计理念

RocketMQ 设计基于主题的订阅与发布模式, 其核心功能包括:消息发送、消息存储( Broker )、消息消费。其整体设计追求简单与性能第一,主要体现在如下三个方面:

  • 自研 NameServer,而不是用 ZooKeeper 作为注册中心。因为 ZooKeeper 采用 CAP 模型中的 CP 模型,其实并不适用于注册中心的业务模式。
  • RocketMQ 的消息存储文件设计成文件组的概念,组内单个文件大小固定,方便引入内存映射机制,所有主
    题的消息存储基于顺序写, 极大地提供了消息写性能,同时为了兼顾消息消费与消息查找,引入了消息消费队列文件与索引文件。
  • 容忍存在设计缺陷,适当将某些工作下放给 RocketMQ 使用者。消息中间件的实现者经常会遇到一个难题:如何保证消息一定能被消息消费者消费,并且保证只消费一次。RocketMQ 的设计者给出的解决办法是不解决这个难题,而是退而求其次,只保证消息被消费者消费,但设计上允许消息被重复消费,这样极大地简化了消息中间件的内核,使得实现消息发送高可用变得非常简单与高效,消息重复问题由消费者在消息消费时实现幂等。

设计目标

  • 架构模式:RocketMQ 与大部分消息中间件一样,采用发布订阅模式,基本的参与组件主要包括:消息发送者、消息服务器(消息存储)、消息消费、路由发现。
  • 顺序消息:所谓顺序消息,就是消息消费者按照消息达到消息存储服务器的顺序消费。RocketMQ 可以严格保证消息有序。
  • 消息过滤:消息过滤是指在消息消费时,消息消费者可以对同一主题下的消息按照规则只消费自己感兴趣的消息。RocketMQ 消息过滤支持在服务端与消费端的消息过滤机制。
  • 消息在 Broker 端过滤。Broker 只将消息消费者感兴趣的消息发送给消息消费者。
  • 消息在消息消费端过滤,消息过滤方式完全由消息消费者自定义,但缺点是有很多无用的消息会从 Broker 传输到消费端。
  • 消息存储:消息中间件的一个核心实现是消息的存储,对消息存储一般有如下两个维度的考量:消息堆积能力和消息存储性能。RocketMQ 追求消息存储的高性能,引人内存映射机制,所有主题的消息顺序存储在同一个文件中。同时为了避免消息无限在消息存储服务器中累积,引入了消息文件过期机制与文件存储空间报警机制。
  • 消息高可用性
    • 通常影响消息可靠性的有以下几种情况。
      1. Broker 正常关机。
      2. Broker 异常 Crash 。
      3. OS Crash 。
      4. 机器断电,但是能立即恢复供电情况。
      5. 机器无法开机(可能是 CPU 、主板、内存等关键设备损坏) 。
      6. 磁盘设备损坏。
    • 针对上述情况,情况 1~4 的 RocketMQ 在同步刷盘机制下可以确保不丢失消息,在异步刷盘模式下,会丢失少量消息。情况 5-6 属于单点故障,一旦发生,该节点上的消息全部丢失,如果开启了异步复制机制, RoketMQ 能保证只丢失少量消息, RocketMQ 在后续版本中将引人双写机制,以满足消息可靠性要求极高的场合。
  • 消息到达( 消费)低延迟:RocketMQ 在消息不发生消息堆积时,以长轮询模式实现准实时的消息推送模式。
  • 确保消息必须被消费一次:RocketMQ 通过消息消费确认机制(ACK)来确保消息至少被消费一次,但由于 ACK 消息有可能丢失等其他原因, RocketMQ 无法做到消息只被消费一次,有重复消费的可能。
  • 回溯消息:回溯消息是指消息消费端已经消费成功的消息,由于业务要求需要重新消费消息。RocketMQ 支持按时间回溯消息,时间维度可精确到毫秒,可以向前或向后回溯。
  • 消息堆积:消息中间件的主要功能是异步解耦,必须具备应对前端的数据洪峰,提高后端系统的可用性,必然要求消息中间件具备一定的消息堆积能力。RocketMQ 消息存储使用磁盘文件(内存映射机制),并且在物理布局上为多个大小相等的文件组成逻辑文件组,可以无限循环使用。RocketMQ 消息存储文件并不是永久存储在消息服务器端,而是提供了过期机制,默认保留 3 天。
  • 定时消息:定时消息是指消息发送到 Broker 后, 不能被消息消费端立即消费,要到特定的时间点或者等待特定的时间后才能被消费。如果要支持任意精度的定时消息消费,必须在消息服务端对消息进行排序,势必带来很大的性能损耗,故 RocketMQ 不支持任意进度的定时消息,而只支持特定延迟级别。
  • 消息重试机制:消息重试是指消息在消费时,如果发送异常,消息中间件需要支持消息重新投递,RocketMQ 支持消息重试机制。

RocketMQ 路由中心 NameServer

NameServer 架构设计

Broker 消息服务器在启动时向所有 NameServer 注册,生产者(Producer)在发送消息之前先从 NameServer 获取 Broker 服务器地址列表,然后根据负载算法从列表中选择一台消息服务器进行消息发送。NameServer 与每台 Broker 服务器保持长连接,并间隔 30s 检测 Broker 是否存活,如果检测到 Broker 宕机, 则从路由注册表中将其移除。但是路由变化不会马上通知生产者,为什么要这样设计呢?这是为了降低 NameServer 实现的复杂性,在消息发送端提供容错机制来保证消息发送的高可用性。

NameServer 本身的高可用可通过部署多台 NameServer 服务器来实现,但彼此之间互不通信,也就是说 NameServer 服务器之间在某一时刻的数据并不会完全相同,但这对消息发送不会造成任何影响。

NameServer 启动流程

  1. 加载配置,然后根据配置初始化 NamesrvController
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
//PackageConflictDetect.detectFastjson();

Options options = ServerUtil.buildCommandlineOptions(new Options());
commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());
if (null == commandLine) {
System.exit(-1);
return null;
}

// 1. 初始化 NamesrvConfig 配置和 NettyServerConfig 配置
final NamesrvConfig namesrvConfig = new NamesrvConfig();
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
nettyServerConfig.setListenPort(9876);

// 1.1. 加载配置文件中的配置
if (commandLine.hasOption('c')) {
String file = commandLine.getOptionValue('c');
if (file != null) {
InputStream in = new BufferedInputStream(new FileInputStream(file));
properties = new Properties();
properties.load(in);
MixAll.properties2Object(properties, namesrvConfig);
MixAll.properties2Object(properties, nettyServerConfig);

namesrvConfig.setConfigStorePath(file);

System.out.printf("load config properties file OK, %s%n", file);
in.close();
}
}

// 1.2. 加载启动命令中的配置
if (commandLine.hasOption('p')) {
InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
MixAll.printObjectProperties(console, namesrvConfig);
MixAll.printObjectProperties(console, nettyServerConfig);
System.exit(0);
}

MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);

// 2. 强制必须设置环境变量 ROCKETMQ_HOME
if (null == namesrvConfig.getRocketmqHome()) {
System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
System.exit(-2);
}

// 3. 打印配置项
LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
JoranConfigurator configurator = new JoranConfigurator();
configurator.setContext(lc);
lc.reset();
configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");

log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);

MixAll.printObjectProperties(log, namesrvConfig);
MixAll.printObjectProperties(log, nettyServerConfig);

final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);

// remember all configs to prevent discard
controller.getConfiguration().registerConfig(properties);

return controller;
}

  1. 根据启动属性创建 NamesrvController 实例,并初始化该实例, NameServerController 实例为 NameServer 核心控制器。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
public boolean initialize() {

// 加载KV 配置
this.kvConfigManager.load();

// 创建 NettyRemotingServer 网络处理对象
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);

this.remotingExecutor =
Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));

// 注册进程
this.registerProcessor();

// 开启两个定时任务(心跳检测)
// 任务一:NameServer 每隔 1O 秒扫描一次 Broker,移除不活跃的 Broker
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

@Override
public void run() {
NamesrvController.this.routeInfoManager.scanNotActiveBroker();
}
}, 5, 10, TimeUnit.SECONDS);
// 任务二:NameServer 每隔 1O 分钟打印一次 KV 配置
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

@Override
public void run() {
NamesrvController.this.kvConfigManager.printAllPeriodically();
}
}, 1, 10, TimeUnit.MINUTES);

// 如果是 TLS 模式,加载证书,开启安全模式
if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
// Register a listener to reload SslContext
try {
fileWatchService = new FileWatchService(
new String[] {
TlsSystemConfig.tlsServerCertPath,
TlsSystemConfig.tlsServerKeyPath,
TlsSystemConfig.tlsServerTrustCertPath
},
new FileWatchService.Listener() {
boolean certChanged, keyChanged = false;
@Override
public void onChanged(String path) {
if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
log.info("The trust certificate changed, reload the ssl context");
reloadServerSslContext();
}
if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
certChanged = true;
}
if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
keyChanged = true;
}
if (certChanged && keyChanged) {
log.info("The certificate and private key changed, reload the ssl context");
certChanged = keyChanged = false;
reloadServerSslContext();
}
}
private void reloadServerSslContext() {
((NettyRemotingServer) remotingServer).loadSslContext();
}
});
} catch (Exception e) {
log.warn("FileWatchService created error, can't load the certificate dynamically");
}
}

return true;
}
  1. 注册 JVM 钩子函数并启动服务器,以便监昕 Broker 、生产者的网络请求。
1
2
3
4
5
6
7
8
9
// 注册 JVM 钩子函数并启动服务器,以便监昕Broker、 生产者的网络请求
// 如果代码中使用了线程池,一种优雅停机的方式就是注册一个 JVM 钩子函数,在 JVM 进程关闭之前,先将线程池关闭,及时释放资源
Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
@Override
public Void call() throws Exception {
controller.shutdown();
return null;
}
}));

NameServer 路由注册、故障剔除

NameServer 主要作用是为生产者和消息消费者提供关于主题 Topic 的路由信息,那么 NameServer 需要存储路由的基础信息,还要能够管理 Broker 节点,包括路由注册、路由删除等功能。

路由元信息

NameServer 路由实现类:org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager。它主要存储了以下信息:

  • topicQueueTable:Topic 消息队列路由信息,消息发送时根据路由表进行负载均衡。
  • brokerAddrTable:Broker 基础信息, 包含 brokerName 、所属集群名称、主备 Broker 地址。
  • clusterAddrTable:Broker 集群信息,存储集群中所有 Broker 名称。
  • brokerLiveTable:Broker 状态信息。NameServer 每次收到心跳包时会替换该信息。
  • filterServerTable:Broker 上的 FilterServer 列表,用于类模式消息过滤。

RocketMQ 基于订阅发布机制,一个 Topic 拥有多个消息队列,一个 Broker 为每一主题默认创建 4 个读队列 4 个写队列。多个 Broker 组成一个集群,BrokerName 由相同的多台 Broker 组成 Master-Slave 架构, brokerId 为 0 代表 Master,大于 0 表示 Slave。BrokerLiveInfo 中的 lastUpdateTimestamp 存储上次收到 Broker 心跳包的时间。

路由注册

RocketMQ 路由注册是通过 Broker 与 NameServer 的心跳功能实现的。Broker 启动时向集群中所有的 NameServer 发送心跳语句,每隔 30s 向集群中所有 NameServer 发送心跳包, NameServer 收到 Broker 心跳包时会更新 brokerLiveTable 缓存中 BrokerLiveInfo 的 lastUpdateTimestamp ,然后 NameServer 每隔 10s 扫描 brokerLiveTable ,如果连续 120s 没有收到心跳包, NameServer 将移除该 Broker 的路由信息同时关闭 Socket 连接。

(1)Broker 发送心跳包

Broker 会遍历 NameServer 列表, 依次向所有 NameServer 发送心跳包。

(2)NameServer 处理心跳包

  • 路由注册需要加写锁,防止并发修改 RouteInfoManager 中的路由表。
  • 判断 Broker 所属集群是否存在,如果不存在,则创建,然后将 broker 加入到 Broker 集群。
  • 维护 BrokerData 信息,首先从 brokerAddrTable 根据 BrokerName 尝试获取 Broker 信息。
    • 如果不存在,则新建 BrokerData 并放入到 brokerAddrTable, registerFirst 设置为 true;
    • 如果存在,直接将 registerFirst 设置为 false,表示非第一次注册。
  • 如果 Broker 为 Master,并且 Broker Topic 配置信息发生变化或者是初次注册,则需要创建或更新 Topic 路由元数据。填充 topicQueueTable,其实就是为默认主题自动注册路由信息,其中包含 MixAII.DEFAULT_TOPIC 的路由信息。当生产者发送主题时,如果该主题未创建并且 BrokerConfig 的 autoCreateTopicEnable 为 true 时,将返回 MixAII.DEFAULT_TOPIC 的路由信息。
  • 更新 BrokerLiveInfo,存活 Broker 信息表,BrokeLiveInfo 是执行路由删除的重要依据。
  • 注册 Broker 的过滤器 Server 地址列表,一个 Broker 上会关联多个 FilterServer 消息过滤服务器;如果此 Broker 为从节点,则需要查找该 Broker 的 Master 的节点信息,并更新对应的 masterAddr 属性。

设计亮点: NameServe 与 Broker 保持长连接, Broker 状态存储在 brokerLiveTable 中,NameServer 每收到一个心跳包,将更新 brokerLiveTable 中关于 Broker 的状态信息以及路由表(topicQueueTable 、brokerAddrTable 、brokerLiveTable 、filterServerTable) 。更新上述路由表(HashTable)使用了锁粒度较少的读写锁,允许多个消息发送者(Producer )并发读,保证消息发送时的高并发。但同一时刻 NameServer 只处理一个 Broker 心跳包,多个心跳包请求串行执行。

路由删除

Broker 每隔 30s 向 NameServe 发送一个心跳包,心跳包中包含 BrokerId 、Broker 地址、Broker 名称、Broker 所属集群名称、Broker 关联的 FilterServer 列表。但是如果 Broker 宕机,NameServer 无法收到心跳包,此时 NameServer 如何来剔除这些失效的 Broker 呢? NameServer 会每隔 10s 扫描 brokerLiveTable 状态表,如果 BrokerLive 的 lastUpdateTimestamp 的时间戳距当前时间超过 120s ,则认为 Broker 失效,移除该 Broker,关闭与 Broker 连接,并同时更新 topicQueueTable 、brokerAddrTable 、brokerLiveTable 、filterServerTable 。

RocktMQ 有两个触发点来触发路由删除。

  • NameServer 定时扫描 brokerLiveTable 检测上次心跳包与当前系统时间的时间差,如果时间戳大于 120s ,则需要移除该 Broker 信息。

  • Broker 在正常被关闭的情况下,会执行 unregisterBroker 指令。

由于不管是何种方式触发的路由删除,路由删除的方法都是一样的,就是从 topicQueueTable 、rokerAddrTable 、brokerLiveTable 、filterServerTable 删除与该 Broker 相关的信息,但 RocketMQ 这两种方式维护路由信息时会抽取公共代码。

scanNotActiveBroker 在 NameServer 中每 10s 执行一次。逻辑很简单:遍历 brokerLiveInfo 路由表(HashMap),检测 BrokerLiveInfo 的 lastUpdateTimestamp。上次收到心跳包的时间如果超过当前时间 120s,NameServer 则认为该 Broker 已不可用,故需要将它移除,关闭 Channel,然后删除与该 Broker 相关的路由信息,路由表维护过程,需要申请写锁。

(1)申请写锁,根据 brokerAddress 从 brokerLiveTable 、filterServerTable 移除

(2)维护 brokerAddrTable 。遍历从 HashMap<String /* brokerName */, BrokerData> brokerAddrTable,从 BrokerData 的 HashMap<Long /* brokerId */, String /* broker address */> brokerAddrs 中,找到具体的 Broker ,从 BrokerData 中移除,如果移除后在 BrokerData 中不再包含其他 Broker,则在 brokerAddrTable 中移除该 brokerName 对应的条目。

(3)根据 brokerName,从 clusterAddrTable 中找到 Broker 并从集群中移除。如果移除后,集群中不包含任何 Broker,则将该集群从 clusterAddrTable 中移除。

(4)根据 brokerName,遍历所有主题的队列,如果队列中包含了当前 Broker 的队列, 则移除,如果 topic 只包含待移除 Broker 的队列的话,从路由表中删除该 topic。

路由发现

RocketMQ 路由发现是非实时的,当 Topic 路由出现变化后,NameServer 不主动推送给客户端,而是由客户端定时拉取主题最新的路由。根据主题名称拉取路由信息的命令编码为:GET_ROUTEINTO_BY_TOPIC 。

orderTopicConf :顺序消息配置内容,来自于 kvConfig 。

List<QueueData> queueData:topic 队列元数据。

List<BrokerData> brokerDatas:topic 分布的 broker 元数据。

HashMap<String/*brokerAdress*/,List<String> /*filterServer*/> :broker 上过滤服务器地址列表。

NameServer 路由发现实现方法:DefaultRequestProcessor#getRouteInfoByTopic

  1. 调用 RouterlnfoManager 的方法,从路由表 topicQueueTable 、brokerAddrTable 、filterServerTable 中分别填充 TopicRouteData 中的 List<QueueData>List<BrokerData> 和 filterServer 地址表。

  2. 如果找到主题对应的路由信息并且该主题为顺序消息,则从 NameServer KVconfig 中获取关于顺序消息相关的配置填充路由信息。

如果找不到路由信息 CODE 则使用 TOPIC NOT_EXISTS ,表示没有找到对应的路由。

RocketMQ 消息发送

漫谈 RocketMQ 消息发送

RocketMQ 支持 3 种消息发送方式:同步(sync) 、异步(async)、单向(oneway) 。

  • 同步:发送者向 MQ 执行发送消息 API 时,同步等待, 直到消息服务器返回发送结果。
  • 异步:发送者向 MQ 执行发送消息 API 时,指定消息发送成功后的回掉函数,然后调用消息发送 API 后,立即返回,消息发送者线程不阻塞,直到运行结束,消息发送成功或失败的回调任务在一个新的线程中执行。
  • 单向:消息发送者向 MQ 执行发送消息 API 时,直接返回,不等待消息服务器的结果,也不注册回调函数,简单地说,就是只管发,不在乎消息是否成功存储在消息服务器上。

RocketMQ 消息发送需要考虑以下几个问题。

  • 消息队列如何进行负载?
  • 消息发送如何实现高可用?
  • 批量消息发送如何实现一致性?

认识 RocketMQ 消息

RocketMQ 消息的封装类是 org.apache.rocketmq.common.message.Message。其主要属性有:

  • topic:主题
  • properties:属性容器。RocketMQ 会向其中添加一些扩展属性:
    • tags:消息标签,用于消息过滤。
    • keys:消息索引,多个用空格隔开,RocketMQ 可以根据这些 key 快速检索到消息。
    • waitStoreMsgOK:消息发送时是否等消息存储完成后再返回。
    • delayTimeLevel:消息延迟级别,用于定时消息或消息重试。
  • body:消息体
  • transactionId:事务 ID

生产者启动流程

DefaultMQProducer 是默认的生产者实现类。它实现了 MQAdmin 的接口。

初识 DefaultMQProducer 消息发送者

DefaultMQProducer 的主要方法
  • void createTopic(String key, String newTopic, int queueNum, int topicSysFlag):创建主题
    • key:目前未实际作用,可以与 newTopic 相同。
    • newTopic:主题名称。
    • queueNum:队列数量。
    • topicSysFlag:主题系统标签,默认为 0 。
  • long searchOffset(final MessageQueue mq, final long timestamp):根据时间戳从队列中查找其偏移量。
  • long maxOffset(final MessageQueue mq):查找该消息队列中最大的物理偏移量。
  • long minOffset(final MessageQueue mq):查找该消息队列中最小物理偏移量。
  • MessageExt viewMessage(final String offsetMsgld):根据消息偏移量查找消息。
  • QueryResult queryMessage(final String topic, final String key, final int maxNum, final long begin, final long end):根据条件查询消息。
    • topic:消息主题。
    • key:消息索引字段。
    • maxNum:本次最多取出消息条数。
    • begin:开始时间。
    • end:结束时间。
  • MessageExt viewMessage(String topic,String msgld):根据主题与消息 ID 查找消息。
  • List<MessageQueue> fetchPublishMessageQueues(final String topic):查找该主题下所有的消息队列。
  • SendResult send(final Message msg):同步发送消息,具体发送到主题中的哪个消息队列由负载算法决定。
  • SendResult send(final Message msg, final long timeout):同步发送消息,如果发送超过 timeout 则抛出超时异常。
  • void send(final Message msg, final SendCallback sendCallback):异步发送消息, sendCallback 参数是消息发送成功后的回调方法。
  • void send(final Message msg, final SendCallback sendCallback, final long timeout):异步发送消息,如果发送超过 timeout 指定的值,则抛出超时异常。
  • void sendOneway(final Message msg):单向消息发送,就是不在乎发送结果,消息发送出去后该方法立即返回。
  • SendResult send(final Message msg, final MessageQueue mq):同步方式发送消息,发送到指定消息队列。
  • void send(final Message msg, final MessageQueue mq, final SendCallback sendCallback):异步方式发送消息,发送到指定消息队列。
  • void sendOneway(final Message msg, final MessageQueue mq):单向方式发送消息,发送到指定的消息队列。
  • SendResult send(final Message msg , final MessageQueueSelector selector, final Object arg):消息发送,指定消息选择算法,覆盖生产者默认的消息队列负载。
  • SendResult send(final Collection<Message> msgs, final MessageQueue mq, final long timeout):同步批量消息发送。
DefaultMQProducer 的核心属性
  • producerGroup:生产者所属组,消息服务器在回查事务状态时会随机选择该组中任何一个生产者发起事务回查请求。
  • createTopicKey:默认 topicKey 。
  • defaultTopicQueueNums:默认主题在每一个 Broker 队列数量。
  • sendMsgTimeout:发送消息默认超时时间, 默认 3s 。
  • compressMsgBodyOverHowmuch:消息体超过该值则启用压缩,默认 4K。
  • retryTimesWhenSendFailed:同步方式发送消息重试次数,默认为 2 ,总共执行 3 次。
  • retryTimesWhenSendAsyncFailed:异步方式发送消息重试次数,默认为 2 。
  • retryAnotherBrokerWhenNotStoreOK:消息重试时选择另外一个 Broker ,是否不等待存储结果就返回, 默认为 false 。
  • maxMessageSize:允许发送的最大消息长度,默认为 4M ,眩值最大值为 2^32-1 。

生产者启动流程

DefaultMQProducerImpl#start() 是生产者的启动方法,其主要工作流程如下:

  1. 检查生产者组(productGroup)是否符合要求;并改变生产者的 instanceName 为进程 ID 。
  2. 获取或创建 MQClientInstance 实例。
    • 整个 JVM 实例中只存在一个 MQClientManager 实例(单例)。
    • MQClientManager 中维护一个 ConcurrentMap 类型的缓存,用于保证同一个 clientId 只会创建一个 MQClientInstance
  3. 将当前生产者注册到 MQClientInstance 中,方便后续调用网络请求、进行心跳检测等。
  4. 启动 MQClientInstance ,如果 MQClientInstance 已经启动,则本次启动不会真正执行。
  5. 向所有 Broker 发送心跳。
  6. 启动一个定时任务,用于定期清理过时的发送请求。

消息发送基本流程

消息发送的核心方法是 DefaultMQProducerImpl#sendDefaultImpl

消息长度验证

消息发送之前,首先确保生产者处于运行状态,然后验证消息是否符合相应的规范,具体的规范要求是主题名称、消息体不能为空、消息长度不能等于 0 且默认不能超过允许发送消息的最大长度 4M(maxMessageSize=l024 * 1024 * 4) 。

查找主题路由信息

消息发送之前,首先需要获取主题的路由信息,只有获取了这些信息我们才知道消息要发送到具体的 Broker 节点。

tryToFindTopicPublishInfo 是查找主题的路由信息的方法。

如果生产者中缓存了 topic 的路由信息,或路由信息中包含了消息队列,则直接返回该路由信息。

如果没有缓存或没有包含消息队列, 则向 NameServer 查询该 topic 的路由信息。

如果最终未找到路由信息,则抛出异常:无法找到主题相关路由信息异常。

TopicPublishinfo 的属性:

  • orderTopic:是否为顺序消息。
  • haveTopicRouterInfo:是否有主题路由信息。
  • List<MessageQueue> messageQueueList:Topic 的消息队列。
  • sendWhichQueue:用于选择消息队列。每选择一次消息队列, 该值会自增 1。
  • topicRouteData:主题路由数据。

MQClientlnstance#updateTopicRouteInfoFromNameServer 这个方法的功能是生产者更新和维护路由缓存。

  1. 如果 isDefault 为 true,则使用默认主题去查询,如果查询到路由信息,则替换路由信息中读写队列个数为生产者默认的队列个数(defaultTopicQueueNums);如果 isDefault 为 false,则使用参数 topic 去查询;如果未查询到路由信息,则返回 false ,表示路由信息未变化。
  2. 如果路由信息找到,与本地缓存中的路由信息进行对比,判断路由信息是否发生了改变,如果未发生变化,则直接返回 false 。
  3. 更新 MQClientInstance 的 Broker 地址缓存表。
  4. 根据 topicRouteData 中的 List<QueueData> 转换成 topicPublishInfoList<MessageQueue> 列表。其具体实现在 topicRouteData2TopicPublishInfo 中, 然后会更新该 MQClientInstance 所管辖的所有消息,发送关于 topic 的路由信息。
  5. 循环遍历路由信息的 QueueData 信息,如果队列没有写权限,则继续遍历下一个 QueueData;根据 brokerName 找到 brokerData 信息,找不到或没有找到 Master 节点,则遍历下一个 QueueData;根据写队列个数,根据 topic +序号 创建 MessageQueue ,填充 TopicPublishInfoList<QueueMessage>

选择 Broker

消息发送

RocketMQ 消息存储

RocketMQ 消息消费

消息过滤 FilterServer

RocketMQ 主从同步

RocketMQ 事务消息

RocketMQ 实战

参考资料

《软件工程之美》笔记

到底应该怎样理解软件工程?

软件产品危机:软件产品质量低劣、软件维护工作量大、成本不断上升、进度不可控、程序人员无限度地增加。

软件工程,它是为研究和克服软件危机而生。

软件工程的本质:用工程化方法去规范软件开发,让项目可以按时完成、成本可控、质量有保证。

软件工程的核心:是围绕软件项目开发,对开发过程的组织,对方法的运用,对工具的使用。

软件工程 = 过程 + 方法 + 工具。

工程思维:把每件事都当作一个项目来推进

有目的、有计划、有步骤地解决问题的方法就是工程方法。

工程方法通常会分成六个阶段:想法、概念、计划、设计、开发和发布。

  • 想法:想法阶段通常是想要解决问题。最开始问题通常是模糊的,所以需要清晰地定义好问题,研究其可行性,检查是否有可行的解决方案。
  • 概念:概念阶段就是用图纸、草图、模型等方式,提出一些概念性的解决方案。这些方案可能有多个,最终会确定一个解决方案。
  • 计划:计划阶段是关于如何实施的计划,通常会包含人员、任务、任务持续时间、任务的依赖关系,以及完成项目所需要的预算。
  • 设计:设计阶段就是要针对产品需求,将解决方案进一步细化,设计整体架构和划分功能模块,作为分工合作和开发实施的一个依据和参考。
  • 开发:开发阶段就是根据设计方案,将解决方案构建实施。开发阶段通常是一个迭代的过程,这个阶段通常会有构建、测试、调试和重新设计的迭代。
  • 发布:将最终结果包括文档发布。

瀑布模型:像工厂流水线一样把软件开发分层化

瀑布模型把整个项目过程分成了六个主要阶段:

  • 问题的定义及规划:这个阶段是需求方和开发方共同确定软件开发目标,同时还要做可行性研究,以确定项目可行。这个阶段会产生需求文档和可行性研究报告。
  • 需求分析:对需求方提出的所有需求,进行详细的分析。这个阶段一般需要和客户反复确认,以保证能充分理解客户需求。最终会形成需求分析文档。
  • 软件设计:根据需求分析的结果,对整个软件系统进行抽象和设计,如系统框架设计,数据库设计等等。最后会形成架构设计文档。
  • 程序编码:将架构设计和界面设计的结果转换成计算机能运行的程序代码。
  • 软件测试:在编码完成后,对可运行的结果对照需求分析文档进行严密的测试。如果测试发现问题,需要修复。最终测试完成后,形成测试报告。
  • 运行维护:在软件开发完成,正式运行投入使用。后续需要继续维护,修复错误和增加功能。交付时需要提供使用说明文档。

瀑布模型之外,还有哪些开发模型?

快速原型模型

快速原型模型,就是为了要解决客户的需求不明确和需求多变的问题。

先迅速建造一个可以运行的软件原型,然后收集用户反馈,再反复修改确认,使开发出的软件能真正反映用户需求,这种开发模型就叫快速原型模型,也叫原型模型。

原型模型因为能快速修改,所以能快速对用户的反馈和变更作出响应,同时原型模型注重和客户的沟通,所以最终开发出来的软件能够真正反映用户的需求。

但这种快速原型开发往往是以牺牲质量为代价的。

增量模型

增量模型是把待开发的软件系统模块化,然后在每个小模块的开发过程中,应用一个小瀑布模型,对这个模块进行需求分析、设计、编码和测试。相对瀑布模型而言,增量模型周期更短,不需要一次性把整个软件产品交付给客户,而是分批次交付。

因为增量模型的根基是模块化,所以,如果系统不能模块化,那么将很难采用增量模型的模式来开发。另外,对模块的划分很抽象,这本身对于系统架构的水平是要求很高的。

基于这样的特点,增量模型主要适用于:需求比较清楚,能模块化的软件系统,并且可以按模块分批次交付。

迭代模型

迭代模型每次只设计和实现产品的一部分,然后逐步完成更多功能。每次设计和实现一个阶段叫做一个迭代。

在迭代模型中,整个项目被拆分成一系列小的迭代。通常一个迭代的时间都是固定的,不会太长,例如 2-4 周。每次迭代只实现一部分功能,做能在这个周期内完成的功能。

在一个迭代中都会包括需求分析、设计、实现和测试,类似于一个小瀑布模型。迭代结束时要完成一个可以运行的交付版本。

增量模型是按照功能模块来拆分;而迭代模型则是按照时间来拆分,看单位时间内能完成多少功能。

V 模型

V 模型适合外包项目。V 模型本质上还是瀑布模型,只不过它是更重视对每个阶段验收测试的过程模型。

针对从需求定义一直到编码阶段,每个阶段都有对应的测试验收。

螺旋模型

如果你现在要做一个风险很高的项目,客户可能随时不给你钱了。这种情况下,如果采用传统瀑布模型,无疑风险很高,可能做完的时候才发现客户给不了钱,损失就很大了!

这种情况,基于增量模型或者迭代模型进行开发,就可以有效降低风险。你需要注意的是,在每次交付的时候,要同时做一个风险评估,如果风险过大就不继续后续开发了,及时止损。

这种强调风险,以风险驱动的方式完善项目的开发模型就是螺旋模型。

敏捷开发到底是想解决什么问题?

敏捷开发是一套价值观和原则。

瀑布模型面向的是过程,而敏捷开发面向的是人。

大厂都在用哪些敏捷方法?(上)

一切工作任务围绕 Ticket 开展

  • 每一个任务的状态都可以被跟踪起来:什么时候开始做的,谁在做,做完没有。
  • 整个团队在做什么一目了然。
  • Ticket 和敏捷开发中的 Backlog(任务清单)正好结合起来,通过 Ticket 可以收集管理整个项目的 Backlog 和当前 Sprint(迭代)的 Backlog。

基于 Git 和 CI 的开发流程

Git 本来只是源代码管理工具,但是其强大的分支管理和灵活的权限控制,结合一定的开发流程,却可以帮助你很好的控制代码质量。

站立会议

  • 每个人轮流介绍一下,昨天干了什么事情,今天计划做什么事情,工作上有没有障碍无法推进。有问题,记录到“问题停车场”。
  • 检查最近的 Ticket,甄别一下优先级。有需要讨论的先收集到问题停车场。
  • 针对未讨论的问题展开讨论,能在会议时间内解决的问题,就马上解决,不能解决的会后再私下讨论或者再组织会议。

大厂都在用哪些敏捷方法?(下)

在分工上:

  • 产品经理:写需求设计文档,将需求整理成 Ticket,随时和项目成员沟通确认需求;
  • 开发人员:每天从看板上按照优先级从高到低领取 Ticket,完成日常开发任务;
  • 测试人员:测试已经部署到测试环境的程序,如果发现 Bug,提交 Ticket;
  • 项目经理:保障日常工作流程正常执行,让团队成员可以专注工作,提供必要的帮助,解决问题。

如何完成需求和修复 Bug?

日常工作,是围绕 Ticket 来开展的。所有的需求、Bug、任务都作为 Ticket 提交到项目的 Backlog,每个 Sprint 的任务都以看板的形式展现出来。

每个人手头事情忙完后,就可以去看板上的“To Do”栏,按照优先级从高到低选取新的 Ticket。选取后移动到“In Progress”栏。

每周一部署生产环境

参考资料

RocketMQ FAQ

API 问题

connect to <172.17.0.1:10909> failed

启动后,Producer 客户端连接 RocketMQ 时报错:

1
2
3
4
5
6
7
8
9
10
11
12
org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to <172.17.0.1:10909> failed
at org.apache.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:357)
at org.apache.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:343)
at org.apache.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:327)
at org.apache.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:290)
at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:688)
at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendSelectImpl(DefaultMQProducerImpl.java:901)
at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:878)
at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:873)
at org.apache.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:369)
at com.emrubik.uc.mdm.sync.utils.MdmInit.sendMessage(MdmInit.java:62)
at com.emrubik.uc.mdm.sync.utils.MdmInit.main(MdmInit.java:2149)

原因:RocketMQ 部署在虚拟机上,内网 ip 为 10.10.30.63,该虚拟机一个 docker0 网卡,ip 为 172.17.0.1。RocketMQ broker 启动时默认使用了 docker0 网卡,Producer 客户端无法连接 172.17.0.1,造成以上问题。

解决方案

(1)干掉 docker0 网卡或修改网卡名称

(2)停掉 broker,修改 broker 配置文件,重启 broker。

修改 conf/broker.conf,增加两行来指定启动 broker 的 IP:

1
2
namesrvAddr = 10.10.30.63:9876
brokerIP1 = 10.10.30.63

启动时需要指定配置文件

1
nohup sh bin/mqbroker -n localhost:9876 -c conf/broker.conf &

话术

职场“黑话”

二字动词

复盘,赋能,加持,沉淀,倒逼,落地,串联,协同,反哺,兼容,包装,重组,履约,响应,量化,布局,联动,细分,梳理,输出,加速,共建,支撑,融合,聚合,集成,对标,聚焦,抓手,拆解,抽象,摸索,提炼,打通,打透,吃透,迁移,分发,分装,辐射,围绕,复用,渗透,扩展,开拓,皮实,共创,共建,解耦,集成,对齐,拉齐,对焦,给到,拿到,死磕

三字名词

感知度,方法论,组合拳,引爆点,点线面,精细化,差异化,平台化,结构化,影响力,耦合性,便捷性,一致性,端到端,短平快,护城河,体验感,颗粒度

四字名词

生命周期,价值转化,强化认知,资源倾斜,完善逻辑,抽离透传,复用打法,商业模式,快速响应,定性定量,关键路径,去中心化,结果导向,垂直领域,归因分析,体验度量,信息屏障,资源整合

术语应用

比如原来你提问:这个问题你准备怎么解决?

现在的你可以说:你这个问题的底层逻辑是什么?顶层设计在哪?最终交付价值是什么?过程的抓手在哪?如何保证回答闭环?你比别人的亮点在哪?优势在哪?你的思考和沉淀是什么?这个问题换成我来问是否会不一样?在这之前,有自己的思考和沉淀吗?这些问题的颗粒度是怎样拆分的,能作为爆点,引发回答者对问题关键路径的探索吗?别人回答了,你能反哺赋能他们,共建团队意识生态吗?只会问而不会解决,你有自己独有的价值吗?

比如你之前只会一脸懵逼的看着他,愣着不敢说话,现在你可以这么回复他:

我们这款产品底层逻辑是打通信息屏障,创建行业新生态。顶层设计是聚焦用户感知赛道,通过差异化和颗粒度达到引爆点。交付价值是在垂直领域采用复用打法达成持久收益。抽离透传归因分析作为抓手为产品赋能,体验度量作为闭环的评判标准。亮点是载体,优势是链路。思考整个生命周期,完善逻辑考虑资源倾斜。方法论是组合拳达到平台化标准。

模板

yes and 法则

当别人提出一个观点,自己不认同时,我们往往会说,yes,but,这样别人会觉得“那就这样吧,你说什么就是什么吧”的想法,无法形成有效的沟通。

yes-and 的原则首先是要接纳,而不是全身都是刺,即当别人提出观点的时候还没仔细思考就先给予否定,尤其当对方不认可攻击你时,很多人立马想到的就是反击。

然后关键是这里的 and,这里的 and 并不是转折或反驳,而是并且或附加的内容,可以巧妙地避免意见不同甚至冲突,所以 yes-and 不仅仅是做事方式,也是一种特别好的沟通方式,让对方感受很舒服。

【示例】

男朋友特别喜欢爬山,有一天男朋友邀请女生去爬山,女生说了下面的一段话 “我不爱爬上,但是我特别好奇你到山顶后看到的风景,你能爬上去之后能给我拍几样张照片吗?专门为我拍的,然后你下来以后,我在哪个咖啡馆等你,你跟我讲讲此次的经历!”

可以听出来,这位女生并不爱爬山,也没有勉强自己去迎合新的男朋友,而是肯定了男朋友的爱好,并且在此基础上创造了新的情景来继续他们的交流!

PREP 模型

PREP 模型用于表达观点

PREP 四个英文字母分别代表:Point,观点;Reason,理由;Example,案例;Point,再次讲观点。这是最经典的表达结构。

整个 PREP 结构的关键是,开始就要讲出你的观点,点明主题;后面再举出理由来论证观点;案例部分,最好讲自己的经历或故事来解释,这样听众比较容易听懂;最后再重复和强调一下你的观点。

SCQ-A 模型

SCQ-A 模型 用于提出问题,请求帮助

  • situation:阐述背景
  • conflict:阐述冲突
  • question:为了解决冲突,你提出要解决的问题
  • answer:你的看法

【示例】

老板,最近竞争对手上任了新的 CEO,做了一系列措施,比如下调了产品的价格,增大推广和营销,导致现在我们的很多市场被对方蚕食了。

我们现在该如何调整应应对当前的情况,保持市场上的领导位置。

目前我的看法就是优化目前的营销渠道,全面包围对手。

FFC 赞美法则

所谓 FFC 赞美法,就是指在赞美人的时候,先用自己的语言来表达感受(Feel),然后再进一步通过陈述事实(Fact)来论证自己的感受,最后再通过比较(Compare),来加深对对方的认可,这样对方会感觉特别好。

让对方服从你行为的经典话术

这个话术一般来讲模板是这样的: 是..还是…/是否/要不要,xxx 好处是这样。

这里的关键是尽量不让用户思考,提供非 A 即 B 的选项,说某个选项的好处,从而让对方服从你。

比如麦当劳、肯德基有 3 个经典话术,进行快推式产品营销:

  • 您是否要加一包薯条,这样可以凑成一个套餐,节省 2 元?(实际上多消费 5 元)
  • 您要不要加 3 元把可乐换成大杯,可以多一半哦?
  • 您要不要加 10 元买个玩具给小朋友呢?

沟通中的万能表达模型- 观察+感受+需求+请求

  • 观察:即你观察的客观事实是什么?注意这里要是事实才行
  • 感受:通过观察之后,你心情感受是怎样的?
  • 需求:你内心希望要解决的问题是什么?
  • 请求:向对方请求你的需求需要得到满足

FFA 法则

  • Fact——事实
  • Feeling——感受
  • Action——行动

最近一段团队不少新人加入,整体运营效率下降了 30%。(事实)我感觉主要是业务知识传递有些跟不上。(感受)接下来的一周,我会安排老员工一对一辅导每位新人。(行动)

实战

别人求你办事,如果你说:“这事儿不太好办”,那么资源置换就来了。

不好办说明能办,但需要附加条件,懂的人自然知道接下来应该怎么办。

拒绝借钱:“你知道的,我最近 XX,也没钱。”

遇到借钱,只要你平时不太露富,就好用。

朋友管你借钱,用“你知道的”直接把皮球踢回去,再给个具体理由,比如买了什么东西、投入基金股票里了,随便什么都行。

只要把对方放在知道你没钱的位置上,他就不好再开下一句口,一般会主动结束对话。

表示体贴,但不想真的去接客户,先打电话沟通:“约的地方有点远,需要我去接您吗?”

绝大部分人遇到这样的问句,本能反应是不用,你的目的就达到了。

不过如果真遇到要你接的,这人多半有点矫情,以后相处要注意多恭维一下。

经典汇报话术 1:“老板,我们团队做了 A、B 两版方案,各有优势,您给提提意见,看选哪个好。”

该话术利用了沉锚效应,抛出了二者选其一的锚,避开全部拒绝的选项,引导领导“选一个”、“提意见”,减小被全部驳回的风险。

经典汇报话术 2:“领导,我是这么想的,XXX。第一,X;第二,X;第三,X。”

我们的大脑被训练得听到“第一、第二”就默认为其中有条理、有逻辑,不管其中是不是真的严丝合缝地支撑你的观点。善用一二三做汇报,领导会觉得你准备得很充分,考虑周全。

遇到问题请求领导帮忙:“经过了解,现在碰到了一些情况,我的解决办法是 XX,您看还有没有什么更好的办法。”

不要直接说自己解决不了,让领导想办法。不管自己提出的办法多平庸,都一定要提。

请求他人帮忙:“能请你帮我打印一下文件吗,因为我一会儿真有事。”、“不好意思,能插一下队吗,因为我真的着急。”

善用“因为所以”,“因为所以”是十几年语文教学留给我们的条件反射,不管多离奇的理由,听到的时候都会默认有道理。

聊天想聊下去:揪住对方句子里的关键词+延伸过往彼此交流过的信息。

例 1

朋友:“今天又加班,烦死了。”

你:“怎么又加班啊,又是上回让你加班的那个领导吗?”

例 2

闺蜜:“我爱豆塌房子了!”

你:“哪个爱豆?上回你说的 XXX?我去!”

聊天不想继续聊下去:重复关键词+感叹词。重点!不要扩展任何有效信息。

例 1

朋友:“今天又加班,烦死了”

你:“怎么又加班啊,唉,这叫啥事啊,我无语。”

例 2

闺蜜:“我爱豆又塌房子了!”

你:“又塌房子!我去,也太那个了,绝了!”

领导说:“辛苦了。”

你:“从中学到很多,很有收获。”

敏而好学、不居功,领导更喜欢这样的下属。

给领导的节日问候:尊称+感谢+互动+祝福

尊称放在前面,引起注意。互动要具体、细节,才有记忆点。

XX,过年好!
感谢您一直的关照,从您身上学到很多。

上次 XX,您说 XXX 我一直记得,受益匪浅。

又到新的一年,祝您和家人新年快乐!

改变一个人的想法:认同立场,替换观点。

无论任何人,观点不是不可改变的,但立场很难动摇。

比如你的预算交上去,被砍了很大一块,你能做的不是抱怨老板抠门,而是认同老板砍预算是为了控制成本,开公司是为了赚钱的,这就是老板该有的立场。

所以,如果你不想自己的预算被砍掉,只有你能向老板展示,你做的方案能为他赚更多钱,老板就不会不同意。

工作汇报

实情 话术
新项目玩砸了 进行了积极的试错,吸收了宝贵的经验。
数据不好看 有较大的增长空间
啥也没干 稳定发展
接下来,我依旧打算啥也不干 保持现有成绩,稳定成果
数据稍微好看一点 取得了较大增长

沟通

当你觉得对方特别啰嗦,又不好意思打断对方谈话

直接给对方说的话下定义,作评判。例如:“好的”、“那确实不错”、“的确是这样”、“嗯,你说的对” 等等。

对方会瞬间失去表达欲望。

当你接到你不想做的任务时

  • 我仔细看了一下这个需求,我这里可能存在 XXX XXX XXX 方面的短板
  • “领导,我仔细看了一下这个需求,我这里可能存在 XXX XXX XXX 方面的短板”
  • “想要推进这件事情的话,我可能需要 XXX XXX XXX 方面的支持”

好的可能:领导感觉你是认真经过调研,分析了可行性,确实你不太适合,他表示会再考虑考虑

坏的可能:你还是得做,但是这句话的意思已经很明确了:我可是给你说清楚了啊,这件事办砸了,你可不能赖我

当我想刺探什么秘密的时候,我会先说一个结论,然后看对方的反应

  • “我们公司下个月要发奖金你知道对吧,想好怎么花了么?”
    • “卧槽你这消息灵通啊”——真的要发
    • “你听谁说的”——不知道真假,但是可以继续测
  • 这个时候我会再补一句:“我能得到这个消息,说明肯定有人放风给我”
    • “胡扯,没有这个规划”——下个月不发奖金
    • “你每天花花肠子怎么这么多”——没有明确的回应这个问题,我的猜测 80%是真实的

临时要交什么汇报,或者做工作总结的时候,不要紧张

【你日常都在做什么】+【这件事的目的是什么】+【你在做这件事的时候有什么困难】+【怎么把这件事做的更好】

比如:

我最近在更新我的知乎账号,坚持回答问题,这是为了能够积累更多的粉丝数,获得更多的认同,也培养自己输出的习惯

但是我发现我的阅读量上去了,但是点赞量还没有起来

所以我决定在写到这一段的时候给读者老爷们磕个头求个赞,哐哐哐

想要说服别人的时候

注意两点:

  • 先肯定对方的想法

  • 尽量不要出现比较主观的用语,如:“我觉得”、“照我看”、“我认为”

【示例】我仔细听了你的诉求,很有道理,这个需求是可以理解的。但是,我们换个角度想一想:……。而且,我还听别人说:…..。所以,不妨折中一下:…..。你觉得这样如何呢?

当别人给你布置杂活的时候

我们要死扣细节,不停问细节:

  • “你说的这件事大概什么时候需要?”

  • “这个时间点具体到几点?”

  • “我是微信给你还是邮箱给你?”

  • “那需要我先给一个计划,你帮我看看合适不合适么?”

  • ……

一个杂活而已,你不停地追问细节,会极大地增加你们之间的沟通成本,让对方崩溃,从此再也不想给你安排杂活。

参考资料

《职场求生攻略》笔记

学会如何工作,和学习技术同等重要

以利益为视角,以换位思考为手段。

优先级:工作中那么多事情,我要如何安排优先级?

基于工作性质安排优先级

工作可以划分为:

业务拓展:需求分析、设计、开发都属于这范畴。

安全问题:安全无小事。要高度重视安全问题。

线上问题:直接影响用户体验和权益。要第一优先级去处理。

基于合作安排优先级

事情如果没有明显的轻重缓急,优先做那些会阻塞别人工作的事情。

做事情本身的优先级

我们做事情的时候,如果能把其中的每一步都想清楚,理清依赖关系,安排得井井有条,这就已经事半功倍了。

沟通:邮件那么重要,你还在轻视邮件吗?

邮件的特性

  1. 异步交流:邮件是一种异步交流的方式,双方有足够的时间准备邮件内容。
  2. 无法修改:邮件内容无法修改,这是邮件可靠的基石。
  3. 方便扩散:邮件有邮件组,可以很方便地把相关人员加进来,并且保留邮件历史记录。

邮件是公司内部的合同

场景 1:设计确认(邮件的“确认”功能)

场景 2:优先级(邮件的“证据链”功能)

场景 3:大促(邮件的“沟通协调”功能)

场景 4:新业务接入(邮件的“防遗忘”功能)

场景 5:技术升级和 Bug 修复(邮件的“广而告之”功能)

邮件的魅力

我们都是普通人,普通人没有“背锅”的压力,就没有持久的把事情做好的动力。

沟通:程序员为什么应该爱上交流?

主观能动性:为什么程序员,需要发挥主观能动性?

责任的边界:程序员的职责范围仅仅只是被安排的任务吗?

RocketMQ 基本原理

原理

分布式消息系统作为实现分布式系统可扩展、可伸缩性的关键组件,需要具有高吞吐量、高可用等特点。而谈到消息系统的设计,就回避不了两个问题:

  1. 消息的顺序问题
  2. 消息的重复问题

顺序消息

第一种模型

假如生产者产生了 2 条消息:M1、M2,要保证这两条消息的顺序,应该怎样做?你脑中想到的可能是这样:

假定 M1 发送到 S1,M2 发送到 S2,如果要保证 M1 先于 M2 被消费,那么需要 M1 到达消费端被消费后,通知 S2,然后 S2 再将 M2 发送到消费端。

这个模型存在的问题是,如果 M1 和 M2 分别发送到两台 Server 上,就不能保证 M1 先达到 MQ 集群,也不能保证 M1 被先消费。换个角度看,如果 M2 先于 M1 达到 MQ 集群,甚至 M2 被消费后,M1 才达到消费端,这时消息也就乱序了,说明以上模型是不能保证消息的顺序的。

第二种模型

如何才能在 MQ 集群保证消息的顺序?一种简单的方式就是将 M1、M2 发送到同一个 Server 上:

这样可以保证 M1 先于 M2 到达 MQServer(生产者等待 M1 发送成功后再发送 M2),根据先达到先被消费的原则,M1 会先于 M2 被消费,这样就保证了消息的顺序。

这个模型也仅仅是理论上可以保证消息的顺序,在实际场景中可能会遇到下面的问题:

只要将消息从一台服务器发往另一台服务器,就会存在网络延迟问题。如上图所示,如果发送 M1 耗时大于发送 M2 的耗时,那么 M2 就仍将被先消费,仍然不能保证消息的顺序。即使 M1 和 M2 同时到达消费端,由于不清楚消费端 1 和消费端 2 的负载情况,仍然有可能出现 M2 先于 M1 被消费的情况。

如何解决这个问题?将 M1 和 M2 发往同一个消费者,且发送 M1 后,需要消费端响应成功后才能发送 M2。

这可能产生另外的问题:如果 M1 被发送到消费端后,消费端 1 没有响应,那是继续发送 M2 呢,还是重新发送 M1?一般为了保证消息一定被消费,肯定会选择重发 M1 到另外一个消费端 2,就如下图所示。

这样的模型就严格保证消息的顺序,细心的你仍然会发现问题,消费端 1 没有响应 Server 时有两种情况,一种是 M1 确实没有到达(数据在网络传送中丢失),另外一种消费端已经消费 M1 且已经发送响应消息,只是 MQ Server 端没有收到。如果是第二种情况,重发 M1,就会造成 M1 被重复消费。也就引入了我们要说的第二个问题,消息重复问题,这个后文会详细讲解。

回过头来看消息顺序问题,严格的顺序消息非常容易理解,也可以通过文中所描述的方式来简单处理。总结起来,要实现严格的顺序消息,简单且可行的办法就是:

保证生产者 - MQServer - 消费者是一对一对一的关系。

这样的设计虽然简单易行,但也会存在一些很严重的问题,比如:

  1. 并行度就会成为消息系统的瓶颈(吞吐量不够)
  2. 更多的异常处理,比如:只要消费端出现问题,就会导致整个处理流程阻塞,我们不得不花费更多的精力来解决阻塞的问题。

RocketMQ 的解决方案:通过合理的设计或者将问题分解来规避。如果硬要把时间花在解决问题本身,实际上不仅效率低下,而且也是一种浪费。从这个角度来看消息的顺序问题,我们可以得出两个结论:

  1. 不关注乱序的应用实际大量存在
  2. 队列无序并不意味着消息无序

最后我们从源码角度分析 RocketMQ 怎么实现发送顺序消息。

RocketMQ 通过轮询所有队列的方式来确定消息被发送到哪一个队列(负载均衡策略)。比如下面的示例中,订单号相同的消息会被先后发送到同一个队列中:

1
2
3
4
5
6
7
8
9
10
11
// RocketMQ 通过 MessageQueueSelector 中实现的算法来确定消息发送到哪一个队列上
// RocketMQ 默认提供了两种 MessageQueueSelector 实现:随机/Hash
// 当然你可以根据业务实现自己的 MessageQueueSelector 来决定消息按照何种策略发送到消息队列中
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
}, orderId);

在获取到路由信息以后,会根据 MessageQueueSelector 实现的算法来选择一个队列,同一个 OrderId 获取到的肯定是同一个队列。

1
2
3
4
5
6
7
8
9
10
11
12
13
private SendResult send()  {
// 获取topic路由信息
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
if (topicPublishInfo != null && topicPublishInfo.ok()) {
MessageQueue mq = null;
// 根据我们的算法,选择一个发送队列
// 这里的arg = orderId
mq = selector.select(topicPublishInfo.getMessageQueueList(), msg, arg);
if (mq != null) {
return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, timeout);
}
}
}

消息重复

造成消息重复的根本原因是:网络不可达。只要通过网络交换数据,就无法避免这个问题。所以解决这个问题的办法就是绕过这个问题。那么问题就变成了:如果消费端收到两条一样的消息,应该怎样处理?

  1. 消费端处理消息的业务逻辑保持幂等性。
  2. 保证每条消息都有唯一编号且保证消息处理成功与去重表的日志同时出现。

第 1 条很好理解,只要保持幂等性,不管来多少条重复消息,最后处理的结果都一样。

第 2 条原理就是利用一张日志表来记录已经处理成功的消息的 ID,如果新到的消息 ID 已经在日志表中,那么就不再处理这条消息。

第 1 条解决方案,很明显应该在消费端实现,不属于消息系统要实现的功能。

第 2 条可以消息系统实现,也可以业务端实现。正常情况下出现重复消息的概率其实很小,如果由消息系统来实现的话,肯定会对消息系统的吞吐量和高可用有影响,所以最好还是由业务端自己处理消息重复的问题,这也是 RocketMQ 不解决消息重复的问题的原因。

RocketMQ 不保证消息不重复,如果你的业务需要保证严格的不重复消息,需要你自己在业务端去重。

事务消息

RocketMQ 除了支持普通消息,顺序消息,另外还支持事务消息。

假设这样的场景:

img

图中执行本地事务(Bob 账户扣款)和发送异步消息应该保证同时成功或者同时失败,也就是扣款成功了,发送消息一定要成功,如果扣款失败了,就不能再发送消息。那问题是:我们是先扣款还是先发送消息呢?

img

RocketMQ 分布式事务步骤:

发送 Prepared 消息 2222222222222222222,并拿到接受消息的地址。
执行本地事务
通过第 1 步骤拿到的地址去访问消息,并修改消息状态。

参考资料

《Kafka 核心源码解读》笔记

开篇词

从功能上讲,Kafka 源码分为四大模块。

  • 服务器端源码:实现 Kafka 架构和各类优秀特性的基础。
  • Java 客户端源码:定义了与 Broker 端的交互机制,以及通用的 Broker 端组件支撑代码。
  • Connect 源码:用于实现 Kafka 与外部系统的高性能数据传输。
  • Streams 源码:用于实现实时的流处理功能。

导读

构建 Kafka 工程和源码阅读环境、Scala 语言热身

kafka 项目主要目录结构

  • bin 目录:保存 Kafka 工具行脚本,我们熟知的 kafka-server-start 和 kafka-consoleproducer 等脚本都存放在这里。
  • clients 目录:保存 Kafka 客户端代码,比如生产者和消费者的代码都在该目录下。
  • config 目录:保存 Kafka 的配置文件,其中比较重要的配置文件是 server.properties。
  • connect 目录:保存 Connect 组件的源代码。我在开篇词里提到过,Kafka Connect 组件是用来实现 Kafka 与外部系统之间的实时数据传输的。
  • core 目录:保存 Broker 端代码。Kafka 服务器端代码全部保存在该目录下。
  • streams 目录:保存 Streams 组件的源代码。Kafka Streams 是实现 Kafka 实时流处理的组件。

日志段

保存消息文件的对象是怎么实现的?

Kafka 日志结构

Kafka 日志对象由多个日志段对象组成,而每个日志段对象会在磁盘上创建一组文件,包括消息日志文件(.log)位移索引文件(.index)时间戳索引文件(.timeindex)以及已中止(Aborted)事务的索引文件(.txnindex)。当然,如果你没有使用 Kafka 事务,已中止事务的索引文件是不会被创建出来的。

一个 Kafka 主题有很多分区,每个分区就对应一个 Log 对象,在物理磁盘上则对应于一个子目录。比如你创建了一个双分区的主题 test-topic,那么,Kafka 在磁盘上会创建两个子目录:test-topic-0 和 test-topic-1。而在服务器端,这就是两个 Log 对象。每个子目录下存在多组日志段,也就是多组 .log.index.timeindex 文件组合,只不过文件名不同,因为每个日志段的起始位移不同。

日志段源码解析

日志段源码位于 Kafka 的 core 工程的 LogSegment.scala 中。该文件下定义了三个 Scala 对象:

  • LogSegment class:日志段类
  • LogSegment object:保存静态变量或静态方法。相当于 LogSegment class 的工具类。
  • LogFlushStats object:尾部有个 stats,用于统计,负责为日志落盘进行计时。

LogSegment class 声明

1
2
3
4
5
6
7
8
class LogSegment private[log] (val log: FileRecords,
val lazyOffsetIndex: LazyIndex[OffsetIndex],
val lazyTimeIndex: LazyIndex[TimeIndex],
val txnIndex: TransactionIndex,
val baseOffset: Long,
val indexIntervalBytes: Int,
val rollJitterMs: Long,
val time: Time) extends Logging { ... }

参数说明:

  • log包含日志条目的文件记录FileRecords 就是实际保存 Kafka 消息的对象。
  • lazyOffsetIndex偏移量索引
  • lazyTimeIndex时间戳索引
  • txnIndex事务索引
  • baseOffset此段中偏移量的下限。事实上,在磁盘上看到的 Kafka 文件名就是 baseOffset 的值。每个 LogSegment 对象实例一旦被创建,它的起始位移就是固定的了,不能再被更改。
  • indexIntervalBytes索引中条目之间的近似字节数。indexIntervalBytes 值其实就是 Broker 端参数 log.index.interval.bytes 值,它控制了日志段对象新增索引项的频率。默认情况下,日志段至少新写入 4KB 的消息数据才会新增一条索引项。
  • rollJitterMs日志段对象新增倒计时的“扰动值”。因为目前 Broker 端日志段新增倒计时是全局设置,这就是说,在未来的某个时刻可能同时创建多个日志段对象,这将极大地增加物理磁盘 I/O 压力。有了 rollJitterMs 值的干扰,每个新增日志段在创建时会彼此岔开一小段时间,这样可以缓解物理磁盘的 I/O 负载瓶颈。
  • time:**Timer 实例**。

append 方法

append 方法接收 4 个参数:分别表示

  • largestOffset:待写入消息批次中消息的最大位移值
  • largestTimestamp:最大时间戳
  • shallowOffsetOfMaxTimestamp:最大时间戳对应消息的位移
  • records:真正要写入的消息集合

  • 第一步:在源码中,首先调用 log.sizeInBytes 方法判断该日志段是否为空,如果是空的话, Kafka 需要记录要写入消息集合的最大时间戳,并将其作为后面新增日志段倒计时的依据。
  • 第二步:代码调用 ensureOffsetInRange 方法确保输入参数最大位移值是合法的。那怎么判断是不是合法呢?标准就是看它与日志段起始位移的差值是否在整数范围内,即 largestOffset - baseOffset 的值是不是 介于 [0,Int.MAXVALUE] 之间。在极个别的情况下,这个差值可能会越界,这时, append 方法就会抛出异常,阻止后续的消息写入。一旦你碰到这个问题,你需要做的是升级你的 Kafka 版本,因为这是由已知的 Bug 导致的。
  • 第三步:待这些做完之后,append 方法调用 FileRecordsappend 方法执行真正的写入。它的工作是将内存中的消息对象写入到操作系统的页缓存就可以了。
  • 第四步:再下一步,就是更新日志段的最大时间戳以及最大时间戳所属消息的位移值属性。每个日志段都要保存当前最大时间戳信息和所属消息的位移信息。还记得 Broker 端提供定期删除日志的功能吗?比如我只想保留最近 7 天的日志,没错,当前最大时间戳这个值就是判断的依据;而最大时间戳对应的消息的位移值则用于时间戳索引项。虽然后面我会详细介绍,这里我还是稍微提一下:时间戳索引项保存时间戳与消息位移的对应关系。在这步操作中,Kafka 会更新并保存这组对应关系。
  • 第五步:append 方法的最后一步就是更新索引项和写入的字节数了。我在前面说过,日志段每写入 4KB 数据就要写入一个索引项。当已写入字节数超过了 4KB 之后,append 方法会调用索引对象的 append 方法新增索引项,同时清空已写入字节数,以备下次重新累积计算。

read 方法

read 方法作用:从第一个偏移量 >= startOffset 的 Segment 开始读取消息集。如果指定了 maxOffset,则消息集将包含不超过 maxSize 字节,并将在 maxOffset 之前结束。

read 方法入参

  • startOffset:要读取的第一条消息的位移;
  • maxSize:能读取的最大字节数;
  • maxPosition :能读到的最大文件位置;
  • minOneMessage:是否允许在消息体过大时至少返回第一条消息。

read 方法代码逻辑:

  1. 调用 translateOffset 方法定位要读取的起始文件位置 (startPosition)。输入参数 startOffset 仅仅是位移值,Kafka 需要根据索引信息找到对应的物理文件位置才能开始读取消息。
  2. 待确定了读取起始位置,日志段代码需要根据这部分信息以及 maxSizemaxPosition 参数共同计算要读取的总字节数。举个例子,假设 maxSize=100,maxPosition=300,startPosition=250,那么 read 方法只能读取 50 字节,因为 maxPosition - startPosition = 50。我们把它和 maxSize 参数相比较,其中的最小值就是最终能够读取的总字节数。
  3. 调用 FileRecordsslice 方法,从指定位置读取指定大小的消息集合。

recover 方法

recover 开始时,代码依次调用索引对象的 reset 方法清空所有的索引文件,之后会开始遍历日志段中的所有消息集合或消息批次(RecordBatch)。对于读取到的每个消息集合,日志段必须要确保它们是合法的,这主要体现在两个方面:

  • 该集合中的消息必须要符合 Kafka 定义的二进制格式;
  • 该集合中最后一条消息的位移值不能越界,即它与日志段起始位移的差值必须是一个正整数值。

校验完消息集合之后,代码会更新遍历过程中观测到的最大时间戳以及所属消息的位移值。同样,这两个数据用于后续构建索引项。再之后就是不断累加当前已读取的消息字节数,并根据该值有条件地写入索引项。最后是更新事务型 Producer 的状态以及 Leader Epoch 缓存。不过,这两个并不是理解 Kafka 日志结构所必需的组件,因此,我们可以忽略它们。

遍历执行完成后,Kafka 会将日志段当前总字节数和刚刚累加的已读取字节数进行比较,如果发现前者比后者大,说明日志段写入了一些非法消息,需要执行截断操作,将日志段大小调整回合法的数值。同时, Kafka 还必须相应地调整索引文件的大小。把这些都做完之后,日志段恢复的操作也就宣告结束了。

日志

日志是日志段的容器,里面定义了很多管理日志段的操作。

Log 源码结构

  • LogAppendInfo(C):保存消息元数据信息
  • LogAppendInfo(O):LogAppendInfo(C)工厂方法类
  • UnifiedLog(C):UnifiedLog.scala 中最核心的代码
  • UnifiedLog(O):UnifiedLog(C)工厂方法类
  • RollParams(C):用于控制日志段是否切分(Roll)的数据结构。
  • RollParams(O):RollParams 伴生类的工厂方法。
  • LogMetricNames(O):定义了 Log 对象的监控指标。
  • LogOffsetSnapshot(C):封装分区所有位移元数据的容器类。
  • LogReadInfo(C):封装读取日志返回的数据及其元数据。
  • CompletedTxn(C):记录已完成事务的元数据,主要用于构建事务索引。

Log Class & Object

Log Object 作用:

  • 定义了 Kafka 支持的文件类型
    • .log:Kafka 日志文件
    • .index:Kafka 偏移量索引文件
    • .timeindex:Kafka 时间戳索引文件
    • .txnindex:Kafka 事务索引文件
    • .snapshot:Kafka 为幂等型或事务型 Producer 所做的快照文件
    • .deleted:被标记为待删除的文件
    • .cleaned:用于日志清理的临时文件
    • .swap:将文件交换到日志中时使用的临时文件
    • -delete:被标记为待删除的目录
    • -future:用于变更主题分区文件夹地址的目录
  • 定义了多种工具类方法

UnifiedLog Class 定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// UnifiedLog 定义
class UnifiedLog(@volatile var logStartOffset: Long,
private val localLog: LocalLog,
brokerTopicStats: BrokerTopicStats,
val producerIdExpirationCheckIntervalMs: Int,
@volatile var leaderEpochCache: Option[LeaderEpochFileCache],
val producerStateManager: ProducerStateManager,
@volatile private var _topicId: Option[Uuid],
val keepPartitionMetadataFile: Boolean) extends Logging with KafkaMetricsGroup { ... }

// LocalLog 定义
class LocalLog(@volatile private var _dir: File,
@volatile private[log] var config: LogConfig,
private[log] val segments: LogSegments,
@volatile private[log] var recoveryPoint: Long,
@volatile private var nextOffsetMetadata: LogOffsetMetadata,
private[log] val scheduler: Scheduler,
private[log] val time: Time,
private[log] val topicPartition: TopicPartition,
private[log] val logDirFailureChannel: LogDirFailureChannel) extends Logging with KafkaMetricsGroup { ... }

上面属性中最重要的两个属性是:_dirlogStartOffset_dir 就是这个日志所在的文件夹路径,也就是主题分区的路径。logStartOffset,表示日志的当前最早位移。_dirlogStartOffset 都是 volatile var 类型,表示它们的值是变动的,而且可能被多个线程更新。

Log End Offset(LEO),是表示日志下一条待插入消息的位移值,而这个 Log Start Offset 是跟它相反的,它表示日志当前对外可见的最早一条消息的位移值。

图中绿色的位移值 3 是日志的 Log Start Offset,而位移值 15 表示 LEO。另外,位移值 8 是高水位值,它是区分已提交消息和未提交消息的分水岭。

有意思的是,Log End Offset 可以简称为 LEO,但 Log Start Offset 却不能简称为 LSO。因为在 Kafka 中,LSO 特指 Log Stable Offset,属于 Kafka 事务的概念。

Log 类的其他属性你暂时不用理会,因为它们要么是很明显的工具类属性,比如 timer 和 scheduler,要么是高阶用法才会用到的高级属性,比如 producerStateManager 和 logDirFailureChannel。工具类的代码大多是做辅助用的,跳过它们也不妨碍我们理解 Kafka 的核心功能;而高阶功能代码设计复杂,学习成本高,性价比不高。

其他一些重要属性:

  • nextOffsetMetadata:它封装了下一条待插入消息的位移值,你基本上可以把这个属性和 LEO 等同起来。
  • highWatermarkMetadata:是分区日志高水位值。
  • segments:我认为这是 Log 类中最重要的属性。它保存了分区日志下所有的日志段信息,只不过是用 Map 的数据结构来保存的。Map 的 Key 值是日志段的起始位移值,Value 则是日志段对象本身。Kafka 源码使用 ConcurrentNavigableMap 数据结构来保存日志段对象,就可以很轻松地利用该类提供的线程安全和各种支持排序的方法,来管理所有日志段对象。
  • Leader Epoch Cache 对象。Leader Epoch 是社区于 0.11.0.0 版本引入源码中的,主要是用来判断出现 Failure 时是否执行日志截断操作(Truncation)。之前靠高水位来判断的机制,可能会造成副本间数据不一致的情形。这里的 Leader Epoch Cache 是一个缓存类数据,里面保存了分区 Leader 的 Epoch 值与对应位移值的映射关系,我建议你查看下 LeaderEpochFileCache 类,深入地了解下它的实现原理。

LOG 类初始化逻辑

Log 的常见操作

Log 的常见操作可以分为 4 类:

  • 高水位管理操作:高水位的概念在 Kafka 中举足轻重,对它的管理,是 Log 最重要的功能之一。
  • 日志段管理:Log 是日志段的容器。高效组织与管理其下辖的所有日志段对象,是源码要解决的核心问题。
  • 关键位移值管理:日志定义了很多重要的位移值,比如 Log Start Offset 和 LEO 等。确保这些位移值的正确性,是构建消息引擎一致性的基础。
  • 读写操作:所谓的操作日志,大体上就是指读写日志。读写操作的作用之大,不言而喻。

高水位管理操作

高水位定义:

1
@volatile private var highWatermarkMetadata: LogOffsetMetadata = LogOffsetMetadata(logStartOffset)

高水位值是 volatile(易变型)的。因为多个线程可能同时读取它,因此需要设置成 volatile,保证内存可见性。另外,由于高水位值可能被多个线程同时修改,因此源码使用 Java Monitor 锁来确保并发修改的线程安全。

高水位值的初始值是 Log Start Offset 值。上节课我们提到,每个 Log 对象都会维护一个 Log Start Offset 值。当首次构建高水位时,它会被赋值成 Log Start Offset 值。

LogOffsetMetadata 定义

1
2
3
case class LogOffsetMetadata(messageOffset: Long,
segmentBaseOffset: Long = Log.UnknownOffset,
relativePositionInSegment: Int = LogOffsetMetadata.UnknownFilePosition) {

三个参数:

  • messageOffset:消息位移值,这是最重要的信息。我们总说高水位值,其实指的就是这个变量的值。
  • segmentBaseOffset:保存该位移值所在日志段的起始位移。日志段起始位移值辅助计算两条消息在物理磁盘文件中位置的差值,即两条消息彼此隔了多少字节。这个计算有个前提条件,即两条消息必须处在同一个日志段对象上,不能跨日志段对象。否则它们就位于不同的物理文件上,计算这个值就没有意义了。这里的 segmentBaseOffset,就是用来判断两条消息是否处于同一个日志段的。
  • relativePositionSegment:保存该位移值所在日志段的物理磁盘位置。这个字段在计算两个位移值之间的物理磁盘位置差值时非常有用。你可以想一想,Kafka 什么时候需要计算位置之间的字节数呢?答案就是在读取日志的时候。假设每次读取时只能读 1MB 的数据,那么,源码肯定需要关心两个位移之间所有消息的总字节数是否超过了 1MB。
获取和设置高水位值
1
2
3
4
5
6
7
8
9
10
11
private def updateHighWatermarkMetadata(newHighWatermark: LogOffsetMetadata): Unit = {
if (newHighWatermark.messageOffset < 0) // 高水位值不能是负数
throw new IllegalArgumentException("High watermark offset should be non-negative")

lock synchronized { // 保护Log对象修改的Monitor锁
highWatermarkMetadata = newHighWatermark // 赋值新的高水位值
producerStateManager.onHighWatermarkUpdated(newHighWatermark.messageOffset)
maybeIncrementFirstUnstableOffset() // First Unstable Offset是Kafka事务机制
}
trace(s"Setting high watermark $newHighWatermark")
}
更新高水位值
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
def updateHighWatermark(hw: Long): Long = {
// 新高水位值一定介于[Log Start Offset,Log End Offset]之间
val newHighWatermark = if (hw < logStartOffset)
logStartOffset
else if (hw > logEndOffset)
logEndOffset
else
hw
// 设置高水位值
updateHighWatermarkMetadata(LogOffsetMetadata(newHighWatermark))
// 最后返回新高水位值
newHighWatermark
}

def maybeIncrementHighWatermark(newHighWatermark: LogOffsetMetadata): Option[LogOffsetMetadata] = {
// 新高水位值不能越过Log End Offset
if (newHighWatermark.messageOffset > logEndOffset)
throw new IllegalArgumentException(s"High watermark $newHighWatermark update exceeds current " +
s"log end offset $logEndOffsetMetadata")

lock.synchronized {
val oldHighWatermark = fetchHighWatermarkMetadata // 获取老的高水位值

// 保证高水位单调递增。当新的偏移元数据位于较新的段上时,我们还会更新高水位线,每当日志滚动到新段时就会发生这种情况。
if (oldHighWatermark.messageOffset < newHighWatermark.messageOffset ||
(oldHighWatermark.messageOffset == newHighWatermark.messageOffset && oldHighWatermark.onOlderSegment(newHighWatermark))) {
updateHighWatermarkMetadata(newHighWatermark)
Some(oldHighWatermark) // 返回老的高水位值
} else {
None
}
}
}

这两个方法有着不同的用途。updateHighWatermark 方法,主要用在 Follower 副本从 Leader 副本获取到消息后更新高水位值。一旦拿到新的消息,就必须要更新高水位值;而 maybeIncrementHighWatermark 方法,主要是用来更新 Leader 副本的高水位值。需要注意的是,Leader 副本高水位值的更新是有条件的——某些情况下会更新高水位值,某些情况下可能不会。

读取高水位值
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private def fetchHighWatermarkMetadata: LogOffsetMetadata = {
checkIfMemoryMappedBufferClosed() // 读取时确保日志不能被关闭

val offsetMetadata = highWatermarkMetadata // 保存当前高水位值到本地变量
if (offsetMetadata.messageOffsetOnly) { // 没有获得到完整的高水位元数据
lock.synchronized {
// 给定消息偏移量,在日志中找到其对应的偏移量元数据。如果消息偏移量超出范围,则抛出异常
val fullOffset = convertToOffsetMetadataOrThrow(highWatermark)
updateHighWatermarkMetadata(fullOffset) // 然后再更新一下高水位对象
fullOffset
}
} else {
offsetMetadata
}
}

日志段管理

添加

1
2
@threadsafe
def addSegment(segment: LogSegment): LogSegment = this.segments.put(segment.baseOffset, segment)

删除

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
def deleteOldSegments(): Int = {
if (config.delete) {
deleteRetentionMsBreachedSegments() + deleteRetentionSizeBreachedSegments() + deleteLogStartOffsetBreachedSegments()
} else {
deleteLogStartOffsetBreachedSegments()
}
}

private def deleteOldSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean, reason: String): Int = {
lock synchronized {
val deletable = deletableSegments(predicate)
if (deletable.nonEmpty)
info(s"Found deletable segments with base offsets [${deletable.map(_.baseOffset).mkString(",")}] due to $reason")
deleteSegments(deletable)
}
}

修改

源码里面不涉及修改日志段对象,所谓的修改或更新也就是替换而已,用新的日志段对象替换老的日志段对象。举个简单的例子。segments.put(1L, newSegment) 语句在没有 Key=1 时是添加日志段,否则就是替换已有日志段。

查询

主要都是利用了 ConcurrentSkipListMap 的现成方法。

  • segments.firstEntry:获取第一个日志段对象;
  • segments.lastEntry:获取最后一个日志段对象,即 Active Segment;
  • segments.higherEntry:获取第一个起始位移值 ≥ 给定 Key 值的日志段对象;
  • segments.floorEntry:获取最后一个起始位移值 ≤ 给定 Key 值的日志段对象。

关键位移值管理

Log 对象维护了一些关键位移值数据,比如 Log Start Offset、LEO 等。

Log 对象中的 LEO 永远指向下一条待插入消息,也就是说,LEO 值上面是没有消息的

1
@volatile private var nextOffsetMetadata: LogOffsetMetadata = _

Log End Offset 对象被更新的时机:

  • 对象初始化时:当 Log 对象初始化时,我们必须要创建一个 LEO 对象,并对其进行初始化。
  • 写入新消息时:这个最容易理解。以上面的图为例,当不断向 Log 对象插入新消息时,LEO 值就像一个指针一样,需要不停地向右移动,也就是不断地增加。
  • Log 对象发生日志切分(Log Roll)时:日志切分是啥呢?其实就是创建一个全新的日志段对象,并且关闭当前写入的日志段对象。这通常发生在当前日志段对象已满的时候。一旦发生日志切分,说明 Log 对象切换了 Active Segment,那么,LEO 中的起始位移值和段大小数据都要被更新,因此,在进行这一步操作时,我们必须要更新 LEO 对象。
  • 日志截断(Log Truncation)时:这个也是显而易见的。日志中的部分消息被删除了,自然可能导致 LEO 值发生变化,从而要更新 LEO 对象。

Log Start Offset 被更新的时机:

  • Log 对象初始化时:和 LEO 类似,Log 对象初始化时要给 Log Start Offset 赋值,一般是将第一个日志段的起始位移值赋值给它。
  • 日志截断时:同理,一旦日志中的部分消息被删除,可能会导致 Log Start Offset 发生变化,因此有必要更新该值。
  • Follower 副本同步时:一旦 Leader 副本的 Log 对象的 Log Start Offset 值发生变化。为了维持和 Leader 副本的一致性,Follower 副本也需要尝试去更新该值。
  • 删除日志段时:这个和日志截断是类似的。凡是涉及消息删除的操作都有可能导致 LogStart Offset 值的变化。
  • 删除消息时:严格来说,这个更新时机有点本末倒置了。在 Kafka 中,删除消息就是通过抬高 Log Start Offset 值来实现的,因此,删除消息时必须要更新该值。

读写操作

写操作流程:

读操作

read 方法中有 4 个参数:

  • startOffset,即从 Log 对象的哪个位移值开始读消息。
  • maxLength,即最多能读取多少字节。
  • isolation,设置读取隔离级别,主要控制能够读取的最大位移值,多用于 Kafka 事务。
  • minOneMessage,即是否允许至少读一条消息。设想如果消息很大,超过了 maxLength,正常情况下 read 方法永远不会返回任何消息。但如果设置了该参数为 true,read 方法就保证至少能够返回一条消息。

索引

索引类图及源文件组织架构

  • AbstractIndex.scala:它定义了最顶层的抽象类,这个类封装了所有索引类型的公共操作。
  • LazyIndex.scala:它定义了 AbstractIndex 上的一个包装类,实现索引项延迟加载。这个类主要是为了提高性能。
  • OffsetIndex.scala:定义位移索引,保存“< 位移值,文件磁盘物理位置 >”对。
  • TimeIndex.scala:定义时间戳索引,保存“< 时间戳,位移值 >”对。
  • TransactionIndex.scala:定义事务索引,为已中止事务(Aborted Transcation)保存重要的元数据信息。只有启用 Kafka 事务后,这个索引才有可能出现。

AbstractIndex 代码结构

  • 索引文件(file)。每个索引对象在磁盘上都对应了一个索引文件。你可能注意到了,这个字段是 var 型,说明它是可以被修改的。难道索引对象还能动态更换底层的索引文件吗?是的。自 1.1.0 版本之后,Kafka 允许迁移底层的日志路径,所以,索引文件自然要是可以更换的。
  • 起始位移值(baseOffset)。索引对象对应日志段对象的起始位移值。举个例子,如果你查看 Kafka 日志路径的话,就会发现,日志文件和索引文件都是成组出现的。比如说,如果日志文件是 00000000000000000123.log,正常情况下,一定还有一组索引文件 00000000000000000123.index、00000000000000000123.timeindex 等。这里的“123”就是这组文件的起始位移值,也就是 baseOffset 值。
  • 索引文件最大字节数(maxIndexSize)。它控制索引文件的最大长度。Kafka 源码传入该参数的值是 Broker 端参数 segment.index.bytes 的值,即 10MB。这就是在默认情况下,所有 Kafka 索引文件大小都是 10MB 的原因。
  • 索引文件打开方式(writable)。“True”表示以“读写”方式打开,“False”表示以“只读”方式打开。

位移索引

位移索引也就是所谓的 OffsetIndex。Key 就是消息的相对位移,Value 是保存该消息的日志段文件中该消息第一个字节的物理文件位置。

参考资料

《分布式协议与算法实战》笔记

拜占庭将军问题

拜占庭将军问题是由莱斯利·兰波特在其同名论文中提出的分布式对等网络通信容错问题。其实是借拜占庭将军的例子,抛出了分布式共识性问题,并探讨和论证了解决的方法。

分布式计算中,不同的节点通过通讯交换信息达成共识而按照同一套协作策略行动。但有时候,系统中的节点可能出错而发送错误的信息,用于传递信息的通讯网络也可能导致信息损坏,使得网络中不同的成员关于全体协作的策略得出不同结论,从而破坏系统一致性。拜占庭将军问题被认为是容错性问题中最难的问题类型之一。

问题描述

一群拜占庭将军各领一支军队共同围困一座城市。

为了简化问题,军队的行动策略只有两种:进攻(Attack,后面简称 A)或 撤退(Retreat,后面简称 R)。如果这些军队不是统一进攻或撤退,就可能因兵力不足导致失败。因此,将军们通过投票来达成一致策略:同进或同退

因为将军们分别在城市的不同方位,所以他们只能通过信使互相联系。在投票过程中,每位将军都将自己的投票信息(A 或 R)通知其他所有将军,这样一来每位将军根据自己的投票和其他所有将军送来的信息就可以分析出共同的投票结果而决定行动策略。

这个抽象模型的问题在于:将军中可能存在叛徒,他们不仅会发出误导性投票,还可能选择性地发送投票信息

由于将军之间需要通过信使通讯,叛变将军可能通过伪造信件来以其他将军的身份发送假投票。而即使在保证所有将军忠诚的情况下,也不能排除信使被敌人截杀,甚至被敌人间谍替换等情况。因此很难通过保证人员可靠性及通讯可靠性来解决问题。

假使那些忠诚(或是没有出错)的将军仍然能通过多数决定来决定他们的战略,便称达到了拜占庭容错。在此,票都会有一个默认值,若消息(票)没有被收到,则使用此默认值来投票。

上述的故事可以映射到分布式系统中,_将军代表分布式系统中的节点;信使代表通信系统;叛徒代表故障或异常_。

img

问题分析

兰伯特针对拜占庭将军问题,给出了两个解决方案:口头协议和书面协议。

本文介绍一下口头协议。

在口头协议中,拜占庭将军问题被简化为将军 - 副官模型,其核心规则如下:

  • 忠诚的副官遵守同一命令。
  • 若将军是忠诚的,所有忠诚的副官都执行他的命令。
  • 如果叛徒人数为 m,将军人数不能少于 3m + 1 ,那么拜占庭将军问题就能解决了。——关于这个公式,可以不必深究,如果对推导过程感兴趣,可以参考论文。

示例一、叛徒人数为 1,将军人数为 3

img

这个示例中,将军人数不满足 3m + 1,无法保证忠诚的副官都执行将军的命令。

示例二、叛徒人数为 1,将军人数为 4

img

这个示例中,将军人数满足 3m + 1,无论是副官中有叛徒,还是将军是叛徒,都能保证忠诚的副官执行将军的命令。

CAP 理论

CAP 是指:在一个分布式系统中, 一致性、可用性和分区容忍性,最多只能同时满足其中两项。

  • 一致性(C:Consistency):多个数据副本是否能保持一致
  • 可用性(A:Availability):分布式系统在面对各种异常时可以提供正常服务的能力
  • 分区容忍性(P:Partition Tolerance):分布式系统在遇到任何网络分区故障的时候,仍然需要能对外提供一致性和可用性的服务,除非是整个网络环境都发生了故障

CAP 权衡

在分布式系统中,分区容忍性必不可少,因为需要总是假设网络是不可靠的;CAP 理论实际在是要在可用性和一致性之间做权衡。

  • CP:需要让所有节点下线成为不可用的状态,等待同步完成。
  • AP:在同步过程中允许读取所有节点的数据,但是数据可能不一致。

ACID 理论

ACID 特性:

  • 原子性(Atomicity)
    • 事务被视为不可分割的最小单元,事务中的所有操作要么全部提交成功,要么全部失败回滚。
    • 回滚可以用日志来实现,日志记录着事务所执行的修改操作,在回滚时反向执行这些修改操作即可。
  • 一致性(Consistency)
    • 数据库在事务执行前后都保持一致性状态。
    • 在一致性状态下,所有事务对一个数据的读取结果都是相同的。
  • 隔离性(Isolation)
    • 一个事务所做的修改在最终提交以前,对其它事务是不可见的。
  • 持久性(Durability)
    • 一旦事务提交,则其所做的修改将会永远保存到数据库中。即使系统发生崩溃,事务执行的结果也不能丢失。
    • 可以通过数据库备份和恢复来实现,在系统发生奔溃时,使用备份的数据库进行数据恢复。

img

在分布式系统中实现 ACID 比单机复杂的多。

在分布式系统中实现 ACID,即实现分布式事务,具体的方案有如下几种:

  • 两阶段提交(2PC)
  • 三阶段提交(3PC)
  • 补偿事务(TCC)
  • 本地消息表(异步确保)
  • MQ 事务消息
  • Sagas 事务模型

BASE 理论

BASE 理论是对 CAP 中一致性和可用性权衡的结果。

BASE 是指:即使无法做到强一致性,但每个应用都可以根据自身业务特点,采用适当的方式来使系统达到最终一致性。

BASE 特性

  • 基本可用(Basically Available):指分布式系统在出现故障的时候,保证核心可用,允许损失部分可用性。
  • 软状态(Soft State):指允许系统中的数据存在中间状态,并认为该中间状态不会影响系统整体可用性,即允许系统不同节点的数据副本之间进行同步的过程存在延时。
  • 最终一致性(Eventually Consistent):最终一致性强调的是系统中所有的数据副本,在经过一段时间的同步后,最终能达到一致的状态。

img

Paxos 算法

Paxos 是 Leslie Lamport 于 1990 年提出的一种基于消息传递且具有高度容错特性的共识(consensus)算法。

Paxos 算法包含 2 个部分:

  • Basic Paxos 算法:描述的多节点之间如何就某个值达成共识。
  • Multi Paxos 思想:描述的是执行多个 Basic Paxos 实例,就一系列值达成共识。

Paxos 算法解决的问题正是分布式共识性问题,即一个分布式系统中的各个进程如何就某个值(决议)达成一致。

Paxos 算法运行在允许宕机故障的异步系统中,不要求可靠的消息传递,可容忍消息丢失、延迟、乱序以及重复。它利用大多数 (Majority) 机制保证了 2N+1 的容错能力,即 2N+1 个节点的系统最多允许 N 个节点同时出现故障。

Basic Paxos 算法

角色

img

  • 提议者(Proposer):发出提案(Proposal),用于投票表决。Proposal 信息包括提案编号 (Proposal ID) 和提议的值 (Value)。在绝大多数场景中,集群中收到客户端请求的节点,才是提议者。这样做的好处是,对业务代码没有入侵性,也就是说,我们不需要在业务代码中实现算法逻辑。
  • 决策者(Acceptor):对每个 Proposal 进行投票,若 Proposal 获得多数 Acceptor 的接受,则称该 Proposal 被批准。一般来说,集群中的所有节点都在扮演决策者的角色,参与共识协商,并接受和存储数据。
  • 学习者(Learner):不参与决策,从 Proposers/Acceptors 学习、记录最新达成共识的提案(Value)。一般来说,学习者是数据备份节点,比如主从架构中的从节点,被动地接受数据,容灾备份。

在多副本状态机中,每个副本都同时具有 Proposer、Acceptor、Learner 三种角色。

这三种角色,在本质上代表的是三种功能:

  • 提议者代表的是接入和协调功能,收到客户端请求后,发起二阶段提交,进行共识协商;
  • 接受者代表投票协商和存储数据,对提议的值进行投票,并接受达成共识的值,存储保存;
  • 学习者代表存储数据,不参与共识协商,只接受达成共识的值,存储保存。

算法

Paxos 算法通过一个决议分为两个阶段(Learn 阶段之前决议已经形成):

  1. Prepare 阶段:Proposer 向 Acceptors 发出 Prepare 请求,Acceptors 针对收到的 Prepare 请求进行 Promise 承诺。
  2. Accept 阶段:Proposer 收到多数 Acceptors 承诺的 Promise 后,向 Acceptors 发出 Propose 请求,Acceptors 针对收到的 Propose 请求进行 Accept 处理。
  3. Learn 阶段:Proposer 在收到多数 Acceptors 的 Accept 之后,标志着本次 Accept 成功,决议形成,将形成的决议发送给所有 Learners。

Paxos 算法流程中的每条消息描述如下:

  • Prepare: Proposer 生成全局唯一且递增的 Proposal ID (可使用时间戳加 Server ID),向所有 Acceptors 发送 Prepare 请求,这里无需携带提案内容,只携带 Proposal ID 即可。

  • Promise: Acceptors 收到 Prepare 请求后,做出“两个承诺,一个应答”。

    • 两个承诺:

      • 不再接受 Proposal ID 小于等于当前请求的 Prepare 请求。
      • 不再接受 Proposal ID 小于当前请求的 Propose 请求。
    • 一个应答:

      • 不违背以前作出的承诺下,回复已经 Accept 过的提案中 Proposal ID 最大的那个提案的 Value 和 Proposal ID,没有则返回空值。
  • Propose: Proposer 收到多数 Acceptors 的 Promise 应答后,从应答中选择 Proposal ID 最大的提案的 Value,作为本次要发起的提案。如果所有应答的提案 Value 均为空值,则可以自己随意决定提案 Value。然后携带当前 Proposal ID,向所有 Acceptors 发送 Propose 请求。

  • Accept: Acceptor 收到 Propose 请求后,在不违背自己之前作出的承诺下,接受并持久化当前 Proposal ID 和提案 Value。

  • Learn: Proposer 收到多数 Acceptors 的 Accept 后,决议形成,将形成的决议发送给所有 Learners。

Multi Paxos 思想

Basic Paxos 的问题

Basic Paxos 有以下问题,导致它不能应用于实际:

  • Basic Paxos 算法只能对一个值形成决议
  • Basic Paxos 算法会消耗大量网络带宽。Basic Paxos 中,决议的形成至少需要两次网络通信,在高并发情况下可能需要更多的网络通信,极端情况下甚至可能形成活锁。如果想连续确定多个值,Basic Paxos 搞不定了。

Multi Paxos 的改进

Multi Paxos 正是为解决以上问题而提出。Multi Paxos 基于 Basic Paxos 做了两点改进:

  • 针对每一个要确定的值,运行一次 Paxos 算法实例(Instance),形成决议。每一个 Paxos 实例使用唯一的 Instance ID 标识。
  • 在所有 Proposer 中选举一个 Leader,由 Leader 唯一地提交 Proposal 给 Acceptor 进行表决。这样没有 Proposer 竞争,解决了活锁问题。在系统中仅有一个 Leader 进行 Value 提交的情况下,Prepare 阶段就可以跳过,从而将两阶段变为一阶段,提高效率。

Multi Paxos 首先需要选举 Leader,Leader 的确定也是一次决议的形成,所以可执行一次 Basic Paxos 实例来选举出一个 Leader。选出 Leader 之后只能由 Leader 提交 Proposal,在 Leader 宕机之后服务临时不可用,需要重新选举 Leader 继续服务。在系统中仅有一个 Leader 进行 Proposal 提交的情况下,Prepare 阶段可以跳过。

Multi Paxos 通过改变 Prepare 阶段的作用范围至后面 Leader 提交的所有实例,从而使得 Leader 的连续提交只需要执行一次 Prepare 阶段,后续只需要执行 Accept 阶段,将两阶段变为一阶段,提高了效率。为了区分连续提交的多个实例,每个实例使用一个 Instance ID 标识,Instance ID 由 Leader 本地递增生成即可。

Multi Paxos 允许有多个自认为是 Leader 的节点并发提交 Proposal 而不影响其安全性,这样的场景即退化为 Basic Paxos。

Chubby 和 Boxwood 均使用 Multi Paxos。ZooKeeper 使用的 Zab 也是 Multi Paxos 的变形。

Raft 算法

Raft 基础

Raft 将一致性问题分解成了三个子问题:

  • 选举 Leader
  • 日志复制
  • 安全性

服务器角色

在 Raft 中,任何时刻,每个服务器都处于这三个角色之一 :

  • Leader - 领导者,通常一个系统中是一主(Leader)多从(Follower)。Leader 负责处理所有的客户端请求
  • Follower - 跟随者,不会发送任何请求,只是简单的 响应来自 Leader 或者 Candidate 的请求
  • Candidate - 参选者,选举新 Leader 时的临时角色。

img

:bulb: 图示说明:

  • Follower 只响应来自其他服务器的请求。在一定时限内,如果 Follower 接收不到消息,就会转变成 Candidate,并发起选举。
  • Candidate 向 Follower 发起投票请求,如果获得集群中半数以上的选票,就会转变为 Leader。
  • 在一个 Term 内,Leader 始终保持不变,直到下线了。Leader 需要周期性向所有 Follower 发送心跳消息,以阻止 Follower 转变为 Candidate。

任期

img

Raft 把时间分割成任意长度的 任期(Term),任期用连续的整数标记。每一段任期从一次选举开始。Raft 保证了在一个给定的任期内,最多只有一个领导者

  • 如果选举成功,Leader 会管理整个集群直到任期结束。
  • 如果选举失败,那么这个任期就会因为没有 Leader 而结束。

不同服务器节点观察到的任期转换状态可能不一样

  • 服务器节点可能观察到多次的任期转换。
  • 服务器节点也可能观察不到任何一次任期转换。

任期在 Raft 算法中充当逻辑时钟的作用,使得服务器节点可以查明一些过期的信息(比如过期的 Leader)。每个服务器节点都会存储一个当前任期号,这一编号在整个时期内单调的增长。当服务器之间通信的时候会交换当前任期号。

  • 如果一个服务器的当前任期号比其他人小,那么他会更新自己的编号到较大的编号值。
  • 如果一个 Candidate 或者 Leader 发现自己的任期号过期了,那么他会立即恢复成跟随者状态。
  • 如果一个节点接收到一个包含过期的任期号的请求,那么他会直接拒绝这个请求。

RPC

Raft 算法中服务器节点之间的通信使用 **_远程过程调用(RPC)_**。

基本的一致性算法只需要两种 RPC:

  • RequestVote RPC - 请求投票 RPC,由 Candidate 在选举期间发起。
  • AppendEntries RPC - 附加条目 RPC,由 Leader 发起,用来复制日志和提供一种心跳机制。

选举 Leader

选举规则

Raft 使用一种心跳机制来触发 Leader 选举

Leader 需要周期性的向所有 Follower 发送心跳消息,以此维持自己的权威并阻止新 Leader 的产生。

每个 Follower 都设置了一个随机的竞选超时时间,一般为 150ms ~ 300ms,如果在竞选超时时间内没有收到 Leader 的心跳消息,就会认为当前 Term 没有可用的 Leader,并发起选举来选出新的 Leader。开始一次选举过程,Follower 先要增加自己的当前 Term 号,并转换为 Candidate

Candidate 会并行的向集群中的所有服务器节点发送投票请求(RequestVote RPC,它会保持当前状态直到以下三件事情之一发生:

  • 自己成为 Leader
  • 其他的服务器成为 Leader
  • 没有任何服务器成为 Leader
自己成为 Leader
  • 当一个 Candidate 从整个集群半数以上的服务器节点获得了针对同一个 Term 的选票,那么它就赢得了这次选举并成为 Leader。每个服务器最多会对一个 Term 投出一张选票,按照先来先服务(FIFO)的原则。_要求半数以上选票的规则确保了最多只会有一个 Candidate 赢得此次选举_。
  • 一旦 Candidate 赢得选举,就立即成为 Leader。然后它会向其他的服务器发送心跳消息来建立自己的权威并且阻止新的领导人的产生。
其他的服务器成为 Leader

等待投票期间,Candidate 可能会从其他的服务器接收到声明它是 Leader 的 AppendEntries RPC

  • 如果这个 Leader 的 Term 号(包含在此次的 RPC 中)不小于 Candidate 当前的 Term,那么 Candidate 会承认 Leader 合法并回到 Follower 状态。
  • 如果此次 RPC 中的 Term 号比自己小,那么 Candidate 就会拒绝这个消息并继续保持 Candidate 状态。
没有任何服务器成为 Leader

如果有多个 Follower 同时成为 Candidate,那么选票可能会被瓜分以至于没有 Candidate 可以赢得半数以上的投票。当这种情况发生的时候,每一个 Candidate 都会竞选超时,然后通过增加当前 Term 号来开始一轮新的选举。然而,没有其他机制的话,选票可能会被无限的重复瓜分。

Raft 算法使用随机选举超时时间的方法来确保很少会发生选票瓜分的情况,就算发生也能很快的解决。为了阻止选票起初就被瓜分,竞选超时时间是一个随机的时间,在一个固定的区间(例如 150-300 毫秒)随机选择,这样可以把选举都分散开。

  • 以至于在大多数情况下,只有一个服务器会超时,然后它赢得选举,成为 Leader,并在其他服务器超时之前发送心跳包。
  • 同样的机制也被用在选票瓜分的情况下:每一个 Candidate 在开始一次选举的时候会重置一个随机的选举超时时间,然后在超时时间内等待投票的结果;这样减少了在新的选举中另外的选票瓜分的可能性。

理解了上面的选举规则后,我们通过动图来加深认识。

日志复制

日志格式

日志由含日志索引(log index)的日志条目(log entry)组成。每个日志条目包含它被创建时的 Term 号(下图中方框中的数字),和一个复制状态机需要执行的指令。如果一个日志条目被复制到半数以上的服务器上,就被认为可以提交(Commit)了。

  • 日志条目中的 Term 号被用来检查是否出现不一致的情况。
  • 日志条目中的日志索引(一个整数值)用来表明它在日志中的位置。

img

Raft 日志同步保证如下两点:

  • 如果不同日志中的两个日志条目有着相同的日志索引和 Term,则它们所存储的命令是相同的
    • 这个特性基于这条原则:Leader 最多在一个 Term 内、在指定的一个日志索引上创建一条日志条目,同时日志条目在日志中的位置也从来不会改变。
  • 如果不同日志中的两个日志条目有着相同的日志索引和 Term,则它们之前的所有条目都是完全一样的
    • 这个特性由 AppendEntries RPC 的一个简单的一致性检查所保证。在发送 AppendEntries RPC 时,Leader 会把新日志条目之前的日志条目的日志索引和 Term 号一起发送。如果 Follower 在它的日志中找不到包含相同日志索引和 Term 号的日志条目,它就会拒绝接收新的日志条目。

日志复制流程

img

  1. Leader 负责处理所有客户端的请求。
  2. Leader 把请求作为日志条目加入到它的日志中,然后并行的向其他服务器发送 AppendEntries RPC 请求,要求 Follower 复制日志条目。
  3. Follower 复制成功后,返回确认消息。
  4. 当这个日志条目被半数以上的服务器复制后,Leader 提交这个日志条目到它的复制状态机,并向客户端返回执行结果。

注意:如果 Follower 崩溃或者运行缓慢,再或者网络丢包,Leader 会不断的重复尝试发送 AppendEntries RPC 请求 (尽管已经回复了客户端),直到所有的跟随者都最终复制了所有的日志条目。

日志一致性

一般情况下,Leader 和 Followers 的日志保持一致,因此日志条目一致性检查通常不会失败。然而,Leader 崩溃可能会导致日志不一致:旧的 Leader 可能没有完全复制完日志中的所有条目。

Leader 和 Follower 日志不一致的可能

Leader 和 Follower 可能存在多种日志不一致的可能。

img

:bulb: 图示说明:

上图阐述了 Leader 和 Follower 可能存在多种日志不一致的可能,每一个方框表示一个日志条目,里面的数字表示任期号 。

当一个 Leader 成功当选时,Follower 可能出现以下情况(a-f):

  • 存在未更新日志条目,如(a、b)。
  • 存在未提交日志条目,如(c、d)。
  • 两种情况都存在,如(e、f)。

_例如,场景 f 可能会这样发生,某服务器在 Term2 的时候是 Leader,已附加了一些日志条目到自己的日志中,但在提交之前就崩溃了;很快这个机器就被重启了,在 Term3 重新被选为 Leader,并且又增加了一些日志条目到自己的日志中;在 Term 2 和 Term 3 的日志被提交之前,这个服务器又宕机了,并且在接下来的几个任期里一直处于宕机状态_。

Leader 和 Follower 日志一致的保证

Leader 通过强制 Followers 复制它的日志来处理日志的不一致,Followers 上的不一致的日志会被 Leader 的日志覆盖

  • Leader 为了使 Followers 的日志同自己的一致,Leader 需要找到 Followers 同它的日志一致的地方,然后覆盖 Followers 在该位置之后的条目。
  • Leader 会从后往前试,每次日志条目失败后尝试前一个日志条目,直到成功找到每个 Follower 的日志一致位点,然后向后逐条覆盖 Followers 在该位置之后的条目。

安全性

前面描述了 Raft 算法是如何选举 Leader 和复制日志的。

Raft 还增加了一些限制来完善 Raft 算法,以保证安全性:保证了任意 Leader 对于给定的 Term,都拥有了之前 Term 的所有被提交的日志条目。

选举限制

拥有最新的已提交的日志条目的 Follower 才有资格成为 Leader。

Raft 使用投票的方式来阻止一个 Candidate 赢得选举除非这个 Candidate 包含了所有已经提交的日志条目。 Candidate 为了赢得选举必须联系集群中的大部分节点,这意味着每一个已经提交的日志条目在这些服务器节点中肯定存在于至少一个节点上。如果 Candidate 的日志至少和大多数的服务器节点一样新(这个新的定义会在下面讨论),那么他一定持有了所有已经提交的日志条目。

RequestVote RPC 实现了这样的限制:RequestVote RPC 中包含了 Candidate 的日志信息, Follower 会拒绝掉那些日志没有自己新的投票请求

如何判断哪个日志条目比较新?

Raft 通过比较两份日志中最后一条日志条目的日志索引和 Term 来判断哪个日志比较新。

  • 先判断 Term,哪个数值大即代表哪个日志比较新。
  • 如果 Term 相同,再比较 日志索引,哪个数值大即代表哪个日志比较新。

提交旧任期的日志条目

一个当前 Term 的日志条目被复制到了半数以上的服务器上,Leader 就认为它是可以被提交的。如果这个 Leader 在提交日志条目前就下线了,后续的 Leader 可能会覆盖掉这个日志条目。

img

💡 图示说明:

上图解释了为什么 Leader 无法对旧 Term 的日志条目进行提交。

  • 阶段 (a) ,S1 是 Leader,且 S1 写入日志条目为 (Term 2,日志索引 2),只有 S2 复制了这个日志条目。
  • 阶段 (b),S1 下线,S5 被选举为 Term3 的 Leader。S5 写入日志条目为 (Term 3,日志索引 2)。
  • 阶段 (c),S5 下线,S1 重新上线,并被选举为 Term4 的 Leader。此时,Term 2 的那条日志条目已经被复制到了集群中的大多数节点上,但是还没有被提交。
  • 阶段 (d),S1 再次下线,S5 重新上线,并被重新选举为 Term3 的 Leader。然后 S5 覆盖了日志索引 2 处的日志。
  • 阶段 (e),如果阶段 (d) 还未发生,即 S1 再次下线之前,S1 把自己主导的日志条目复制到了大多数节点上,那么在后续 Term 里面这些新日志条目就会被提交。这样在同一时刻就同时保证了,之前的所有旧日志条目就会被提交。

Raft 永远不会通过计算副本数目的方式去提交一个之前 Term 内的日志条目。只有 Leader 当前 Term 里的日志条目通过计算副本数目可以被提交;一旦当前 Term 的日志条目以这种方式被提交,那么由于日志匹配特性,之前的日志条目也都会被间接的提交。

当 Leader 复制之前任期里的日志时,Raft 会为所有日志保留原始的 Term,这在提交规则上产生了额外的复杂性。在其他的一致性算法中,如果一个新的领导人要重新复制之前的任期里的日志时,它必须使用当前新的任期号。Raft 使用的方法更加容易辨别出日志,因为它可以随着时间和日志的变化对日志维护着同一个任期编号。另外,和其他的算法相比,Raft 中的新领导人只需要发送更少日志条目(其他算法中必须在他们被提交之前发送更多的冗余日志条目来为他们重新编号)。

日志压缩

在实际的系统中,不能让日志无限膨胀,否则系统重启时需要花很长的时间进行恢复,从而影响可用性。Raft 采用对整个系统进行快照来解决,快照之前的日志都可以丢弃。

每个副本独立的对自己的系统状态生成快照,并且只能对已经提交的日志条目生成快照。

快照包含以下内容:

  • 日志元数据。最后一条已提交的日志条目的日志索引和 Term。这两个值在快照之后的第一条日志条目的 AppendEntries RPC 的完整性检查的时候会被用上。
  • 系统当前状态。

当 Leader 要发送某个日志条目,落后太多的 Follower 的日志条目会被丢弃,Leader 会将快照发给 Follower。或者新上线一台机器时,也会发送快照给它。

img

生成快照的频率要适中,频率过高会消耗大量 I/O 带宽;频率过低,一旦需要执行恢复操作,会丢失大量数据,影响可用性。推荐当日志达到某个固定的大小时生成快照。

生成一次快照可能耗时过长,影响正常日志同步。可以通过使用 copy-on-write 技术避免快照过程影响正常日志同步。

说明:本文仅阐述 Raft 算法的核心内容,不包括算法论证、评估等

一致性哈希算法

一致性哈希(Consistent Hash)算法的目标是:相同的请求尽可能落到同一个服务器上

一致性哈希 可以很好的解决 稳定性问题,可以将所有的 存储节点 排列在 首尾相接Hash 环上,每个 key 在计算 Hash 后会 顺时针 找到 临接存储节点 存放。而当有节点 加入退出 时,仅影响该节点在 Hash 环上 顺时针相邻后续节点

img

  • 相同的请求是指:一般在使用一致性哈希时,需要指定一个 key 用于 hash 计算,可能是:
    • 用户 ID
    • 请求方 IP
    • 请求服务名称,参数列表构成的串
  • 尽可能是指:服务器可能发生上下线,少数服务器的变化不应该影响大多数的请求。

当某台候选服务器宕机时,原本发往该服务器的请求,会基于虚拟节点,平摊到其它候选服务器,不会引起剧烈变动。

  • 优点

加入删除 节点只影响 哈希环顺时针方向相邻的节点,对其他节点无影响。

  • 缺点

加减节点 会造成 哈希环 中部分数据 无法命中。当使用 少量节点 时,节点变化 将大范围影响 哈希环数据映射,不适合 少量数据节点 的分布式方案。普通一致性哈希分区 在增减节点时需要 增加一倍减去一半 节点才能保证 数据负载的均衡

注意:因为 一致性哈希分区 的这些缺点,一些分布式系统采用 虚拟槽一致性哈希 进行改进,比如 Dynamo 系统。

Gossip 协议

Gossip 协议是集群中节点相互通信的内部通信技术。 Gossip 是一种高效、轻量级、可靠的节点间广播协议,用于传播数据。它是去中心化的、“流行病”的、容错的和点对点通信协议。

Goosip 协议的信息传播和扩散通常需要由种子节点发起。整个传播过程可能需要一定的时间,由于不能保证某个时刻所有节点都收到消息,但是理论上最终所有节点都会收到消息,因此它是一个最终一致性协议。

Gossip 的执行过程

Gossip 协议的执行过程:Gossip 过程是由种子节点发起,当一个种子节点有状态需要更新到网络中的其他节点时,它会随机的选择周围几个节点散播消息,收到消息的节点也会重复该过程,直至最终网络中所有的节点都收到了消息。这个过程可能需要一定的时间,由于不能保证某个时刻所有节点都收到消息,但是理论上最终所有节点都会收到消息,因此它是一个最终一致性协议。

Gossip 类型

Gossip 有两种类型:

  • **Anti-Entropy(反熵)**:以固定的概率传播所有的数据。Anti-Entropy 是 SI model,节点只有两种状态,Suspective 和 Infective,叫做 simple epidemics。
  • **Rumor-Mongering(谣言传播)**:仅传播新到达的数据。Rumor-Mongering 是 SIR model,节点有三种状态,Suspective,Infective 和 Removed,叫做 complex epidemics。

熵是物理学上的一个概念,代表杂乱无章,而反熵就是在杂乱无章中寻求一致。本质上,反熵是一种通过异步修复实现最终一致性的方法。反熵指的是集群中的节点,每隔段时间就随机选择某个其他节点,然后通过互相交换自己的所有数据来消除两者之间的差异,实现数据的最终一致性。由于消息会不断反复的交换,因此消息数量是非常庞大的,无限制的(unbounded),这对一个系统来说是一个巨大的开销。所以,反熵不适合动态变化或节点数比较多的分布式环境

谣言传播模型指的是当一个节点有了新数据后,这个节点变成活跃状态,并周期性地联系其他节点向其发送新数据,直到所有的节点都存储了该新数据。在谣言传播模型下,消息可以发送得更频繁,因为消息只包含最新 update,体积更小。而且,一个谣言消息在某个时间点之后会被标记为 removed,并且不再被传播,因此,谣言传播模型下,系统有一定的概率会不一致。而由于,谣言传播模型下某个时间点之后消息不再传播,因此消息是有限的,系统开销小。

一般来说,为了在通信代价和可靠性之间取得折中,需要将这两种方法结合使用。

Gossip 中的通信模式

在 Gossip 协议下,网络中两个节点之间有三种通信方式:

  • Push: 节点 A 将数据 (key,value,version) 及对应的版本号推送给 B 节点,B 节点更新 A 中比自己新的数据
  • Pull:A 仅将数据 key, version 推送给 B,B 将本地比 A 新的数据(Key, value, version)推送给 A,A 更新本地
  • Push/Pull:与 Pull 类似,只是多了一步,A 再将本地比 B 新的数据推送给 B,B 则更新本地

如果把两个节点数据同步一次定义为一个周期,则在一个周期内,Push 需通信 1 次,Pull 需 2 次,Push/Pull 则需 3 次。虽然消息数增加了,但从效果上来讲,Push/Pull 最好,理论上一个周期内可以使两个节点完全一致。直观上,Push/Pull 的收敛速度也是最快的。

Gossip 的优点

  • 扩展性:网络可以允许节点的任意增加和减少,新增加的节点的状态最终会与其他节点一致。
  • 容错:网络中任何节点的宕机和重启都不会影响 Gossip 消息的传播,Gossip 协议具有天然的分布式系统容错特性。
  • 去中心化:Gossip 协议不要求任何中心节点,所有节点都可以是对等的,任何一个节点无需知道整个网络状况,只要网络是连通的,任意一个节点就可以把消息散播到全网。
  • 一致性收敛:Gossip 协议中的消息会以一传十、十传百一样的指数级速度在网络中快速传播,因此系统状态的不一致可以在很快的时间内收敛到一致。消息传播速度达到了 logN。
  • 简单:Gossip 协议的过程极其简单,实现起来几乎没有太多复杂性。

Gossip 的缺陷

分布式网络中,没有一种完美的解决方案,Gossip 协议跟其他协议一样,也有一些不可避免的缺陷,主要是两个:

  • 消息的延迟:由于 Gossip 协议中,节点只会随机向少数几个节点发送消息,消息最终是通过多个轮次的散播而到达全网的,因此使用 Gossip 协议会造成不可避免的消息延迟。不适合用在对实时性要求较高的场景下。
  • 消息冗余:Gossip 协议规定,节点会定期随机选择周围节点发送消息,而收到消息的节点也会重复该步骤,因此就不可避免的存在消息重复发送给同一节点的情况,造成了消息的冗余,同时也增加了收到消息的节点的处理压力。而且,由于是定期发送,因此,即使收到了消息的节点还会反复收到重复消息,加重了消息的冗余。

QuorumNWR 算法

通过 Quorum NWR,你可以自定义一致性级别,通过临时调整写入或者查询的方式,当 W + R > N 时,就可以实现强一致性了。

Quorum NWR 的三要素

  • **N**:表示副本数,又叫做复制因子(Replication Factor)。也就是说,N 表示集群中同一份数据有多少个副本。在实现 Quorum NWR 的时候,你需要实现自定义副本的功能。也就是说,用户可以自定义指定数据的副本数。
  • **W**:又称写一致性级别(Write Consistency Level),表示成功完成 W 个副本更新,才完成写操作
  • **R**:又称读一致性级别(Read Consistency Level),表示读取一个数据对象时需要读 R 个副本。你可以这么理解,读取指定数据时,要读 R 副本,然后返回 R 个副本中最新的那份数据。

N、W、R 值的不同组合,会产生不同的一致性效果:

  • W + R > N 的时候,对于客户端来讲,整个系统能保证强一致性,一定能返回更新后的那份数据。
  • W + R < N 的时候,对于客户端来讲,整个系统只能保证最终一致性,可能会返回旧数据。

需要注意的是,副本数不能超过节点数:多副本的意义在于冗余备份,如果副本数超过节点数,就意味着在一个节点上会存在多个副本,那么冗余备份的意义就不大了。

PBFT 算法

PoW 算法

ZAB 协议

ZooKeeper 并没有直接采用 Paxos 算法,而是采用了名为 ZAB 的一致性协议。**ZAB 协议不是 Paxos 算法**,只是比较类似,二者在操作上并不相同。

ZAB 协议是 Zookeeper 专门设计的一种支持崩溃恢复的原子广播协议

ZAB 协议是 ZooKeeper 的数据一致性和高可用解决方案。

ZAB 协议定义了两个可以无限循环的流程:

  • 选举 Leader - 用于故障恢复,从而保证高可用。
  • 原子广播 - 用于主从同步,从而保证数据一致性。

选举 Leader

ZooKeeper 的故障恢复

ZooKeeper 集群采用一主(称为 Leader)多从(称为 Follower)模式,主从节点通过副本机制保证数据一致。

  • 如果 Follower 节点挂了 - ZooKeeper 集群中的每个节点都会单独在内存中维护自身的状态,并且各节点之间都保持着通讯,只要集群中有半数机器能够正常工作,那么整个集群就可以正常提供服务
  • 如果 Leader 节点挂了 - 如果 Leader 节点挂了,系统就不能正常工作了。此时,需要通过 ZAB 协议的选举 Leader 机制来进行故障恢复。

ZAB 协议的选举 Leader 机制简单来说,就是:基于过半选举机制产生新的 Leader,之后其他机器将从新的 Leader 上同步状态,当有过半机器完成状态同步后,就退出选举 Leader 模式,进入原子广播模式。

术语

  • myid - 每个 Zookeeper 服务器,都需要在数据文件夹下创建一个名为 myid 的文件,该文件包含整个 Zookeeper 集群唯一的 ID(整数)。
  • zxid - 类似于 RDBMS 中的事务 ID,用于标识一次更新操作的 Proposal ID。为了保证顺序性,该 zkid 必须单调递增。因此 Zookeeper 使用一个 64 位的数来表示,高 32 位是 Leader 的 epoch,从 1 开始,每次选出新的 Leader,epoch 加一。低 32 位为该 epoch 内的序号,每次 epoch 变化,都将低 32 位的序号重置。这样保证了 zkid 的全局递增性。

服务器状态

  • LOOKING - 不确定 Leader 状态。该状态下的服务器认为当前集群中没有 Leader,会发起 Leader 选举
  • FOLLOWING - 跟随者状态。表明当前服务器角色是 Follower,并且它知道 Leader 是谁
  • LEADING - 领导者状态。表明当前服务器角色是 Leader,它会维护与 Follower 间的心跳
  • OBSERVING - 观察者状态。表明当前服务器角色是 Observer,与 Folower 唯一的不同在于不参与选举,也不参与集群写操作时的投票

选票数据结构

每个服务器在进行领导选举时,会发送如下关键信息

  • logicClock - 每个服务器会维护一个自增的整数,名为 logicClock,它表示这是该服务器发起的第多少轮投票
  • state - 当前服务器的状态
  • self_id - 当前服务器的 myid
  • self_zxid - 当前服务器上所保存的数据的最大 zxid
  • vote_id - 被推举的服务器的 myid
  • vote_zxid - 被推举的服务器上所保存的数据的最大 zxid

投票流程

(1)自增选举轮次 - Zookeeper 规定所有有效的投票都必须在同一轮次中。每个服务器在开始新一轮投票时,会先对自己维护的 logicClock 进行自增操作。

(2)初始化选票 - 每个服务器在广播自己的选票前,会将自己的投票箱清空。该投票箱记录了所收到的选票。例:服务器 2 投票给服务器 3,服务器 3 投票给服务器 1,则服务器 1 的投票箱为(2, 3), (3, 1), (1, 1)。票箱中只会记录每一投票者的最后一票,如投票者更新自己的选票,则其它服务器收到该新选票后会在自己票箱中更新该服务器的选票。

(3)发送初始化选票 - 每个服务器最开始都是通过广播把票投给自己。

(4)接收外部投票 - 服务器会尝试从其它服务器获取投票,并记入自己的投票箱内。如果无法获取任何外部投票,则会确认自己是否与集群中其它服务器保持着有效连接。如果是,则再次发送自己的投票;如果否,则马上与之建立连接。

(5)判断选举轮次 - 收到外部投票后,首先会根据投票信息中所包含的 logicClock 来进行不同处理

  • 外部投票的 logicClock 大于自己的 logicClock。说明该服务器的选举轮次落后于其它服务器的选举轮次,立即清空自己的投票箱并将自己的 logicClock 更新为收到的 logicClock,然后再对比自己之前的投票与收到的投票以确定是否需要变更自己的投票,最终再次将自己的投票广播出去。
  • 外部投票的 logicClock 小于自己的 logicClock。当前服务器直接忽略该投票,继续处理下一个投票。
  • 外部投票的 logickClock 与自己的相等。当时进行选票 PK。

(6)选票 PK - 选票 PK 是基于(self_id, self_zxid)(vote_id, vote_zxid) 的对比

  • 外部投票的 logicClock 大于自己的 logicClock,则将自己的 logicClock 及自己的选票的 logicClock 变更为收到的 logicClock
  • 若 logicClock 一致,则对比二者的 vote_zxid,若外部投票的 vote_zxid 比较大,则将自己的票中的 vote_zxid 与 vote_myid 更新为收到的票中的 vote_zxid 与 vote_myid 并广播出去,另外将收到的票及自己更新后的票放入自己的票箱。如果票箱内已存在(self_myid, self_zxid)相同的选票,则直接覆盖
  • 若二者 vote_zxid 一致,则比较二者的 vote_myid,若外部投票的 vote_myid 比较大,则将自己的票中的 vote_myid 更新为收到的票中的 vote_myid 并广播出去,另外将收到的票及自己更新后的票放入自己的票箱

(7)统计选票 - 如果已经确定有过半服务器认可了自己的投票(可能是更新后的投票),则终止投票。否则继续接收其它服务器的投票。

(8)更新服务器状态 - 投票终止后,服务器开始更新自身状态。若过半的票投给了自己,则将自己的服务器状态更新为 LEADING,否则将自己的状态更新为 FOLLOWING

通过以上流程分析,我们不难看出:要使 Leader 获得多数 Server 的支持,则 ZooKeeper 集群节点数必须是奇数。且存活的节点数目不得少于 N + 1

每个 Server 启动后都会重复以上流程。在恢复模式下,如果是刚从崩溃状态恢复的或者刚启动的 server 还会从磁盘快照中恢复数据和会话信息,zk 会记录事务日志并定期进行快照,方便在恢复时进行状态恢复。

原子广播(Atomic Broadcast)

ZooKeeper 通过副本机制来实现高可用

那么,ZooKeeper 是如何实现副本机制的呢?答案是:ZAB 协议的原子广播。

img

ZAB 协议的原子广播要求:

**所有的写请求都会被转发给 Leader,Leader 会以原子广播的方式通知 Follow。当半数以上的 Follow 已经更新状态持久化后,Leader 才会提交这个更新,然后客户端才会收到一个更新成功的响应**。这有些类似数据库中的两阶段提交协议。

在整个消息的广播过程中,Leader 服务器会每个事务请求生成对应的 Proposal,并为其分配一个全局唯一的递增的事务 ID(ZXID),之后再对其进行广播。

InfluxDB 企业版一致性实现剖析

Hashicorp Raft

基于 Raft 的分布式 KV 系统开发实战

参考资料

《微服务架构核心 20 讲》笔记

什么是微服务架构

微服务是一种架构模式。

微服务的六个特点:

  • 一组小的服务
  • 独立的进程
  • 独立部署
  • 轻量级通信
  • 基于业务能力
  • 无集中式管理——这里指的是可以用不同的技术栈,不同的存储

微服务定义:基于有界上下文的、松散耦合的、面向服务的架构。

架构师如何权衡微服务的利弊

架构之道在于权衡利弊。

微服务架构的优点

  • 强模块化边界
  • 可独立部署
  • 技术多样性

微服务架构的缺点

  • 分布式系统复杂性
  • 最终一致性
  • 运维复杂性
  • 测试复杂性

分布式系统带来的一个挑战就是取终一致性。

康威法则和微服务给架构师怎样的启示

康威法则:设计系统的架构受制于产生这些设计的组织的沟通结构。

img

康威的原文中提出的各定律

  • 第一定律 组织沟通方式会通过系统设计表达出来
  • 第二定律 时间再多一件事情也不可能做的完美,但总有时间做完一件事情
  • 第三定律 线型系统和线型组织架构间有潜在的异质同态特性
  • 第四定律 大的系统组织总是比小系统更倾向于分解

其中心思想实际就是分而治之

企业应该在什么时候开始考虑引入微服务

微服务的适用性:

img

微服务重在服务治理,其对于平台基础设施有较高要求,所以企业刚开始应用微服务并不一定能提高生产力。简单来说:单体服务适用于小团队;微服务适用于大团队。

何时选择微服务,在于度的把控。当研发团队人员增长到一定程度,沟通成本不断增长时,就可以考虑微服务架构了。一个经验数据是,当团队达到 100 人规模时,就可以考虑使用微服务架构了。

罗马不是一天建成的:架构是一个演进的过程,不应该一开始就将系统设计的过于复杂。

什么样组织架构更适合微服务

img

  • 左边是比较传统的组织架构。产品从左到右流程走,可能出现的问题,反馈比较慢,对业务支持比较慢。沟通成本比较大。

  • 右边是比较合适微服务的组织架构, 每一个团队(基于微服务的跨职能的团队),有开发,有产品,有测试,团队都支持自己的微服务。交付的产口是平台,对外提供 API 接口支持多样的业务。

img

DevOps 理念:谁开发的,谁构建,谁支持。

如何理解阿里巴巴提出的微服务

中台战略和微服务的关系

img

业务中台和技术中台统称为大中台,支撑业务前台。正所谓,万丈高楼平地起,中台基础越扎实,前台发展就越快。

PaaS 和 核心业务层是和微服务相关的。这一些基本都可以用微服务来实现。

  • IaaS:Infrastructure-as-a-Service(基础设施即服务)

  • PaaS:Platform-as-a-Service(平台即服务)

如何给出一个清晰简洁的服务分层方式

大致的服务分层图:

img

SOA(Service-Oriented Architecture)或微服务大致可分为

  • 基础服务:也被称为:核心领域服务、中间层服务、公共服务
  • 聚合服务:对基础服务的聚合,以满足业务需求,提供给外部调用。

微服务总体技术架构体系是怎么设计的

img

  • 接入层:接入外部流量,内部做负载均衡
  • 网关层:反向路由,限流,安全,跨横切面的功能。
  • 业务服务层:可分为:聚合服务,基础服务
  • 支撑服务:各种公共性的后台服务
  • 平台服务:可以是一些管理系统
  • 基础设施:由运维团队运维

其中,与微服务相关的主要有:网关层、业务服务层、支撑服务、平台服务

微服务最经典的三种服务发现机制

消费者(客户端)如何发现生产者(服务端),有三种模式:

(1)通过 DNS 访问 LB(负载均衡),LB 分发

img

(2)消费者内置 LB, 生产者将自身信息注册到注册中心上,并通过发送定时心跳来确认自身服务可用。消费者定期从注册中心拉取生产者信息

img

(3)结全前面两种方式, 在 Consumer 的主机上也布置一个 LB。 LB 会定期同步注册中心的信息。 运维成本比较高一点。

img

微服务 API 服务网关(一)原理

网关用于屏蔽服务内部的逻辑,希望外部访问看到是统一的接口。

img

网关主要的功能:

  • 反向代理:将外部的请求换成内部调用。
  • 安全认证:防刷、防爬虫。
  • 限流熔断:处理可能会突发流量。
  • 日志监控:进行访问访问审计,监控流量。

一般不要把过多的业务逻辑写在网关当中。

img

服务 API 服务网关(二)开源网关 Zuul

Servlet 和 Filter Runner 过滤器:前置路由过滤器, 路由过滤器,后置路由过滤器

过滤器开发,可以通过脚本开发。开发完后上传到过滤器目录中, 被扫描后加到 Filter Runner 中。

各个 Filter 共享数据通过 Request Context 来实现。

img

过滤链的流程:

img

跟 Netflix 学习微服务路由发现体系

netflix 有两个比较重要的支撑服务

  • 服务注册中心 Eureka
  • 网关 zuul

img

集中式配置中心的作用和原理是什么

为什么要引入配置中心呢?

配置文件中的属性不方便管理,无法动态更新,无法审计。配置中心可以解决这些问题。

什么可做配置呢?

  • 业务开关
  • 调用/响应超时
  • 限流
  • 连接字符串
  • 动态参数

Svr 更新配置有两种方式:推和拉。

img

携程的 Apollo 配置中心:

img

github : https://github.com/ctripcorp/apollo

微服务通讯方式 RPC vs REST

RPC:远程过程调用

REST:Restful

img

微服务框架需要考虑哪些治理环节

一个公司的微服务多了,就要需要考虑服务治理:

  • 软负载:蓝绿发布,灰度发布

  • 指标(Metrics):服务的调用量,耗时监控

  • 调用链埋点:方便快速定位问题

契约生成代码: 定义结构体可自动生成 json 格式, vscode 有插件。

img

阿里巴巴微服务治理生态:Dubbo http://dubbo.apache.org/en-us/

微服务监控系统分层和监控架构

五个层次的监控:

  • 基础层施监控
  • 系统层监控
  • 应用层监控
    • url
    • sevice
    • mysql
    • cache 可用率
    • 性能
    • qps
  • 业务层监控
    • 核心指标监控
    • 登录注册
  • 端用户体验监控

img

  • 日志监控:Elasticsearch
  • metrics 监控
  • 健康检查
  • 调用链监控
  • 告警系统

比较典型的监控架构,大部分公司的流程

img

数据量比较大一般用 Kafka 作为缓冲队列。

Nagios 健康检测工具。

ELK:ELK 是 Elasticsearch、Logstash、Kibana 三大开源框架首字母大写简称。

微服务的调用链监控该如何选型

调用链的监控 谷歌 2010 年提出来的。

通过 Span 来跟踪, RootSpan ChildSpan 跨进程时 会有 Trace di + parant span id

img

三个主流调用链监控系统的比较:

img

微服务的容错限流是如何工作的

Netfiix Hystrix 具有熔断、隔离、限流、降级的功能 。

img

说明:

  • 3 Cirult OPen 判断是否可以熔断, 是则执行 getFAllBack() 降级处理函数
  • 5 run() 超时 也执行降级处理函数。
  • 6 不成功也 执行处理函数 。
  • Calculate Cirult Health 就是在正常执行成功后计算是否需要熔断。

Docker 容器部署技术 & 持续交付流水线

docker 容器治理就是解决:环境不一致的问题。把依赖的所有包都打在镜像中。

统一、标准化的交付流水线。

UAT 环境: User Acceptance Test (用户验收测试)

img

发布模式: 蓝绿布置,灰度发布(金丝雀发布)。

img

容器集群调度和基于容器的发布体系

资源调度框架 Mesos 架构

img

基于容器的云发布体系

img

RPC 高级篇

异步 RPC

链路跟踪

分布式链路跟踪就是将一次分布式请求还原为一个完整的调用链路,我们可以在整个调用链路中跟踪到这一次分布式请求的每一个环节的调用情况,比如调用是否成功,返回什么异常,调用的哪个服务节点以及请求耗时等等。

Trace 就是代表整个链路,每次分布式都会产生一个 Trace,每个 Trace 都有它的唯一标识即 TraceId,在分布式链路跟踪系统中,就是通过 TraceId 来区分每个 Trace 的。
Span 就是代表了整个链路中的一段链路,也就是说 Trace 是由多个 Span 组成的。在一个 Trace 下,每个 Span 也都有它的唯一标识 SpanId,而 Span 是存在父子关系的。还是以讲过的例子为例子,在 A->B->C->D 的情况下,在整个调用链中,正常情况下会产生 3 个 Span,分别是 Span1(A->B)、Span2(B->C)、Span3(C->D),这时 Span3 的父 Span 就是 Span2,而 Span2 的父 Span 就是 Span1。

RPC 在整合分布式链路跟踪需要做的最核心的两件事就是“埋点”和“传递”。

我们前面说是因为各子应用、子服务间复杂的依赖关系,所以通过日志难定位问题。那我们就想办法通过日志定位到是哪个子应用的子服务出现问题就行了。

其实,在 RPC 框架打印的异常信息中,是包括定位异常所需要的异常信息的,比如是哪类异常引起的问题(如序列化问题或网络超时问题),是调用端还是服务端出现的异常,调用端与服务端的 IP 是什么,以及服务接口与服务分组都是什么等等。具体如下图所示:

img

泛化调用

在一些特定场景下,需要在没有接口的情况下进行 RPC 调用。例如:

场景一:搭建一个统一的测试平台,可以让各个业务方在测试平台中通过输入接口、分组名、方法名以及参数值,在线测试自己发布的 RPC 服务。

img

场景二:搭建一个轻量级的服务网关,可以让各个业务方用 HTTP 的方式,通过服务网关调用其它服务。

img

为了解决这些场景的问题,可以使用泛化调用。

就是 RPC 框架提供统一的泛化调用接口(GenericService),调用端在创建 GenericService 代理时指定真正需要调用的接口的接口名以及分组名,通过调用 GenericService 代理的 $invoke 方法将服务端所需要的所有信息,包括接口名、业务分组名、方法名以及参数信息等封装成请求消息,发送给服务端,实现在没有接口的情况下进行
RPC 调用的功能。

1
2
3
4
class GenericService {
Object $invoke(String methodName, String[] paramTypes, Object[] params);
CompletableFuture<Object> $asyncInvoke(String methodName, String[] paramTypes
}

而通过泛化调用的方式发起调用,由于调用端没有服务端提供方提供的接口 API,不能正常地进行序列化与反序列化,我们可以为泛化调用提供专属的序列化插件,来解决实际问题。

时钟轮

时钟轮这个机制很好地解决了定时任务中,因每个任务都创建一个线程,导致的创建过多线程的问题,以及一个线程扫描所有的定时任务,让 CPU 做了很多额外的轮询遍历操作而浪费 CPU 的问题。

时钟轮的实现机制就是模拟现实生活中的时钟,将每个定时任务放到对应的时间槽位上,这样可以减少扫描任务时对其它时间槽位定时任务的额外遍历操作。

在时间轮的使用中,有些问题需要你额外注意:

时间槽位的单位时间越短,时间轮触发任务的时间就越精确。例如时间槽位的单位时间是 10 毫秒,那么执行定时任务的时间误差就在 10 毫秒内,如果是 100 毫秒,那么误差就在 100 毫秒内。

时间轮的槽位越多,那么一个任务被重复扫描的概率就越小,因为只有在多层时钟轮中的任务才会被重复扫描。比如一个时间轮的槽位有 1000 个,一个槽位的单位时间是 10 毫秒,那么下一层时间轮的一个槽位的单位时间就是 10 秒,超过 10 秒的定时任务会被放到下一层时间轮中,也就是只有超过 10 秒的定时任务会被扫描遍历两次,但如果槽位是 10 个,那么超过 100 毫秒的任务,就会被扫描遍历两次。

结合这些特点,我们就可以视具体的业务场景而定,对时钟轮的周期和时间槽数进行设置。

在 RPC 框架中,只要涉及到定时任务,我们都可以应用时钟轮,比较典型的就是调用端的超时处理、调用端与服务端的启动超时以及定时心跳等等。

流量回放

所谓的流量就是某个时间段内的所有请求,我们通过某种手段把发送到 A 应用的所有请求录制下来,然后把这些请求统一转发到 B 应用,让 B 应用接收到的请求参数跟 A 应用保持一致,从而实现 A 接收到的请求在 B 应用里面重新请求了一遍。整个过程称之为“流量回放”。

流量回放可以做什么?

为了保障应用升级后,我们的业务行为还能保持和升级前一样,我们在大多数情况下都是依靠已有的 TestCase 去验证,但这种方式在一定程度上并不是完全可靠的。最可靠的方式就是引入线上 Case 去验证改造后的应用,把线上的真实流量在改造后的应用里面进行回放,这样不仅节省整个上线时间,还能弥补手动维护 Case 存在的缺陷。

应用引入了 RPC 后,所有的请求流量都会被 RPC 接管,所以我们可以很自然地在 RPC 里面支持流量回放功能。虽然这个功能本身并不是 RPC 的核心功能,但对于使用 RPC 的人来说,他们有了这个功能之后,就可以更放心地升级自己的应用了。

RPC 高级

RPC 性能

如何提升单机吞吐量?

大多数情况下,影响到 RPC 调用的吞吐量的原因也就是业务逻辑处理慢了,CPU 大部分时间都在等待资源。

为了解决等待的耗时,可以使用异步。异步可以使用 Future 或 Callback 方式,Future 最为简单。

img

另外,我们可以通过对 CompletableFuture 的支持,实现 RPC 调用在调用端与服务端之间的完全异步,同时提升两端的单机吞吐量。

RPC 安全

虽然 RPC 经常用于解决内网应用之间的调用,内网环境相对公网也没有那么恶劣,但我们也有必要去建立一套可控的安全体系,去防止一些错误行为。对于 RPC 来说,我们所关心的安全问题不会有公网应用那么复杂,我们只要保证让服务调用方能拿到真实的服务提供方 IP 地址集合,且服务提供方可以管控调用自己的应用就够了。

服务提供方应用里面放一个用于 HMAC 签名的私钥,在授权平台上用这个私钥为申请调用的调用方应用进行签名,这个签名生成的串就变成了调用方唯一的身份。服务提供方在收到调用方的授权请求之后,我们只要需要验证下这个签名跟调用方应用信息是否对应得上就行了,这样集中式授权的瓶颈也就不存在了。

参考资料