Skip to content

限流方案

在高并发场景下,应用限流是保护系统稳定、防止资源过度消耗的有效手段。

应用限流需要结合具体业务场景和系统架构特点,灵活运用多种限流工具和技术,并结合监控系统实时调整限流阈值,以达到在保持系统稳定的同时最大化利用系统资源的目的

常见的应用限流优化高并发的方案有以下几种。

常用的限流算法

  1. 计数器
  2. 滑动窗口
  3. 漏桶
  4. 令牌桶算法

1.计数器

就是当接口在一个时间单位中被访问时,就记下来访问次数,直到它访问的次数到达上限。

比如要求某一个接口,1分钟内的请求不能超过10次,可以在开始时设置一个计数器,每次请求,该计数器+1

如果该计数器的值大于10并且与第一次请求的时间间隔在1分钟内,那么说明请求过多;

如果该请求与第一次请求的时间间隔大于1分钟,并且该计数器的值还在限流范围内,那么重置该计数器。

示例代码

java
public class CounterDemo {
    public long timeStamp = System.currentTimeMillis();
    public int reqCount = 0;
    public final int limit = 100; // 时间窗口内最大请求数
    public final long interval = 1000; // 时间窗口ms
    public boolean grant() {
        long now = System.currentTimeMillis();
        if (now < timeStamp + interval) {
            // 在时间窗口内
            reqCount++;
            // 判断当前时间窗口内是否超过最大请求控制数
            return reqCount <= limit;
        }
        else {
            timeStamp = now;
            // 超时后重置
            reqCount = 1;
            return true;
        }
    }
}

以上代码有致命问题,当遇到恶意请求,在0:59时,瞬间请求100次,并且在1:00请求100次,那么这个用户在1秒内请求了200次,用户可以在重置节点突发请求,而瞬间超过我们设置的速率限制,用户可能通过算法漏洞击垮我们的应用。

限流某个接口的总并发/请求数如果接口可能会有突发访问情况,但又担心访问量太大造成崩溃,如抢购业务;

这个时候就需要限制这个接口的总并发/请求数总请求数了;因为粒度比较细,可以为每个接口都设置相应的阀值。可以使用Java中的AtomicLong进行限流。

单元测试

java
import cn.diyai.rate_limiter.CounterDemo;

import org.junit.Test;
import static org.junit.Assert.*;

public class CounterDemoTest {
    @Test
    public void testGrantWithinLimit() {
        CounterDemo counter = new CounterDemo();
        boolean result;

        // 第一个请求
        result = counter.grant();
        assertTrue(result);

        // 达到最大请求数
        for (int i = 0; i < counter.limit - 1; i++) {
            result = counter.grant();
            assertTrue(result);
        }

        // 超过最大请求数
        result = counter.grant();
        assertFalse(result);
    }

    @Test
    public void testGrantAfterInterval() throws InterruptedException {
        CounterDemo counter = new CounterDemo();
        boolean result;

        // 达到最大请求数
        for (int i = 0; i < counter.limit; i++) {
            result = counter.grant();
            assertTrue(result);
        }

        // 等待一个时间窗口
        Thread.sleep(counter.interval);

        // 第一个请求在新的时间窗口
        result = counter.grant();
        assertTrue(result);

        // 达到最大请求数
        for (int i = 0; i < counter.limit - 1; i++) {
            result = counter.grant();
            assertTrue(result);
        }

        // 超过最大请求数
        result = counter.grant();
        assertFalse(result);
    }
}

计数器可以作为一种简单的限流算法,但它可能不够灵活和精确,适用于某些场景,但不适用于其他场景。下面是计数器作为限流算法的优缺点:

优点:

  1. 简单易用: 计数器算法非常简单,易于理解和实现。
  2. 实时性: 计数器可以实时统计请求的数量,不需要复杂的状态维护。
  3. 适用于固定速率限流: 如果需要简单地限制请求的处理速率,例如每秒处理的请求数量不超过某个固定值,计数器是一种有效的方法。

缺点:

  1. 无法处理突发流量: 计数器无法应对突发流量,因为它只是简单地对请求的数量进行计数,而不考虑请求的到达时间。
  2. 无法动态调整限流速率: 计数器无法根据系统负载或者其他因素动态调整限流速率,它通常需要在配置阶段指定限流速率。
  3. 资源浪费: 如果设置的限流速率过低,可能会导致系统资源浪费,因为部分资源可能没有得到充分利用。
  4. 精度问题: 计数器在实际使用中可能存在一定的精度问题,特别是在高并发场景下,可能会出现计数不准确的情况。

计数器作为限流算法是一种简单直观的方法,适用于一些简单的场景,但在处理复杂的限流需求时可能不够灵活和精确。

对于需要动态调整限流速率或者应对突发流量的场景,通常需要选择更为复杂和灵活的限流算法。

2.滑动窗口

滑动窗口是在计数器基础上,划分的更细。计数器是一格,滑动窗口可按照需求划分为多个格子

当滑动窗口的格子划分越多,限流的统计就会越精确

比如一个时间窗口就是1分钟,然后我们将时间窗口进行划分。比如把滑动窗口划分为6格,所以每一格代表10秒,每超过10秒,时间窗口就会向右滑动一格,每一格都有自己独立的计数器。

当滑动窗口的格子划分越多,限流的统计就会越精确

示例代码

java
public class SlidingWindow {
    /* 循环队列 */
    private volatile AtomicInteger[] timeSlices;
    /* 队列的总长度  */
    private volatile int timeSliceSize;
    /* 每个时间片的时长 */
    private volatile int timeMillisPerSlice;
    /* 窗口长度 */
    private volatile int windowSize;

    /* 当前所使用的时间片位置 */
    private AtomicInteger cursor = new AtomicInteger(0);

    public SlidingWindow(int timeMillisPerSlice, int windowSize) {
        this.timeMillisPerSlice = timeMillisPerSlice;
        this.windowSize = windowSize;
        // 保证存储在至少两个window
        this.timeSliceSize = windowSize * 2 + 1;
    }
    
    /**
     * 初始化队列,由于此初始化会申请一些内容空间,为了节省空间,延迟初始化
     */
    private void initTimeSlices() {
        if (timeSlices != null) {
            return;
        }
        // 在多线程的情况下,会出现多次初始化的情况,没关系
        // 我们只需要保证,获取到的值一定是一个稳定的,所有这里使用先初始化,最后赋值的方法
        AtomicInteger[] localTimeSlices = new AtomicInteger[timeSliceSize];
        for (int i = 0; i < timeSliceSize; i++) {
            localTimeSlices[i] = new AtomicInteger(0);
        }
        timeSlices = localTimeSlices;
    }

    private int locationIndex() {
        long time = System.currentTimeMillis();
        return (int) ((time / timeMillisPerSlice) % timeSliceSize);
    }

    /**
     * 对时间片计数+1,并返回窗口中所有的计数总和
     * 该方法只要调用就一定会对某个时间片进行+1
     */
    public int incrementAndSum() {
        initTimeSlices();
        int index = locationIndex();
        int sum = 0;
        // cursor等于index,返回true
        // cursor不等于index,返回false,并会将cursor设置为index
        int oldCursor = cursor.getAndSet(index);
        if (oldCursor == index) {
            // 在当前时间片里继续+1
            sum += timeSlices[index].incrementAndGet();
        } else {
            // 可能有其他thread已经置过1,问题不大
            timeSlices[index].set(1);
            // 清零,访问量不大时会有时间片跳跃的情况
            clearBetween(oldCursor, index);
            // sum += 0;
        }
        for (int i = 1; i < windowSize; i++) {
            sum += timeSlices[(index - i + timeSliceSize) % timeSliceSize].get();
        }
        return sum;
    }

    /**
     * 判断是否允许进行访问,未超过阈值的话才会对某个时间片+1
     */
    public boolean allow(int threshold) {
        initTimeSlices();
        int index = locationIndex();
        int sum = 0;
        // cursor不等于index,将cursor设置为index
        int oldCursor = cursor.getAndSet(index);
        if (oldCursor != index) {
            // 可能有其他thread已经置过1,问题不大
            timeSlices[index].set(0);
            // 清零,访问量不大时会有时间片跳跃的情况
            clearBetween(oldCursor, index);
        }
        for (int i = 1; i < windowSize; i++) {
            sum += timeSlices[(index - i + timeSliceSize) % timeSliceSize].get();
        }

        // 阈值判断
        if (sum <= threshold) {
            // 未超过阈值才+1
            sum += timeSlices[index].incrementAndGet();
            return true;
        }
        return false;
    }

    /**
     * 将fromIndex~toIndex之间的时间片计数都清零
     * 极端情况下,当循环队列已经走了超过1个timeSliceSize以上,这里的清零并不能如期望的进行
     *
     * @param fromIndex 不包含
     * @param toIndex   不包含
     */
    private void clearBetween(int fromIndex, int toIndex) {
        for (int index = (fromIndex + 1) % timeSliceSize; index != toIndex; index = (index + 1) % timeSliceSize) {
            timeSlices[index].set(0);
        }
    }
}

单元测试

java
import org.junit.Test;
import static org.junit.Assert.*;

public class CounterDemoTest {
    @Test
    public void testGrantWithinLimit() {
        CounterDemo counter = new CounterDemo();
        boolean result;

        // 第一个请求
        result = counter.grant();
        assertTrue(result);

        // 达到最大请求数
        for (int i = 0; i < counter.limit - 1; i++) {
            result = counter.grant();
            assertTrue(result);
        }

        // 超过最大请求数
        result = counter.grant();
        assertFalse(result);
    }

    @Test
    public void testGrantAfterInterval() throws InterruptedException {
        CounterDemo counter = new CounterDemo();
        boolean result;

        // 达到最大请求数
        for (int i = 0; i < counter.limit; i++) {
            result = counter.grant();
            assertTrue(result);
        }

        // 等待一个时间窗口
        Thread.sleep(counter.interval);

        // 第一个请求在新的时间窗口
        result = counter.grant();
        assertTrue(result);

        // 达到最大请求数
        for (int i = 0; i < counter.limit - 1; i++) {
            result = counter.grant();
            assertTrue(result);
        }

        // 超过最大请求数
        result = counter.grant();
        assertFalse(result);
    }
}

3.漏桶算法

所有请求以一定速率通过一个有固定容量的漏桶(队列),超过处理能力的请求被溢出拒绝或者等待桶中的请求处理完成

示例代码

java
public class LeakyDemo {
    private  long timeStamp = System.currentTimeMillis();
    private  int capacity; // 桶的容量
    private  int rate; // 水漏出的速度
    private  int water; // 当前水量(当前累积请求数)

    public synchronized  boolean grant() {
        long now = System.currentTimeMillis();
        water = (int)Math.max(0, water - (now - timeStamp) * rate); // 先执行漏水,计算剩余水量
        timeStamp = now;
        if ((water + 1) < capacity) {
            // 尝试加水,并且水还未满
            water += 1;
            return true;
        }
        else {
            // 水满,拒绝加水
            return false;
        }
    }
}

在并发场景中,grant方法需使用synchronized修饰,确保在同一时刻只有一个线程可以进入该方法。这样可以确保在多线程环境下对共享变量的修改是原子性的,并且避免了数据竞争问题。

单元测试

java
import org.junit.Test;
import static org.junit.Assert.*;

public class LeakyDemoTest {
    @Test
    public void testGrant() throws InterruptedException {
        LeakyDemo leakyDemo = new LeakyDemo();
        leakyDemo.capacity = 10;
        leakyDemo.rate = 1;

        // 模拟连续请求
        for (int i = 0; i < 9; i++) {
            boolean result = leakyDemo.grant();
            System.out.println(result);
            assertTrue(result); // 确保前9次请求都能够被授权
        }

        // 再次请求,此时桶已满,应该被拒绝
        boolean result = leakyDemo.grant();
        assertFalse(result); // 确保第10次请求被拒绝

        // 等待一段时间,保证水有足够的时间漏掉
        Thread.sleep(2000);

        // 再次请求,此时桶已经空了,应该能够被授权
        result = leakyDemo.grant();
        assertTrue(result); // 确保11次请求被授权
    }
}

4.令牌桶

系统以一定的速率往桶里添加令牌,请求来时先尝试从桶中获取令牌,只有拿到令牌的请求才能继续执行,否则就被限流。

示例代码

java
public class TokenBucketDemo {
    public long timeStamp = System.currentTimeMillis();
    public int capacity; // 桶的容量
    public int rate; // 令牌放入速度
    public int tokens; // 当前令牌数量
    public boolean grant() {
        long now = System.currentTimeMillis();
        // 先添加令牌
        tokens = (int)Math.min(capacity, tokens + (now - timeStamp) * rate);
        timeStamp = now;
        if (tokens < 1) {
            // 若不到1个令牌,则拒绝
            return false;
        }
        else {
            // 还有令牌,领取令牌
            tokens -= 1;
            return true;
        }
    }
}

Guava的RateLimiter、阿里巴巴开源的Sentinel以及Netflix的Hystrix都有实现这两种算法的限流器。

单元测试

java

import org.junit.Test;
import static org.junit.Assert.*;

public class TokenBucketDemoTest {

    @Test
    public void testGrantWithEnoughTokens() {
        TokenBucketDemo tokenBucket = new TokenBucketDemo();
        tokenBucket.capacity = 10;
        tokenBucket.rate = 1;
        tokenBucket.tokens = 5; // 设置初始令牌数量为5

        // 领取令牌
        boolean result = tokenBucket.grant();
        assertTrue(result); // 应该允许领取令牌
        assertEquals(4, tokenBucket.tokens); // 领取后令牌数量应减少1

        // 再次领取令牌
        result = tokenBucket.grant();
        assertTrue(result); // 应该允许领取令牌
        assertEquals(3, tokenBucket.tokens); // 领取后令牌数量应减少1
    }

    @Test
    public void testGrantWithoutEnoughTokens() {
        TokenBucketDemo tokenBucket = new TokenBucketDemo();
        tokenBucket.capacity = 10;
        tokenBucket.rate = 1;
        tokenBucket.tokens = 0; // 设置初始令牌数量为0

        // 尝试领取令牌
        boolean result = tokenBucket.grant();
        assertFalse(result); // 应该拒绝领取令牌
        assertEquals(0, tokenBucket.tokens); // 令牌数量应该仍然为0
    }

    @Test
    public void testGrantWithMaxCapacity() {
        TokenBucketDemo tokenBucket = new TokenBucketDemo();
        tokenBucket.capacity = 10;
        tokenBucket.rate = 1;
        tokenBucket.tokens = 10; // 设置初始令牌数量为最大容量10

        // 领取令牌
        boolean result = tokenBucket.grant();
        assertTrue(result); // 应该允许领取令牌
        assertEquals(9, tokenBucket.tokens); // 领取后令牌数量应减少1
    }
}

Nginx作为网关层限流

Nginx可以通过ngx_http_limit_req_module 模块实现基本的限流功能,例如限制每秒请求数或每分钟请求数。

shell
http {
    limit_req_zone $binary_remote_addr zone=one:10m rate=10r/s;
    
    server {
        location /service {
            limit_req zone=one burst=20;
            proxy_pass http://backend-service;
        }
    }
}

在上面的配置中,limit_req_zone 指令用于定义一个限制请求的区域,并指定了请求频率为每秒不超过 10 个(rate=10r/s)。limit_req 指令则用于在 location 配置中应用这个限制请求的区域,burst=20 指定了允许的突发请求数量。

这样配置后,当某个客户端对 /service 接口的请求频率超过每秒 10 个时,Nginx 将会返回 503 错误,拒绝处理这些请求。而在突发情况下,如果请求数量超过了 burst 参数指定的值,Nginx 也会进行排队处理,而不是直接返回错误。

API Gateway服务限流

对于微服务架构,可以在API Gateway层设置全局流量控制,根据服务级别的QPS(每秒查询数)进行限流。

服务内部限流

使用Spring Cloud Alibaba Sentinel或Netflix Hystrix等框架,在服务内部对特定资源、方法或服务调用进行限流,可以设置线程池隔离策略或者信号量隔离策略

数据库连接池限流

配置合理的数据库连接池大小和策略,确保不会因为过多的数据库连接导致数据库服务器压力过大。

分布式限流

利用Redis或其他分布式存储技术实现分布式限流,保证整个集群的限流效果一致。

熔断降级

结合限流与熔断机制,当服务负载过高时,除了限流还可以选择暂时拒绝部分请求或者将服务降级至备用方案。

自定义限流策略

根据业务需求定制限流规则,比如按照用户ID、IP地址、请求类型等维度进行限流。