流量控制
流量控制
在高并发场景下,为了应对瞬时海量请求的压力,保障系统的平稳运行,必须预估系统的流量阈值,通过限流规则阻断处理不过来的请求。
流量控制简介
什么是流量控制
流量控制(Flow Control),根据流量、并发线程数、响应时间等指标,把随机到来的流量调整成合适的形状,即流量塑形。避免应用被瞬时的流量高峰冲垮,从而保障应用的高可用性。
为什么需要流量控制
复杂的分布式系统架构中的应用程序往往具有数十个依赖项,每个依赖项都会不可避免地在某个时刻失败。 如果主机应用程序未与这些外部故障隔离开来,则可能会被波及。
例如,对于依赖于 30 个服务的应用程序,假设每个服务的正常运行时间为 99.99%,则可以期望:
99.9930 = 99.7% 的正常运行时间
10 亿个请求中的 0.3%= 3,000,000 个失败
即使所有依赖项都具有出色的正常运行时间,每月也会有 2 个小时以上的停机时间。
然而,现实情况一般比这种估量情况更糟糕。
当一切正常时,整体系统如下所示:
图片来自 Hystrix Wiki
在分布式系统架构下,这些强依赖的子服务稳定与否对系统的影响非常大。但是,依赖的子服务可能有很多不可控问题:如网络连接、资源繁忙、服务宕机等。例如:下图中有一个 QPS 为 50 的依赖服务 I 出现不可用,但是其他依赖服务是可用的。
图片来自 Hystrix Wiki
当流量很大的情况下,某个依赖的阻塞,会导致上游服务请求被阻塞。当这种级联故障愈演愈烈,就可能造成整个线上服务不可用的雪崩效应,如下图。这种情况若持续恶化,如果上游服务本身还被其他服务所依赖,就可能出现多米洛骨牌效应,导致多个服务都无法正常工作。
图片来自 Hystrix Wiki
流量控制有哪些保护机制
流量控制常见的手段就是限流、熔断、降级。
什么是降级?
降级是保障服务能够稳定运行的一种保护方式:面对突增的流量,牺牲一些吞吐量以换取系统的稳定。常见的降级实现方式有:开关降级、限流降级、熔断降级。
什么是限流?
限流一般针对下游服务,当上游流量较大时,避免被上游服务的请求撑爆。
限流就是限制系统的输入和输出流量,以达到保护系统的目的。一般来说系统的吞吐量是可以被测算的,为了保证系统的稳定运行,一旦达到的需要限制的阈值,就需要限制流量并采取一些措施以完成限制流量的目的。比如:延迟处理,拒绝处理,或者部分拒绝处理等等。
限流规则包含三个部分:时间粒度,接口粒度,最大限流值。限流规则设置是否合理直接影响到限流是否合理有效。
什么是熔断?
熔断一般针对上游服务,当下游服务超时/异常较多时,避免被下游服务拖垮。
当调用链路中某个资源出现不稳定,例如,超时异常比例升高的时候,则对这个资源的调用进行限制,并让请求快速失败,避免影响到其它的资源,最终产生雪崩的效果。
熔断尽最大的可能去完成所有的请求,容忍一些失败,熔断也能自动恢复。熔断的常见策略有:
- 在每秒请求异常数超过多少时触发熔断降级
- 在每秒请求异常错误率超过多少时触发熔断降级
- 在每秒请求平均耗时超过多少时触发熔断降级
流量控制有哪些衡量指标
流量控制有以下几个角度:
- 流量指标,例如 QPS、并发线程数等。
- 资源的调用关系,例如资源的调用链路,资源和资源之间的关系,调用来源等。
- 控制效果,例如排队等待、直接拒绝、Warm Up(预热)等。
限流算法
限流的本质是:在一定的时间范围内,限制某一个资源被访问的频率。如何去限制流量,就需要采用一定的策略,即限流算法。常见的限流算法有:固定窗口限流算法、滑动窗口限流算法、漏桶限流算法、令牌桶限流算法。
下面,将对这几种算法进行一一介绍。
固定窗口限流算法
固定窗口限流算法的原理
固定窗口限流算法的基本策略是:
- 设置一个固定时间窗口,以及这个固定时间窗口内的最大请求数;
- 为每个固定时间窗口设置一个计数器,用于统计请求数;
- 一旦请求数超过最大请求数,则请求会被拦截。
固定窗口限流算法的利弊
固定窗口限流算法的优点是:实现简单。
固定窗口限流算法的缺点是:存在临界问题。所谓临界问题,是指:流量分别集中在一个固定时间窗口的尾部和一个固定时间窗口的头部。举例来说,假设限流规则为每分钟不超过 100 次请求。在第一个时间窗口中,起初没有任何请求,在最后 1 s,收到 100 次请求,由于没有达到阈值,所有请求都通过;在第二个时间窗口中,第 1 秒就收到 100 次请求,而后续没有任何请求。虽然,这两个时间窗口内的流量都符合限流要求,但是在两个时间窗口临界的这 2s 内,实际上有 200 次请求,显然是超过预期吞吐量的,存在压垮系统的可能。
固定窗口限流算法的实现
Java 版本的固定窗口限流算法
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
public class SlidingWindowRateLimiter implements RateLimiter {
/**
* 允许的最大请求数
*/
private final long maxPermits;
/**
* 窗口期时长
*/
private long periodMillis;
/**
* 分片窗口期时长
*/
private final long shardPeriodMillis;
/**
* 窗口期截止时间
*/
private long lastPeriodMillis;
/**
* 分片窗口数
*/
private final int shardNum;
/**
* 请求总计数
*/
private final AtomicLong totalCount = new AtomicLong(0);
/**
* 分片窗口计数列表
*/
private final List<AtomicLong> countList = new LinkedList<>();
public SlidingWindowRateLimiter(long qps, int shardNum) {
this(qps, 1000, TimeUnit.MILLISECONDS, shardNum);
}
public SlidingWindowRateLimiter(long maxPermits, long period, TimeUnit timeUnit, int shardNum) {
this.maxPermits = maxPermits;
this.periodMillis = timeUnit.toMillis(period);
this.lastPeriodMillis = System.currentTimeMillis();
this.shardPeriodMillis = timeUnit.toMillis(period) / shardNum;
this.shardNum = shardNum;
for (int i = 0; i < shardNum; i++) {
countList.add(new AtomicLong(0));
}
}
@Override
public synchronized boolean tryAcquire(int permits) {
long now = System.currentTimeMillis();
if (now > lastPeriodMillis) {
for (int shardId = 0; shardId < shardNum; shardId++) {
long shardCount = countList.get(shardId).get();
totalCount.addAndGet(-shardCount);
countList.set(shardId, new AtomicLong(0));
lastPeriodMillis += shardPeriodMillis;
}
}
int shardId = (int) (now % periodMillis / shardPeriodMillis);
if (totalCount.get() + permits <= maxPermits) {
countList.get(shardId).addAndGet(permits);
totalCount.addAndGet(permits);
return true;
} else {
return false;
}
}
}
滑动窗口限流算法
滑动窗口限流算法的原理
滑动窗口限流算法是对固定窗口限流算法的改进,解决了临界问题。
滑动窗口限流算法的基本策略是:
- 将固定时间窗口分片为多个子窗口,每个子窗口的访问次数独立统计;
- 当请求时间大于当前子窗口的最大时间时,则将当前子窗口废弃,并将计时窗口向前滑动,并将下一个子窗口置为当前窗口。
- 要保证所有子窗口的统计数之和不能超过阈值。
滑动窗口限流算法就是针对固定窗口限流算法的更细粒度的控制,分片越多,则限流越精准。
滑动窗口限流算法的利弊
滑动窗口限流算法的优点是:在滑动窗口限流算法中,临界位置的突发请求都会被算到时间窗口内,因此可以解决计数器算法的临界问题。
滑动窗口限流算法的缺点是:
- 额外的内存开销 - 滑动时间窗口限流算法的时间窗口是持续滑动的,并且除了需要一个计数器来记录时间窗口内接口请求次数之外,还需要记录在时间窗口内每个接口请求到达的时间点,所以存在额外的内存开销。
- 限流的控制粒度受限于窗口分片粒度 - 滑动窗口限流算法,只能在选定的时间粒度上限流,对选定时间粒度内的更加细粒度的访问频率不做限制。但是,由于每个分片窗口都有额外的内存开销,所以也并不是分片数越多越好的。
滑动窗口限流算法的实现
Java 版本的滑动窗口限流算法
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
public class SlidingWindowRateLimiter implements RateLimiter {
/**
* 允许的最大请求数
*/
private final long maxPermits;
/**
* 窗口期时长
*/
private final long periodMillis;
/**
* 分片窗口期时长
*/
private final long shardPeriodMillis;
/**
* 窗口期截止时间
*/
private long lastPeriodMillis;
/**
* 分片窗口数
*/
private final int shardNum;
/**
* 请求总计数
*/
private final AtomicLong totalCount = new AtomicLong(0);
/**
* 分片窗口计数列表
*/
private final List<AtomicLong> countList = new LinkedList<>();
public SlidingWindowRateLimiter(long qps, int shardNum) {
this(qps, 1000, TimeUnit.MILLISECONDS, shardNum);
}
public SlidingWindowRateLimiter(long maxPermits, long period, TimeUnit timeUnit, int shardNum) {
this.maxPermits = maxPermits;
this.periodMillis = timeUnit.toMillis(period);
this.lastPeriodMillis = System.currentTimeMillis();
this.shardPeriodMillis = timeUnit.toMillis(period) / shardNum;
this.shardNum = shardNum;
for (int i = 0; i < shardNum; i++) {
countList.add(new AtomicLong(0));
}
}
@Override
public synchronized boolean tryAcquire(int permits) {
long now = System.currentTimeMillis();
if (now > lastPeriodMillis) {
for (int shardId = 0; shardId < shardNum; shardId++) {
long shardCount = countList.get(shardId).get();
totalCount.addAndGet(-shardCount);
countList.set(shardId, new AtomicLong(0));
lastPeriodMillis += shardPeriodMillis;
}
}
int shardId = (int) (now % periodMillis / shardPeriodMillis);
if (totalCount.get() + permits <= maxPermits) {
countList.get(shardId).addAndGet(permits);
totalCount.addAndGet(permits);
return true;
} else {
return false;
}
}
}
漏桶限流算法
漏桶限流算法的原理
漏桶限流算法的基本策略是:
- 水(请求)以任意速率由入口进入到漏桶中;
- 水以固定的速率由出口出水(请求通过);
- 漏桶的容量是固定的,如果水的流入速率大于流出速率,最终会导致漏桶中的水溢出(这意味着请求拒绝)。
漏桶限流算法的利弊
漏桶限流算法的优点是:流量速率固定——即无论流量多大,即便是突发的大流量,处理请求的速度始终是固定的。
漏桶限流算法的缺点是:不能灵活的调整流量。例如:一个集群通过增减节点的方式,弹性伸缩了其吞吐能力,漏桶限流算法无法随之调整。
漏桶策略适用于间隔性突发流量且流量不用即时处理的场景。
漏桶限流算法的实现
Java 版本的漏桶限流算法
import java.util.concurrent.atomic.AtomicLong;
public class LeakyBucketRateLimiter implements RateLimiter {
/**
* QPS
*/
private final int qps;
/**
* 桶的容量
*/
private final long capacity;
/**
* 计算的起始时间
*/
private long beginTimeMillis;
/**
* 桶中当前的水量
*/
private final AtomicLong waterNum = new AtomicLong(0);
public LeakyBucketRateLimiter(int qps, int capacity) {
this.qps = qps;
this.capacity = capacity;
}
@Override
public synchronized boolean tryAcquire(int permits) {
// 如果桶中没有水,直接通过
if (waterNum.get() == 0) {
beginTimeMillis = System.currentTimeMillis();
waterNum.addAndGet(permits);
return true;
}
// 计算水量
long leakedWaterNum = ((System.currentTimeMillis() - beginTimeMillis) / 1000) * qps;
long currentWaterNum = waterNum.get() - leakedWaterNum;
waterNum.set(Math.max(0, currentWaterNum));
// 重置时间
beginTimeMillis = System.currentTimeMillis();
if (waterNum.get() + permits < capacity) {
waterNum.addAndGet(permits);
return true;
} else {
return false;
}
}
}
令牌桶限流算法
令牌桶限流算法的原理
令牌桶算法的原理:
- 接口限制 T 秒内最大访问次数为 N,则每隔 T/N 秒会放一个 token 到桶中
- 桶内最多存放 M 个 token,如果 token 到达时令牌桶已经满了,那么这个 token 就会被丢弃
- 接口请求会先从令牌桶中取 token,拿到 token 则处理接口请求,拿不到 token 则进行限流处理
令牌桶限流算法的利弊
因为令牌桶存放了很多令牌,那么大量的突发请求会被执行,但是它不会出现临界问题,在令牌用完之后,令牌是以一个恒定的速率添加到令牌桶中的,因此不能再次发送大量突发请求。
规定固定容量的桶,token 以固定速度往桶内填充,当桶满时 token 不会被继续放入,每过来一个请求把 token 从桶中移除,如果桶中没有 token 不能请求。
令牌桶算法适用于有突发特性的流量,且流量需要即时处理的场景。
令牌桶限流算法的实现
Java 实现令牌桶算法
import java.util.concurrent.atomic.AtomicLong;
/**
* 令牌桶限流算法
*
* @author <a href="mailto:forbreak@163.com">Zhang Peng</a>
* @date 2024-01-18
*/
public class TokenBucketRateLimiter implements RateLimiter {
/**
* QPS
*/
private final long qps;
/**
* 桶的容量
*/
private final long capacity;
/**
* 上一次令牌发放时间
*/
private long endTimeMillis;
/**
* 桶中当前的令牌数量
*/
private final AtomicLong tokenNum = new AtomicLong(0);
public TokenBucketRateLimiter(long qps, long capacity) {
this.qps = qps;
this.capacity = capacity;
this.endTimeMillis = System.currentTimeMillis();
}
@Override
public synchronized boolean tryAcquire(int permits) {
long now = System.currentTimeMillis();
long gap = now - endTimeMillis;
// 计算令牌数
long newTokenNum = (gap * qps / 1000);
long currentTokenNum = tokenNum.get() + newTokenNum;
tokenNum.set(Math.min(capacity, currentTokenNum));
if (tokenNum.get() < permits) {
return false;
} else {
tokenNum.addAndGet(-permits);
endTimeMillis = now;
return true;
}
}
}
扩展
Guava 的 RateLimiter 工具类就是基于令牌桶算法实现,其源码分析可以参考:RateLimiter 基于漏桶算法,但它参考了令牌桶算法
限流算法测试
限流算法测试
import cn.hutool.core.thread.ThreadUtil;
import cn.hutool.core.util.RandomUtil;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@Slf4j
public class RateLimiterDemo {
public static void main(String[] args) {
// ============================================================================
int qps = 20;
System.out.println("======================= 固定时间窗口限流算法 =======================");
FixedWindowRateLimiter fixedWindowRateLimiter = new FixedWindowRateLimiter(qps);
testRateLimit(fixedWindowRateLimiter, qps);
System.out.println("======================= 滑动时间窗口限流算法 =======================");
SlidingWindowRateLimiter slidingWindowRateLimiter = new SlidingWindowRateLimiter(qps, 10);
testRateLimit(slidingWindowRateLimiter, qps);
System.out.println("======================= 漏桶限流算法 =======================");
LeakyBucketRateLimiter leakyBucketRateLimiter = new LeakyBucketRateLimiter(qps, 100);
testRateLimit(leakyBucketRateLimiter, qps);
System.out.println("======================= 令牌桶限流算法 =======================");
TokenBucketRateLimiter tokenBucketRateLimiter = new TokenBucketRateLimiter(qps, 100);
testRateLimit(tokenBucketRateLimiter, qps);
}
private static void testRateLimit(RateLimiter rateLimiter, int qps) {
AtomicInteger okNum = new AtomicInteger(0);
AtomicInteger limitNum = new AtomicInteger(0);
ExecutorService executorService = ThreadUtil.newFixedExecutor(10, "限流测试", true);
long beginTime = System.currentTimeMillis();
int threadNum = 4;
final CountDownLatch latch = new CountDownLatch(threadNum);
for (int i = 0; i < threadNum; i++) {
executorService.submit(() -> {
try {
batchRequest(rateLimiter, okNum, limitNum, 1000);
} catch (Exception e) {
log.error("发生异常!", e);
} finally {
latch.countDown();
}
});
}
try {
latch.await(10, TimeUnit.SECONDS);
long endTime = System.currentTimeMillis();
long gap = endTime - beginTime;
log.info("限流 QPS: {} -> 实际结果:耗时 {} ms,{} 次请求成功,{} 次请求被限流,实际 QPS: {}",
qps, gap, okNum.get(), limitNum.get(), okNum.get() * 1000 / gap);
if (okNum.get() == qps) {
log.info("限流符合预期");
}
} catch (Exception e) {
log.error("发生异常!", e);
} finally {
executorService.shutdown();
}
}
private static void batchRequest(RateLimiter rateLimiter, AtomicInteger okNum, AtomicInteger limitNum, int num)
throws InterruptedException {
for (int j = 0; j < num; j++) {
if (rateLimiter.tryAcquire(1)) {
log.info("请求成功");
okNum.getAndIncrement();
} else {
log.info("请求限流");
limitNum.getAndIncrement();
}
TimeUnit.MILLISECONDS.sleep(RandomUtil.randomInt(0, 10));
}
}
}
限流框架 - Hystrix
Hystrix 是由 Netflix 开源,用于处理分布式系统的延迟和容错的一个开源组件。在分布式系统里,许多依赖不可避免的会调用失败,比如超时、异常等。Hystrix 采用断路器模式来实现服务间的彼此隔离,从而避免级联故障,以提高分布式系统整体的弹性。
“断路器”本身是一种开关装置,当某个服务单元发生故障之后,通过断路器的故障监控(类似熔断保险丝),向调用方返回一个符合预期的、可处理的备选响应(FallBack),而不是长时间的等待或者抛出调用方无法处理的异常,这样就保证了服务调用方的线程不会被长时间、不必要地占用,从而避免了故障在分布式系统中的蔓延,乃至雪崩。
Hystrix 官方已宣布不再发布新版本。但是,Hystrix 的断路器设计理念,有非常高的学习价值。
如果使用 Hystrix 对每个基础依赖服务进行过载保护,则整个系统架构将会类似下图所示,每个依赖项彼此隔离,受到延迟时发生饱和的资源的被限制访问,并包含 fallback 逻辑(用于降级处理),该逻辑决定了在依赖项中发生任何类型的故障时做出对应的处理。
Hystrix 原理
如下图所示,Hystrix 的工作流程大致可以分为 9 个步骤。
(一)构建一个 HystrixCommand 或 HystrixObservableCommand 对象
Hystrix 进行资源隔离,其实是提供了一个抽象,叫做命令模式。这也是 Hystrix 最基本的资源隔离技术。
在使用 Hystrix 的过程中,会对依赖服务的调用请求封装成命令对象,Hystrix 对 命令对象抽象了两个抽象类:HystrixCommand
和 HystrixObservableCommand
。
HystrixCommand
表示的命令对象会返回一个唯一返回值。HystrixObservableCommand
表示的命令对象会返回多个返回值。
HystrixCommand command = new HystrixCommand(arg1, arg2);
HystrixObservableCommand command = new HystrixObservableCommand(arg1, arg2);
(二)执行命令
Hystrix 中共有 4 种方式执行命令,如下所示:
执行方式 | 说明 | 可用对象 |
---|---|---|
execute() | 阻塞式同步执行,返回依赖服务的单一返回结果(或者抛出异常) | HystrixCommand |
queue() | 异步执行,通过 Future 返回依赖服务的单一返回结果(或者抛出异常) | HystrixCommand |
observe() | 基于 Rxjava 的 Observable 方式,返回通过 Observable 表示的依赖服务返回结果。代调用代码先执行 (Hot Obserable) | HystrixObservableCommand |
toObservable() | 基于 Rxjava 的 Observable 方式,返回通过 Observable 表示的依赖服务返回结果。执行代码等到真正订阅的时候才会执行 (cold observable) | HystrixObservableCommand |
这四种命令中,exeucte()
、queue()
、observe()
的表示其实是通过 toObservable()
实现的,其转换关系如下图所示:
HystrixCommand
执行方式
K value = command.execute();
// 等价语句:
K value = command.execute().queue().get();
Future<K> fValue = command.queue();
//等价语句:
Future<K> fValue = command.toObservable().toBlocking().toFuture();
Observable<K> ohValue = command.observe(); //hot observable,立刻订阅,命令立刻执行
//等价语句:
Observable<K> ohValue = command.toObservable().subscribe(subject);
// 上述执行最终实现还是基于 toObservable()
Observable<K> ocValue = command.toObservable(); //cold observable,延后订阅,订阅发生后,执行才真正执行
(三)是否缓存
如果当前命令对象启用了请求缓存,并且请求的响应存在于缓存中,则缓存的响应会立刻以 Observable
的形式返回。
(四)是否开启断路器
如果第三步没有缓存没有命中,则判断一下当前断路器的断路状态是否打开。如果断路器状态为打开状态,则 Hystrix 将不会执行此 Command 命令,直接执行步骤 8 调用 Fallback;
如果断路器状态是关闭,则执行步骤 5 检查是否有足够的资源运行 Command 命令
(五)信号量、线程池是否拒绝
当您执行该命令时,Hystrix 会检查断路器以查看电路是否打开。
如果电路开路(或“跳闸”),则 Hystrix 将不会执行该命令,而是将流程路由到 (8) 获取回退。
如果电路闭合,则流程前进至 (5) 以检查是否有可用容量来运行命令。
如果当前要执行的 Command 命令 先关连的线程池 和队列(或者信号量)资源已经满了,Hystrix 将不会运行 Command 命令,直接执行 步骤 8 的 Fallback 降级处理;如果未满,表示有剩余的资源执行 Command 命令,则执行步骤 6
(六)construct()
或 run()
当经过步骤 5 判断,有足够的资源执行 Command 命令时,本步骤将调用 Command 命令运行方法,基于不同类型的 Command,有如下两种两种运行方式:
运行方式 | 说明 |
---|---|
HystrixCommand.run() | 返回一个处理结果或者抛出一个异常 |
HystrixObservableCommand.construct() | 返回一个 Observable 表示的结果(可能多个),或者 基于onError 的错误通知 |
如果run()
或者construct()
方法 的真实执行时间
超过了 Command 设置的超时时间阈值
, 则当前则执行线程(或者是独立的定时器线程)将会抛出TimeoutException
。抛出超时异常 TimeoutException,后,将执行**步骤 8 **的 Fallback 降级处理。即使run()
或者construct()
执行没有被取消或中断,最终能够处理返回结果,但在降级处理逻辑中,将会抛弃run()
或construct()
方法的返回结果,而返回 Fallback 降级处理结果。
注意事项
需要注意的是,Hystrix 无法强制 将正在运行的线程停止掉--Hystrix 能够做的最好的方式就是在 JVM 中抛出一个InterruptedException
。如果 Hystrix 包装的工作不抛出中断异常InterruptedException
, 则在 Hystrix 线程池中的线程将会继续执行,尽管调用的客户端
已经接收到了TimeoutException
。这种方式会使 Hystrix 的线程池处于饱和状态。大部分的 Java Http Client 开源库并不会解析InterruptedException
。所以确认 HTTP client 相关的连接和读/写相关的超时时间设置。
如果 Command 命令没有抛出任何异常,并且有返回结果,则 Hystrix 将会在做完日志记录和统计之后会将结果返回。 如果是通过run()
方式运行,则返回一个Obserable
对象,包含一个唯一值,并且发送一个onCompleted
通知;如果是通过consturct()
方式运行 ,则返回一个Observable 对象
。
(七)健康检查
Hystrix 会统计 Command 命令执行执行过程中的成功数、失败数、拒绝数和超时数, 将这些信息记录到断路器 (Circuit Breaker) 中。断路器将上述统计按照时间窗的形式记录到一个定长数组中。断路器根据时间窗内的统计数据去判定请求什么时候可以被熔断,熔断后,在接下来一段恢复周期内,相同的请求过来后会直接被熔断。当再次校验,如果健康监测通过后,熔断开关将会被关闭。
(八)获取 Fallback
当以下场景出现后,Hystrix 将会尝试触发 Fallback
:
- 步骤 6 Command 执行时抛出了任何异常;
- 步骤 4 断路器已经被打开
- 步骤 5 执行命令的线程池、队列或者信号量资源已满
- 命令执行的时间超过阈值
(九)返回结果
如果 Hystrix 命令对象执行成功,将会返回结果,或者以Observable
形式包装的结果。根据**步骤 2 **的 command 调用方式,返回的Observable
会按照如下图说是的转换关系进行返回:
execute()
— 用和.queue()
相同的方式获取Future
,然后调用Future
的get()
以获取Observable
的单个值。queue()
—将Observable
转换为BlockingObservable
,以便可以将其转换为Future
并返回。watch()
—订阅Observable
并开始执行命令的流程; 返回一个Observable
,当订阅该Observable
时,它会重新通知。toObservable()
—返回不变的Observable
; 必须订阅它才能真正开始执行命令的流程。
断路器工作原理
- 断路器时间窗内的请求数是否超过了请求数断路器生效阈值
circuitBreaker.requestVolumeThreshold
,如果超过了阈值,则将会触发断路,断路状态为开启
例如,如果当前阈值设置的是20
,则当时间窗内统计的请求数共计 19 个,即使 19 个全部失败了,都不会触发断路器。 - 并且请求错误率超过了请求错误率阈值
errorThresholdPercentage
- 如果两个都满足,则将断路器由关闭迁移到开启
- 如果断路器开启,则后续的所有相同请求将会被断路掉;
- 直到过了沉睡时间窗
sleepWindowInMilliseconds
后,再发起请求时,允许其通过(此时的状态为半开起状态)。如果请求失败了,则保持断路器状态为开启状态,并更新沉睡时间窗。如果请求成功了,则将断路器状态改为关闭状态;
核心的逻辑如下:
@Override
public void onNext(HealthCounts hc) {
// check if we are past the statisticalWindowVolumeThreshold
if (hc.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {
// we are not past the minimum volume threshold for the stat window,
// so no change to circuit status.
// if it was CLOSED, it stays CLOSED
// if it was half-open, we need to wait for a successful command execution
// if it was open, we need to wait for sleep window to elapse
} else {
if (hc.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
//we are not past the minimum error threshold for the stat window,
// so no change to circuit status.
// if it was CLOSED, it stays CLOSED
// if it was half-open, we need to wait for a successful command execution
// if it was open, we need to wait for sleep window to elapse
} else {
// our failure rate is too high, we need to set the state to OPEN
if (status.compareAndSet(Status.CLOSED, Status.OPEN)) {
circuitOpened.set(System.currentTimeMillis());
}
}
}
}
限流框架 - Sentinel
其他限流解决方案
Guava RateLimiter
Guava 是 Google 开源的 Java 类库,提供了一个工具类 RateLimiter
,它基于令牌桶算法实现了本地限流器。
RateLimiter 限流示例
// 限流器流速:2 个请求/秒
RateLimiter limiter = RateLimiter.create(2.0);
// 执行任务的线程池
ExecutorService es = Executors.newFixedThreadPool(1);
// 记录上一次执行时间
prev = System.nanoTime();
// 测试执行 20 次
for (int i = 0; i < 20; i++) {
// 限流器限流
limiter.acquire();
// 提交任务异步执行
es.execute(() -> {
long cur = System.nanoTime();
// 打印时间间隔:毫秒
System.out.println((cur - prev) / 1000000);
prev = cur;
});
}
// 输出结果:
// ...
// 500
// 499
// 500
// 499
Redis + Lua
如果想要针对分布式系统资源进行限流,则必须具备两个要素:
- 对于资源的访问统计,必须是所有分布式节点都可以共享访问的数据存储;并且,由于在高并发场景下,读写访问统计数据会很频繁,该数据存储必须有很高的读写性能。
- 访问统计、限流计算都以原子操作方式进行。
满足以上要素的一种简单解决方案是,采用 Redis + Lua 来实现,原因在于:
- Redis 数据库的读写性能极高;
- Redis 支持以原子操作的方式执行 Lua 脚本。
Redis + Lua 实现的固定窗口限流算法
Redis + Lua 实现的固定窗口限流算法实现思路:
- 根据实际需要,将当前时间格式化为天(
yyyyMMdd
)、时(yyyyMMddHH
)、分(yyyyMMddHHmm
)、秒(yyyyMMddHHmmss
),并作为 Redis 的 String 类型 Key。该 Key 可以视为一个固定时间窗口,其中的 value 用于统计访问量; - 用于代表不同粒度的时间窗口按需设置过期时间;
- 一旦达到窗口的限流阈值时,请求被限流;否则请求通过。
Redis + Lua 实现的固定窗口限流算法
下面的代码片段模拟通过一个大小为 1 分钟的固定时间窗口进行限流,阈值为 100,过期时间 60s。
限流脚本 fixed_window_rate_limit.lua
代码:
-- 缓存 Key
local key = KEYS[1]
-- 访问请求数
local permits = tonumber(ARGV[1])
-- 过期时间
local seconds = tonumber(ARGV[2])
-- 限流阈值
local limit = tonumber(ARGV[3])
-- 获取统计值
local count = tonumber(redis.call('GET', key) or "0")
if count + permits > limit then
-- 触发限流
return 0
else
redis.call('INCRBY', key, permits)
redis.call('EXPIRE', key, seconds)
return count + permits
end
调用 lua 的实际限流代码:
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.io.FileUtil;
import cn.hutool.core.io.resource.ResourceUtil;
import cn.hutool.core.util.RandomUtil;
import cn.hutool.core.util.StrUtil;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.exceptions.JedisConnectionException;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
public class RedisFixedWindowRateLimiter implements RateLimiter {
private static final String REDIS_HOST = "localhost";
private static final int REDIS_PORT = 6379;
private static final Jedis JEDIS;
public static final String SCRIPT;
static {
// Jedis 有多种构造方法,这里选用最简单的一种情况
JEDIS = new Jedis(REDIS_HOST, REDIS_PORT);
// 触发 ping 命令
try {
JEDIS.ping();
System.out.println("jedis 连接成功");
} catch (JedisConnectionException e) {
e.printStackTrace();
}
SCRIPT = FileUtil.readString(ResourceUtil.getResource("scripts/fixed_window_rate_limit.lua"),
StandardCharsets.UTF_8);
}
private final long maxPermits;
private final long periodSeconds;
private final String key;
public RedisFixedWindowRateLimiter(long qps, String key) {
this(qps * 60, 60, TimeUnit.SECONDS, key);
}
public RedisFixedWindowRateLimiter(long maxPermits, long period, TimeUnit timeUnit, String key) {
this.maxPermits = maxPermits;
this.periodSeconds = timeUnit.toSeconds(period);
this.key = key;
}
@Override
public boolean tryAcquire(int permits) {
List<String> keys = Collections.singletonList(key);
List<String> args = CollectionUtil.newLinkedList(String.valueOf(permits), String.valueOf(periodSeconds),
String.valueOf(maxPermits));
Object eval = JEDIS.eval(SCRIPT, keys, args);
long value = (long) eval;
return value != -1;
}
public static void main(String[] args) throws InterruptedException {
int qps = 20;
RateLimiter jedisFixedWindowRateLimiter = new RedisFixedWindowRateLimiter(qps, "rate:limit:20240122210000");
// 模拟在一分钟内,不断收到请求,限流是否有效
int seconds = 60;
long okNum = 0L;
long total = 0L;
long beginTime = System.currentTimeMillis();
int num = RandomUtil.randomInt(qps, 100);
for (int second = 0; second < seconds; second++) {
for (int i = 0; i < num; i++) {
total++;
if (jedisFixedWindowRateLimiter.tryAcquire(1)) {
okNum++;
System.out.println("请求成功");
} else {
System.out.println("请求限流");
}
}
TimeUnit.SECONDS.sleep(1);
}
long endTime = System.currentTimeMillis();
long time = (endTime - beginTime) / 1000;
System.out.println(StrUtil.format("请求通过数:{},总请求数:{},实际 QPS:{}", okNum, total, okNum / time));
}
}
Redis + Lua 实现的令牌桶限流算法
Redis + Lua 实现的令牌桶限流算法
限流脚本 token_bucket_rate_limit.lua
代码:
local tokenKey = KEYS[1]
local timeKey = KEYS[2]
-- 申请令牌数
local permits = tonumber(ARGV[1])
-- QPS
local qps = tonumber(ARGV[2])
-- 桶的容量
local capacity = tonumber(ARGV[3])
-- 当前时间(单位:毫秒)
local nowMillis = tonumber(ARGV[4])
-- 填满令牌桶所需要的时间
local fillTime = capacity / qps
local ttl = math.min(capacity, math.floor(fillTime * 2))
local currentTokenNum = tonumber(redis.call("GET", tokenKey))
if currentTokenNum == nil then
currentTokenNum = capacity
end
local endTimeMillis = tonumber(redis.call("GET", timeKey))
if endTimeMillis == nil then
endTimeMillis = 0
end
local gap = nowMillis - endTimeMillis
local newTokenNum = math.max(0, gap * qps / 1000)
local currentTokenNum = math.min(capacity, currentTokenNum + newTokenNum)
if currentTokenNum < permits then
-- 请求拒绝
return -1
else
-- 请求通过
local finalTokenNum = currentTokenNum - permits
redis.call("SETEX", tokenKey, ttl, finalTokenNum)
redis.call("SETEX", timeKey, ttl, nowMillis)
return finalTokenNum
end
调用 lua 的实际限流代码:
package io.github.dunwu.distributed.ratelimit;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.io.FileUtil;
import cn.hutool.core.io.resource.ResourceUtil;
import cn.hutool.core.util.RandomUtil;
import cn.hutool.core.util.StrUtil;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.exceptions.JedisConnectionException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
* 基于 Redis + Lua 实现的令牌桶限流算法
*
* @author <a href="mailto:forbreak@163.com">Zhang Peng</a>
* @date 2024-01-23
*/
public class RedisTokenBucketRateLimiter implements RateLimiter {
private static final String REDIS_HOST = "localhost";
private static final int REDIS_PORT = 6379;
private static final Jedis JEDIS;
public static final String SCRIPT;
static {
// Jedis 有多种构造方法,这里选用最简单的一种情况
JEDIS = new Jedis(REDIS_HOST, REDIS_PORT);
// 触发 ping 命令
try {
JEDIS.ping();
System.out.println("jedis 连接成功");
} catch (JedisConnectionException e) {
e.printStackTrace();
}
SCRIPT = FileUtil.readString(ResourceUtil.getResource("scripts/token_bucket_rate_limit.lua"),
StandardCharsets.UTF_8);
}
private final long qps;
private final long capacity;
private final String tokenKey;
private final String timeKey;
public RedisTokenBucketRateLimiter(long qps, long capacity, String tokenKey, String timeKey) {
this.qps = qps;
this.capacity = capacity;
this.tokenKey = tokenKey;
this.timeKey = timeKey;
}
@Override
public boolean tryAcquire(int permits) {
long now = System.currentTimeMillis();
List<String> keys = CollectionUtil.newLinkedList(tokenKey, timeKey);
List<String> args = CollectionUtil.newLinkedList(String.valueOf(permits), String.valueOf(qps),
String.valueOf(capacity), String.valueOf(now));
Object eval = JEDIS.eval(SCRIPT, keys, args);
long value = (long) eval;
return value != -1;
}
public static void main(String[] args) throws InterruptedException {
int qps = 20;
int bucket = 100;
RedisTokenBucketRateLimiter redisTokenBucketRateLimiter =
new RedisTokenBucketRateLimiter(qps, bucket, "token:rate:limit", "token:rate:limit:time");
// 先将令牌桶预热令牌申请完,后续才能真实反映限流 QPS
redisTokenBucketRateLimiter.tryAcquire(bucket);
TimeUnit.SECONDS.sleep(1);
// 模拟在一分钟内,不断收到请求,限流是否有效
int seconds = 60;
long okNum = 0L;
long total = 0L;
long beginTime = System.currentTimeMillis();
for (int second = 0; second < seconds; second++) {
int num = RandomUtil.randomInt(qps, 100);
for (int i = 0; i < num; i++) {
total++;
if (redisTokenBucketRateLimiter.tryAcquire(1)) {
okNum++;
System.out.println("请求成功");
} else {
System.out.println("请求限流");
}
}
TimeUnit.SECONDS.sleep(1);
}
long endTime = System.currentTimeMillis();
long time = (endTime - beginTime) / 1000;
System.out.println(StrUtil.format("请求通过数:{},总请求数:{},实际 QPS:{}", okNum, total, okNum / time));
}
}