跳至主要內容

流量控制

钝悟...大约 17 分钟分布式分布式调度分布式服务治理调度流量控制限流熔断降级

流量控制

在高并发场景下,为了应对瞬时海量请求的压力,保障系统的平稳运行,必须预估系统的流量阈值,通过限流规则阻断处理不过来的请求。

流量控制简介

什么是流量控制

流量控制(Flow Control),根据流量、并发线程数、响应时间等指标,把随机到来的流量调整成合适的形状,即流量塑形。避免应用被瞬时的流量高峰冲垮,从而保障应用的高可用性。

为什么需要流量控制

复杂的分布式系统架构中的应用程序往往具有数十个依赖项,每个依赖项都会不可避免地在某个时刻失败。 如果主机应用程序未与这些外部故障隔离开来,则可能会被波及。

例如,对于依赖于 30 个服务的应用程序,假设每个服务的正常运行时间为 99.99%,则可以期望:

99.9930 = 99.7% 的正常运行时间

10 亿个请求中的 0.3%= 3,000,000 个失败

即使所有依赖项都具有出色的正常运行时间,每月也会有 2 个小时以上的停机时间。

然而,现实情况一般比这种估量情况更糟糕。


当一切正常时,整体系统如下所示:

图片来自 Hystrix Wikiopen in new window

在分布式系统架构下,这些强依赖的子服务稳定与否对系统的影响非常大。但是,依赖的子服务可能有很多不可控问题:如网络连接、资源繁忙、服务宕机等。例如:下图中有一个 QPS 为 50 的依赖服务 I 出现不可用,但是其他依赖服务是可用的。

图片来自 Hystrix Wikiopen in new window

当流量很大的情况下,某个依赖的阻塞,会导致上游服务请求被阻塞。当这种级联故障愈演愈烈,就可能造成整个线上服务不可用的雪崩效应,如下图。这种情况若持续恶化,如果上游服务本身还被其他服务所依赖,就可能出现多米洛骨牌效应,导致多个服务都无法正常工作。

img
img

图片来自 Hystrix Wikiopen in new window

流量控制有哪些保护机制

流量控制常见的手段就是限流、熔断、降级。

什么是降级?

降级是保障服务能够稳定运行的一种保护方式:面对突增的流量,牺牲一些吞吐量以换取系统的稳定。常见的降级实现方式有:开关降级、限流降级、熔断降级。

什么是限流?

限流一般针对下游服务,当上游流量较大时,避免被上游服务的请求撑爆。

限流就是限制系统的输入和输出流量,以达到保护系统的目的。一般来说系统的吞吐量是可以被测算的,为了保证系统的稳定运行,一旦达到的需要限制的阈值,就需要限制流量并采取一些措施以完成限制流量的目的。比如:延迟处理,拒绝处理,或者部分拒绝处理等等。

限流规则包含三个部分:时间粒度,接口粒度,最大限流值。限流规则设置是否合理直接影响到限流是否合理有效。

什么是熔断?

熔断一般针对上游服务,当下游服务超时/异常较多时,避免被下游服务拖垮。

当调用链路中某个资源出现不稳定,例如,超时异常比例升高的时候,则对这个资源的调用进行限制,并让请求快速失败,避免影响到其它的资源,最终产生雪崩的效果。

熔断尽最大的可能去完成所有的请求,容忍一些失败,熔断也能自动恢复。熔断的常见策略有:

  • 在每秒请求异常数超过多少时触发熔断降级
  • 在每秒请求异常错误率超过多少时触发熔断降级
  • 在每秒请求平均耗时超过多少时触发熔断降级

流量控制有哪些衡量指标

流量控制有以下几个角度:

  • 流量指标,例如 QPS、并发线程数等。
  • 资源的调用关系,例如资源的调用链路,资源和资源之间的关系,调用来源等。
  • 控制效果,例如排队等待、直接拒绝、Warm Up(预热)等。

限流算法

限流的本质是:在一定的时间范围内,限制某一个资源被访问的频率。如何去限制流量,就需要采用一定的策略,即限流算法。常见的限流算法有:固定窗口限流算法、滑动窗口限流算法、漏桶限流算法、令牌桶限流算法。

下面,将对这几种算法进行一一介绍。

固定窗口限流算法

固定窗口限流算法的原理

固定窗口限流算法的基本策略是:

  1. 设置一个固定时间窗口,以及这个固定时间窗口内的最大请求数;
  2. 为每个固定时间窗口设置一个计数器,用于统计请求数;
  3. 一旦请求数超过最大请求数,则请求会被拦截。

固定窗口限流算法的利弊

固定窗口限流算法的优点是:实现简单。

固定窗口限流算法的缺点是:存在临界问题。所谓临界问题,是指:流量分别集中在一个固定时间窗口的尾部和一个固定时间窗口的头部。举例来说,假设限流规则为每分钟不超过 100 次请求。在第一个时间窗口中,起初没有任何请求,在最后 1 s,收到 100 次请求,由于没有达到阈值,所有请求都通过;在第二个时间窗口中,第 1 秒就收到 100 次请求,而后续没有任何请求。虽然,这两个时间窗口内的流量都符合限流要求,但是在两个时间窗口临界的这 2s 内,实际上有 200 次请求,显然是超过预期吞吐量的,存在压垮系统的可能。

固定窗口限流算法的实现

【示例】Java 版本的固定窗口限流算法

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class SlidingWindowRateLimiter implements RateLimiter {

    /**
     * 允许的最大请求数
     */
    private final long maxPermits;

    /**
     * 窗口期时长
     */
    private long periodMillis;

    /**
     * 分片窗口期时长
     */
    private final long shardPeriodMillis;

    /**
     * 窗口期截止时间
     */
    private long lastPeriodMillis;

    /**
     * 分片窗口数
     */
    private final int shardNum;

    /**
     * 请求总计数
     */
    private final AtomicLong totalCount = new AtomicLong(0);

    /**
     * 分片窗口计数列表
     */
    private final List<AtomicLong> countList = new LinkedList<>();

    public SlidingWindowRateLimiter(long qps, int shardNum) {
        this(qps, 1000, TimeUnit.MILLISECONDS, shardNum);
    }

    public SlidingWindowRateLimiter(long maxPermits, long period, TimeUnit timeUnit, int shardNum) {
        this.maxPermits = maxPermits;
        this.periodMillis = timeUnit.toMillis(period);
        this.lastPeriodMillis = System.currentTimeMillis();
        this.shardPeriodMillis = timeUnit.toMillis(period) / shardNum;
        this.shardNum = shardNum;
        for (int i = 0; i < shardNum; i++) {
            countList.add(new AtomicLong(0));
        }
    }

    @Override
    public synchronized boolean tryAcquire(int permits) {
        long now = System.currentTimeMillis();
        if (now > lastPeriodMillis) {
            for (int shardId = 0; shardId < shardNum; shardId++) {
                long shardCount = countList.get(shardId).get();
                totalCount.addAndGet(-shardCount);
                countList.set(shardId, new AtomicLong(0));
                lastPeriodMillis += shardPeriodMillis;
            }
        }
        int shardId = (int) (now % periodMillis / shardPeriodMillis);
        if (totalCount.get() + permits <= maxPermits) {
            countList.get(shardId).addAndGet(permits);
            totalCount.addAndGet(permits);
            return true;
        } else {
            return false;
        }
    }

}

滑动窗口限流算法

滑动窗口限流算法的原理

滑动窗口限流算法是对固定窗口限流算法的改进,解决了临界问题。

滑动窗口限流算法的基本策略是:

  • 将固定时间窗口分片为多个子窗口,每个子窗口的访问次数独立统计;
  • 当请求时间大于当前子窗口的最大时间时,则将当前子窗口废弃,并将计时窗口向前滑动,并将下一个子窗口置为当前窗口。
  • 要保证所有子窗口的统计数之和不能超过阈值。

滑动窗口限流算法就是针对固定窗口限流算法的更细粒度的控制,分片越多,则限流越精准。

滑动窗口限流算法的利弊

滑动窗口限流算法的优点是:在滑动窗口限流算法中,临界位置的突发请求都会被算到时间窗口内,因此可以解决计数器算法的临界问题。

滑动窗口限流算法的缺点是:

  • 额外的内存开销 - 滑动时间窗口限流算法的时间窗口是持续滑动的,并且除了需要一个计数器来记录时间窗口内接口请求次数之外,还需要记录在时间窗口内每个接口请求到达的时间点,所以存在额外的内存开销。
  • 限流的控制粒度受限于窗口分片粒度 - 滑动窗口限流算法,只能在选定的时间粒度上限流,对选定时间粒度内的更加细粒度的访问频率不做限制。但是,由于每个分片窗口都有额外的内存开销,所以也并不是分片数越多越好的。

滑动窗口限流算法的实现

【示例】Java 版本的滑动窗口限流算法

import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class SlidingWindowRateLimiter implements RateLimiter {

    /**
     * 允许的最大请求数
     */
    private final long maxPermits;

    /**
     * 窗口期时长
     */
    private final long periodMillis;

    /**
     * 分片窗口期时长
     */
    private final long shardPeriodMillis;

    /**
     * 窗口期截止时间
     */
    private long lastPeriodMillis;

    /**
     * 分片窗口数
     */
    private final int shardNum;

    /**
     * 请求总计数
     */
    private final AtomicLong totalCount = new AtomicLong(0);

    /**
     * 分片窗口计数列表
     */
    private final List<AtomicLong> countList = new LinkedList<>();

    public SlidingWindowRateLimiter(long qps, int shardNum) {
        this(qps, 1000, TimeUnit.MILLISECONDS, shardNum);
    }

    public SlidingWindowRateLimiter(long maxPermits, long period, TimeUnit timeUnit, int shardNum) {
        this.maxPermits = maxPermits;
        this.periodMillis = timeUnit.toMillis(period);
        this.lastPeriodMillis = System.currentTimeMillis();
        this.shardPeriodMillis = timeUnit.toMillis(period) / shardNum;
        this.shardNum = shardNum;
        for (int i = 0; i < shardNum; i++) {
            countList.add(new AtomicLong(0));
        }
    }

    @Override
    public synchronized boolean tryAcquire(int permits) {
        long now = System.currentTimeMillis();
        if (now > lastPeriodMillis) {
            for (int shardId = 0; shardId < shardNum; shardId++) {
                long shardCount = countList.get(shardId).get();
                totalCount.addAndGet(-shardCount);
                countList.set(shardId, new AtomicLong(0));
                lastPeriodMillis += shardPeriodMillis;
            }
        }
        int shardId = (int) (now % periodMillis / shardPeriodMillis);
        if (totalCount.get() + permits <= maxPermits) {
            countList.get(shardId).addAndGet(permits);
            totalCount.addAndGet(permits);
            return true;
        } else {
            return false;
        }
    }

}

漏桶限流算法

漏桶限流算法的原理

漏桶限流算法的基本策略是:

  • 水(请求)以任意速率由入口进入到漏桶中;
  • 水以固定的速率由出口出水(请求通过);
  • 漏桶的容量是固定的,如果水的流入速率大于流出速率,最终会导致漏桶中的水溢出(这意味着请求拒绝)。

漏桶限流算法的利弊

漏桶限流算法的优点是:流量速率固定——即无论流量多大,即便是突发的大流量,处理请求的速度始终是固定的。

漏桶限流算法的缺点是:不能灵活的调整流量。例如:一个集群通过增减节点的方式,弹性伸缩了其吞吐能力,漏桶限流算法无法随之调整。

漏桶策略适用于间隔性突发流量且流量不用即时处理的场景

漏桶限流算法的实现

【示例】Java 版本的漏桶限流算法

import java.util.concurrent.atomic.AtomicLong;

public class LeakyBucketRateLimiter implements RateLimiter {

    /**
     * QPS
     */
    private final int qps;

    /**
     * 桶的容量
     */
    private final long capacity;

    /**
     * 计算的起始时间
     */
    private long beginTimeMillis;

    /**
     * 桶中当前的水量
     */
    private final AtomicLong waterNum = new AtomicLong(0);

    public LeakyBucketRateLimiter(int qps, int capacity) {
        this.qps = qps;
        this.capacity = capacity;
    }

    @Override
    public synchronized boolean tryAcquire(int permits) {

        // 如果桶中没有水,直接通过
        if (waterNum.get() == 0) {
            beginTimeMillis = System.currentTimeMillis();
            waterNum.addAndGet(permits);
            return true;
        }

        // 计算水量
        long leakedWaterNum = ((System.currentTimeMillis() - beginTimeMillis) / 1000) * qps;
        long currentWaterNum = waterNum.get() - leakedWaterNum;
        waterNum.set(Math.max(0, currentWaterNum));

        // 重置时间
        beginTimeMillis = System.currentTimeMillis();

        if (waterNum.get() + permits < capacity) {
            waterNum.addAndGet(permits);
            return true;
        } else {
            return false;
        }
    }

}

令牌桶限流算法

令牌桶限流算法的原理

令牌桶算法的原理

  1. 接口限制 T 秒内最大访问次数为 N,则每隔 T/N 秒会放一个 token 到桶中
  2. 桶内最多存放 M 个 token,如果 token 到达时令牌桶已经满了,那么这个 token 就会被丢弃
  3. 接口请求会先从令牌桶中取 token,拿到 token 则处理接口请求,拿不到 token 则进行限流处理

令牌桶限流算法的利弊

因为令牌桶存放了很多令牌,那么大量的突发请求会被执行,但是它不会出现临界问题,在令牌用完之后,令牌是以一个恒定的速率添加到令牌桶中的,因此不能再次发送大量突发请求。

规定固定容量的桶,token 以固定速度往桶内填充,当桶满时 token 不会被继续放入,每过来一个请求把 token 从桶中移除,如果桶中没有 token 不能请求。

令牌桶算法适用于有突发特性的流量,且流量需要即时处理的场景

令牌桶限流算法的实现

【示例】Java 实现令牌桶算法

import java.util.concurrent.atomic.AtomicLong;

/**
 * 令牌桶限流算法
 *
 * @author <a href="mailto:forbreak@163.com">Zhang Peng</a>
 * @date 2024-01-18
 */
public class TokenBucketRateLimiter implements RateLimiter {

    /**
     * QPS
     */
    private final long qps;

    /**
     * 桶的容量
     */
    private final long capacity;

    /**
     * 上一次令牌发放时间
     */
    private long endTimeMillis;

    /**
     * 桶中当前的令牌数量
     */
    private final AtomicLong tokenNum = new AtomicLong(0);

    public TokenBucketRateLimiter(long qps, long capacity) {
        this.qps = qps;
        this.capacity = capacity;
        this.endTimeMillis = System.currentTimeMillis();
    }

    @Override
    public synchronized boolean tryAcquire(int permits) {

        long now = System.currentTimeMillis();
        long gap = now - endTimeMillis;

        // 计算令牌数
        long newTokenNum = (gap * qps / 1000);
        long currentTokenNum = tokenNum.get() + newTokenNum;
        tokenNum.set(Math.min(capacity, currentTokenNum));

        if (tokenNum.get() < permits) {
            return false;
        } else {
            tokenNum.addAndGet(-permits);
            endTimeMillis = now;
            return true;
        }
    }

}

扩展

Guava 的 RateLimiter 工具类就是基于令牌桶算法实现,其源码分析可以参考:RateLimiter 基于漏桶算法,但它参考了令牌桶算法open in new window

限流算法测试

import cn.hutool.core.thread.ThreadUtil;
import cn.hutool.core.util.RandomUtil;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

@Slf4j
public class RateLimiterDemo {

    public static void main(String[] args) {

        // ============================================================================

        int qps = 20;

        System.out.println("======================= 固定时间窗口限流算法 =======================");
        FixedWindowRateLimiter fixedWindowRateLimiter = new FixedWindowRateLimiter(qps);
        testRateLimit(fixedWindowRateLimiter, qps);

        System.out.println("======================= 滑动时间窗口限流算法 =======================");
        SlidingWindowRateLimiter slidingWindowRateLimiter = new SlidingWindowRateLimiter(qps, 10);
        testRateLimit(slidingWindowRateLimiter, qps);

        System.out.println("======================= 漏桶限流算法 =======================");
        LeakyBucketRateLimiter leakyBucketRateLimiter = new LeakyBucketRateLimiter(qps, 100);
        testRateLimit(leakyBucketRateLimiter, qps);

        System.out.println("======================= 令牌桶限流算法 =======================");
        TokenBucketRateLimiter tokenBucketRateLimiter = new TokenBucketRateLimiter(qps, 100);
        testRateLimit(tokenBucketRateLimiter, qps);
    }

    private static void testRateLimit(RateLimiter rateLimiter, int qps) {

        AtomicInteger okNum = new AtomicInteger(0);
        AtomicInteger limitNum = new AtomicInteger(0);
        ExecutorService executorService = ThreadUtil.newFixedExecutor(10, "限流测试", true);
        long beginTime = System.currentTimeMillis();

        int threadNum = 4;
        final CountDownLatch latch = new CountDownLatch(threadNum);
        for (int i = 0; i < threadNum; i++) {
            executorService.submit(() -> {
                try {
                    batchRequest(rateLimiter, okNum, limitNum, 1000);
                } catch (Exception e) {
                    log.error("发生异常!", e);
                } finally {
                    latch.countDown();
                }
            });
        }

        try {
            latch.await(10, TimeUnit.SECONDS);
            long endTime = System.currentTimeMillis();
            long gap = endTime - beginTime;
            log.info("限流 QPS: {} -> 实际结果:耗时 {} ms,{} 次请求成功,{} 次请求被限流,实际 QPS: {}",
                qps, gap, okNum.get(), limitNum.get(), okNum.get() * 1000 / gap);
            if (okNum.get() == qps) {
                log.info("限流符合预期");
            }
        } catch (Exception e) {
            log.error("发生异常!", e);
        } finally {
            executorService.shutdown();
        }
    }

    private static void batchRequest(RateLimiter rateLimiter, AtomicInteger okNum, AtomicInteger limitNum, int num)
        throws InterruptedException {
        for (int j = 0; j < num; j++) {
            if (rateLimiter.tryAcquire(1)) {
                log.info("请求成功");
                okNum.getAndIncrement();
            } else {
                log.info("请求限流");
                limitNum.getAndIncrement();
            }
            TimeUnit.MILLISECONDS.sleep(RandomUtil.randomInt(0, 10));
        }
    }

}

分布式限流算法

前文中,展示了一些基于 Java 实现的限流算法,但这些实现都是在 Java 内存中进行访问统计,所以只能应用于单机限流。

如果想要针对分布式系统资源进行限流,则必须具备两个要素:

  1. 对于资源的访问统计,必须是所有分布式节点都可以共享访问的数据存储;并且,由于在高并发场景下,读写访问统计数据会很频繁,该数据存储必须有很高的读写性能。
  2. 访问统计、限流计算都以原子操作方式进行。

满足以上要素的一种简单解决方案是,采用 Redis + Lua 来实现,原因在于:1. Redis 数据库的读写性能极高;2. Redis 支持以原子操作的方式执行 Lua 脚本。

Redis + Lua 实现的固定窗口限流算法

Redis + Lua 实现的固定窗口限流算法实现思路:

  • 根据实际需要,将当前时间格式化为天(yyyyMMdd)、时(yyyyMMddHH)、分(yyyyMMddHHmm)、秒(yyyyMMddHHmmss),并作为 Redis 的 String 类型 Key。该 Key 可以视为一个固定时间窗口,其中的 value 用于统计访问量;
  • 用于代表不同粒度的时间窗口按需设置过期时间;
  • 一旦达到窗口的限流阈值时,请求被限流;否则请求通过。

【示例】Redis + Lua 实现的固定窗口限流算法

下面的代码片段模拟通过一个大小为 1 分钟的固定时间窗口进行限流,阈值为 100,过期时间 60s。

限流脚本 fixed_window_rate_limit.lua 代码:

-- 缓存 Key
local key = KEYS[1]
-- 访问请求数
local permits = tonumber(ARGV[1])
-- 过期时间
local seconds = tonumber(ARGV[2])
-- 限流阈值
local limit = tonumber(ARGV[3])

-- 获取统计值
local count = tonumber(redis.call('GET', key) or "0")

if count + permits > limit then
    -- 触发限流
    return 0
else
    redis.call('INCRBY', key, permits)
    redis.call('EXPIRE', key, seconds)
    return count + permits
end

调用 lua 的实际限流代码:

import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.io.FileUtil;
import cn.hutool.core.io.resource.ResourceUtil;
import cn.hutool.core.util.RandomUtil;
import cn.hutool.core.util.StrUtil;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.exceptions.JedisConnectionException;

import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class RedisFixedWindowRateLimiter implements RateLimiter {

    private static final String REDIS_HOST = "localhost";

    private static final int REDIS_PORT = 6379;

    private static final Jedis JEDIS;

    public static final String SCRIPT;

    static {
        // Jedis 有多种构造方法,这里选用最简单的一种情况
        JEDIS = new Jedis(REDIS_HOST, REDIS_PORT);

        // 触发 ping 命令
        try {
            JEDIS.ping();
            System.out.println("jedis 连接成功");
        } catch (JedisConnectionException e) {
            e.printStackTrace();
        }

        SCRIPT = FileUtil.readString(ResourceUtil.getResource("scripts/fixed_window_rate_limit.lua"),
            StandardCharsets.UTF_8);
    }

    private final long maxPermits;
    private final long periodSeconds;
    private final String key;

    public RedisFixedWindowRateLimiter(long qps, String key) {
        this(qps * 60, 60, TimeUnit.SECONDS, key);
    }

    public RedisFixedWindowRateLimiter(long maxPermits, long period, TimeUnit timeUnit, String key) {
        this.maxPermits = maxPermits;
        this.periodSeconds = timeUnit.toSeconds(period);
        this.key = key;
    }

    @Override
    public boolean tryAcquire(int permits) {
        List<String> keys = Collections.singletonList(key);
        List<String> args = CollectionUtil.newLinkedList(String.valueOf(permits), String.valueOf(periodSeconds),
            String.valueOf(maxPermits));
        Object eval = JEDIS.eval(SCRIPT, keys, args);
        long value = (long) eval;
        return value != -1;
    }

    public static void main(String[] args) throws InterruptedException {

        int qps = 20;
        RateLimiter jedisFixedWindowRateLimiter = new RedisFixedWindowRateLimiter(qps, "rate:limit:20240122210000");

        // 模拟在一分钟内,不断收到请求,限流是否有效
        int seconds = 60;
        long okNum = 0L;
        long total = 0L;
        long beginTime = System.currentTimeMillis();
        int num = RandomUtil.randomInt(qps, 100);
        for (int second = 0; second < seconds; second++) {
            for (int i = 0; i < num; i++) {
                total++;
                if (jedisFixedWindowRateLimiter.tryAcquire(1)) {
                    okNum++;
                    System.out.println("请求成功");
                } else {
                    System.out.println("请求限流");
                }
            }
            TimeUnit.SECONDS.sleep(1);
        }
        long endTime = System.currentTimeMillis();
        long time = (endTime - beginTime) / 1000;
        System.out.println(StrUtil.format("请求通过数:{},总请求数:{},实际 QPS:{}", okNum, total, okNum / time));
    }

}

Redis + Lua 实现的令牌桶限流算法

限流脚本 token_bucket_rate_limit.lua 代码:

local tokenKey = KEYS[1]
local timeKey = KEYS[2]

-- 申请令牌数
local permits = tonumber(ARGV[1])
-- QPS
local qps = tonumber(ARGV[2])
-- 桶的容量
local capacity = tonumber(ARGV[3])
-- 当前时间(单位:毫秒)
local nowMillis = tonumber(ARGV[4])
-- 填满令牌桶所需要的时间
local fillTime = capacity / qps
local ttl = math.min(capacity, math.floor(fillTime * 2))

local currentTokenNum = tonumber(redis.call("GET", tokenKey))
if currentTokenNum == nil then
    currentTokenNum = capacity
end

local endTimeMillis = tonumber(redis.call("GET", timeKey))
if endTimeMillis == nil then
    endTimeMillis = 0
end

local gap = nowMillis - endTimeMillis
local newTokenNum = math.max(0, gap * qps / 1000)
local currentTokenNum = math.min(capacity, currentTokenNum + newTokenNum)

if currentTokenNum < permits then
    -- 请求拒绝
    return -1
else
    -- 请求通过
    local finalTokenNum = currentTokenNum - permits
    redis.call("SETEX", tokenKey, ttl, finalTokenNum)
    redis.call("SETEX", timeKey, ttl, nowMillis)
    return finalTokenNum
end

调用 lua 的实际限流代码:

package io.github.dunwu.distributed.ratelimit;

import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.io.FileUtil;
import cn.hutool.core.io.resource.ResourceUtil;
import cn.hutool.core.util.RandomUtil;
import cn.hutool.core.util.StrUtil;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.exceptions.JedisConnectionException;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.TimeUnit;

/**
 * 基于 Redis + Lua 实现的令牌桶限流算法
 *
 * @author <a href="mailto:forbreak@163.com">Zhang Peng</a>
 * @date 2024-01-23
 */
public class RedisTokenBucketRateLimiter implements RateLimiter {

    private static final String REDIS_HOST = "localhost";

    private static final int REDIS_PORT = 6379;

    private static final Jedis JEDIS;

    public static final String SCRIPT;

    static {
        // Jedis 有多种构造方法,这里选用最简单的一种情况
        JEDIS = new Jedis(REDIS_HOST, REDIS_PORT);

        // 触发 ping 命令
        try {
            JEDIS.ping();
            System.out.println("jedis 连接成功");
        } catch (JedisConnectionException e) {
            e.printStackTrace();
        }

        SCRIPT = FileUtil.readString(ResourceUtil.getResource("scripts/token_bucket_rate_limit.lua"),
            StandardCharsets.UTF_8);
    }

    private final long qps;
    private final long capacity;
    private final String tokenKey;
    private final String timeKey;

    public RedisTokenBucketRateLimiter(long qps, long capacity, String tokenKey, String timeKey) {
        this.qps = qps;
        this.capacity = capacity;
        this.tokenKey = tokenKey;
        this.timeKey = timeKey;
    }

    @Override
    public boolean tryAcquire(int permits) {
        long now = System.currentTimeMillis();
        List<String> keys = CollectionUtil.newLinkedList(tokenKey, timeKey);
        List<String> args = CollectionUtil.newLinkedList(String.valueOf(permits), String.valueOf(qps),
            String.valueOf(capacity), String.valueOf(now));
        Object eval = JEDIS.eval(SCRIPT, keys, args);
        long value = (long) eval;
        return value != -1;
    }

    public static void main(String[] args) throws InterruptedException {

        int qps = 20;
        int bucket = 100;
        RedisTokenBucketRateLimiter redisTokenBucketRateLimiter =
            new RedisTokenBucketRateLimiter(qps, bucket, "token:rate:limit", "token:rate:limit:time");

        // 先将令牌桶预热令牌申请完,后续才能真实反映限流 QPS
        redisTokenBucketRateLimiter.tryAcquire(bucket);
        TimeUnit.SECONDS.sleep(1);

        // 模拟在一分钟内,不断收到请求,限流是否有效
        int seconds = 60;
        long okNum = 0L;
        long total = 0L;
        long beginTime = System.currentTimeMillis();
        for (int second = 0; second < seconds; second++) {
            int num = RandomUtil.randomInt(qps, 100);
            for (int i = 0; i < num; i++) {
                total++;
                if (redisTokenBucketRateLimiter.tryAcquire(1)) {
                    okNum++;
                    System.out.println("请求成功");
                } else {
                    System.out.println("请求限流");
                }
            }
            TimeUnit.SECONDS.sleep(1);
        }
        long endTime = System.currentTimeMillis();
        long time = (endTime - beginTime) / 1000;
        System.out.println(StrUtil.format("请求通过数:{},总请求数:{},实际 QPS:{}", okNum, total, okNum / time));
    }

}

参考资料

评论
  • 按正序
  • 按倒序
  • 按热度
Powered by Waline v2.15.7