主题
限流方案
在高并发场景下,应用限流是保护系统稳定、防止资源过度消耗的有效手段。
应用限流需要结合具体业务场景和系统架构特点,灵活运用多种限流工具和技术,并结合监控系统实时调整限流阈值,以达到在保持系统稳定的同时最大化利用系统资源的目的。
常见的应用限流优化高并发的方案有以下几种。
常用的限流算法
- 计数器
- 滑动窗口
- 漏桶
- 令牌桶算法
1.计数器
就是当接口在一个时间单位中被访问时,就记下来访问次数,直到它访问的次数到达上限。
比如要求某一个接口,1分钟内的请求不能超过10次,可以在开始时设置一个计数器,每次请求,该计数器+1;
如果该计数器的值大于10并且与第一次请求的时间间隔在1分钟内,那么说明请求过多;
如果该请求与第一次请求的时间间隔大于1分钟,并且该计数器的值还在限流范围内,那么重置该计数器。
示例代码
java
public class CounterDemo {
public long timeStamp = System.currentTimeMillis();
public int reqCount = 0;
public final int limit = 100; // 时间窗口内最大请求数
public final long interval = 1000; // 时间窗口ms
public boolean grant() {
long now = System.currentTimeMillis();
if (now < timeStamp + interval) {
// 在时间窗口内
reqCount++;
// 判断当前时间窗口内是否超过最大请求控制数
return reqCount <= limit;
}
else {
timeStamp = now;
// 超时后重置
reqCount = 1;
return true;
}
}
}
以上代码有致命问题,当遇到恶意请求,在0:59时,瞬间请求100次,并且在1:00请求100次,那么这个用户在1秒内请求了200次,用户可以在重置节点突发请求,而瞬间超过我们设置的速率限制,用户可能通过算法漏洞击垮我们的应用。
限流某个接口的总并发/请求数如果接口可能会有突发访问情况,但又担心访问量太大造成崩溃,如抢购业务;
这个时候就需要限制这个接口的总并发/请求数总请求数了;因为粒度比较细,可以为每个接口都设置相应的阀值。可以使用Java中的AtomicLong进行限流。
单元测试
java
import cn.diyai.rate_limiter.CounterDemo;
import org.junit.Test;
import static org.junit.Assert.*;
public class CounterDemoTest {
@Test
public void testGrantWithinLimit() {
CounterDemo counter = new CounterDemo();
boolean result;
// 第一个请求
result = counter.grant();
assertTrue(result);
// 达到最大请求数
for (int i = 0; i < counter.limit - 1; i++) {
result = counter.grant();
assertTrue(result);
}
// 超过最大请求数
result = counter.grant();
assertFalse(result);
}
@Test
public void testGrantAfterInterval() throws InterruptedException {
CounterDemo counter = new CounterDemo();
boolean result;
// 达到最大请求数
for (int i = 0; i < counter.limit; i++) {
result = counter.grant();
assertTrue(result);
}
// 等待一个时间窗口
Thread.sleep(counter.interval);
// 第一个请求在新的时间窗口
result = counter.grant();
assertTrue(result);
// 达到最大请求数
for (int i = 0; i < counter.limit - 1; i++) {
result = counter.grant();
assertTrue(result);
}
// 超过最大请求数
result = counter.grant();
assertFalse(result);
}
}
计数器可以作为一种简单的限流算法,但它可能不够灵活和精确,适用于某些场景,但不适用于其他场景。下面是计数器作为限流算法的优缺点:
优点:
- 简单易用: 计数器算法非常简单,易于理解和实现。
- 实时性: 计数器可以实时统计请求的数量,不需要复杂的状态维护。
- 适用于固定速率限流: 如果需要简单地限制请求的处理速率,例如每秒处理的请求数量不超过某个固定值,计数器是一种有效的方法。
缺点:
- 无法处理突发流量: 计数器无法应对突发流量,因为它只是简单地对请求的数量进行计数,而不考虑请求的到达时间。
- 无法动态调整限流速率: 计数器无法根据系统负载或者其他因素动态调整限流速率,它通常需要在配置阶段指定限流速率。
- 资源浪费: 如果设置的限流速率过低,可能会导致系统资源浪费,因为部分资源可能没有得到充分利用。
- 精度问题: 计数器在实际使用中可能存在一定的精度问题,特别是在高并发场景下,可能会出现计数不准确的情况。
计数器作为限流算法是一种简单直观的方法,适用于一些简单的场景,但在处理复杂的限流需求时可能不够灵活和精确。
对于需要动态调整限流速率或者应对突发流量的场景,通常需要选择更为复杂和灵活的限流算法。
2.滑动窗口
滑动窗口是在计数器基础上,划分的更细。计数器是一格,滑动窗口可按照需求划分为多个格子
当滑动窗口的格子划分越多,限流的统计就会越精确
比如一个时间窗口就是1分钟,然后我们将时间窗口进行划分。比如把滑动窗口划分为6格,所以每一格代表10秒,每超过10秒,时间窗口就会向右滑动一格,每一格都有自己独立的计数器。
当滑动窗口的格子划分越多,限流的统计就会越精确
示例代码
java
public class SlidingWindow {
/* 循环队列 */
private volatile AtomicInteger[] timeSlices;
/* 队列的总长度 */
private volatile int timeSliceSize;
/* 每个时间片的时长 */
private volatile int timeMillisPerSlice;
/* 窗口长度 */
private volatile int windowSize;
/* 当前所使用的时间片位置 */
private AtomicInteger cursor = new AtomicInteger(0);
public SlidingWindow(int timeMillisPerSlice, int windowSize) {
this.timeMillisPerSlice = timeMillisPerSlice;
this.windowSize = windowSize;
// 保证存储在至少两个window
this.timeSliceSize = windowSize * 2 + 1;
}
/**
* 初始化队列,由于此初始化会申请一些内容空间,为了节省空间,延迟初始化
*/
private void initTimeSlices() {
if (timeSlices != null) {
return;
}
// 在多线程的情况下,会出现多次初始化的情况,没关系
// 我们只需要保证,获取到的值一定是一个稳定的,所有这里使用先初始化,最后赋值的方法
AtomicInteger[] localTimeSlices = new AtomicInteger[timeSliceSize];
for (int i = 0; i < timeSliceSize; i++) {
localTimeSlices[i] = new AtomicInteger(0);
}
timeSlices = localTimeSlices;
}
private int locationIndex() {
long time = System.currentTimeMillis();
return (int) ((time / timeMillisPerSlice) % timeSliceSize);
}
/**
* 对时间片计数+1,并返回窗口中所有的计数总和
* 该方法只要调用就一定会对某个时间片进行+1
*/
public int incrementAndSum() {
initTimeSlices();
int index = locationIndex();
int sum = 0;
// cursor等于index,返回true
// cursor不等于index,返回false,并会将cursor设置为index
int oldCursor = cursor.getAndSet(index);
if (oldCursor == index) {
// 在当前时间片里继续+1
sum += timeSlices[index].incrementAndGet();
} else {
// 可能有其他thread已经置过1,问题不大
timeSlices[index].set(1);
// 清零,访问量不大时会有时间片跳跃的情况
clearBetween(oldCursor, index);
// sum += 0;
}
for (int i = 1; i < windowSize; i++) {
sum += timeSlices[(index - i + timeSliceSize) % timeSliceSize].get();
}
return sum;
}
/**
* 判断是否允许进行访问,未超过阈值的话才会对某个时间片+1
*/
public boolean allow(int threshold) {
initTimeSlices();
int index = locationIndex();
int sum = 0;
// cursor不等于index,将cursor设置为index
int oldCursor = cursor.getAndSet(index);
if (oldCursor != index) {
// 可能有其他thread已经置过1,问题不大
timeSlices[index].set(0);
// 清零,访问量不大时会有时间片跳跃的情况
clearBetween(oldCursor, index);
}
for (int i = 1; i < windowSize; i++) {
sum += timeSlices[(index - i + timeSliceSize) % timeSliceSize].get();
}
// 阈值判断
if (sum <= threshold) {
// 未超过阈值才+1
sum += timeSlices[index].incrementAndGet();
return true;
}
return false;
}
/**
* 将fromIndex~toIndex之间的时间片计数都清零
* 极端情况下,当循环队列已经走了超过1个timeSliceSize以上,这里的清零并不能如期望的进行
*
* @param fromIndex 不包含
* @param toIndex 不包含
*/
private void clearBetween(int fromIndex, int toIndex) {
for (int index = (fromIndex + 1) % timeSliceSize; index != toIndex; index = (index + 1) % timeSliceSize) {
timeSlices[index].set(0);
}
}
}
单元测试
java
import org.junit.Test;
import static org.junit.Assert.*;
public class CounterDemoTest {
@Test
public void testGrantWithinLimit() {
CounterDemo counter = new CounterDemo();
boolean result;
// 第一个请求
result = counter.grant();
assertTrue(result);
// 达到最大请求数
for (int i = 0; i < counter.limit - 1; i++) {
result = counter.grant();
assertTrue(result);
}
// 超过最大请求数
result = counter.grant();
assertFalse(result);
}
@Test
public void testGrantAfterInterval() throws InterruptedException {
CounterDemo counter = new CounterDemo();
boolean result;
// 达到最大请求数
for (int i = 0; i < counter.limit; i++) {
result = counter.grant();
assertTrue(result);
}
// 等待一个时间窗口
Thread.sleep(counter.interval);
// 第一个请求在新的时间窗口
result = counter.grant();
assertTrue(result);
// 达到最大请求数
for (int i = 0; i < counter.limit - 1; i++) {
result = counter.grant();
assertTrue(result);
}
// 超过最大请求数
result = counter.grant();
assertFalse(result);
}
}
3.漏桶算法
所有请求以一定速率通过一个有固定容量的漏桶(队列),超过处理能力的请求被溢出拒绝或者等待桶中的请求处理完成
示例代码
java
public class LeakyDemo {
private long timeStamp = System.currentTimeMillis();
private int capacity; // 桶的容量
private int rate; // 水漏出的速度
private int water; // 当前水量(当前累积请求数)
public synchronized boolean grant() {
long now = System.currentTimeMillis();
water = (int)Math.max(0, water - (now - timeStamp) * rate); // 先执行漏水,计算剩余水量
timeStamp = now;
if ((water + 1) < capacity) {
// 尝试加水,并且水还未满
water += 1;
return true;
}
else {
// 水满,拒绝加水
return false;
}
}
}
在并发场景中,grant
方法需使用synchronized
修饰,确保在同一时刻只有一个线程可以进入该方法。这样可以确保在多线程环境下对共享变量的修改是原子性的,并且避免了数据竞争问题。
单元测试
java
import org.junit.Test;
import static org.junit.Assert.*;
public class LeakyDemoTest {
@Test
public void testGrant() throws InterruptedException {
LeakyDemo leakyDemo = new LeakyDemo();
leakyDemo.capacity = 10;
leakyDemo.rate = 1;
// 模拟连续请求
for (int i = 0; i < 9; i++) {
boolean result = leakyDemo.grant();
System.out.println(result);
assertTrue(result); // 确保前9次请求都能够被授权
}
// 再次请求,此时桶已满,应该被拒绝
boolean result = leakyDemo.grant();
assertFalse(result); // 确保第10次请求被拒绝
// 等待一段时间,保证水有足够的时间漏掉
Thread.sleep(2000);
// 再次请求,此时桶已经空了,应该能够被授权
result = leakyDemo.grant();
assertTrue(result); // 确保11次请求被授权
}
}
4.令牌桶
系统以一定的速率往桶里添加令牌,请求来时先尝试从桶中获取令牌,只有拿到令牌的请求才能继续执行,否则就被限流。
示例代码
java
public class TokenBucketDemo {
public long timeStamp = System.currentTimeMillis();
public int capacity; // 桶的容量
public int rate; // 令牌放入速度
public int tokens; // 当前令牌数量
public boolean grant() {
long now = System.currentTimeMillis();
// 先添加令牌
tokens = (int)Math.min(capacity, tokens + (now - timeStamp) * rate);
timeStamp = now;
if (tokens < 1) {
// 若不到1个令牌,则拒绝
return false;
}
else {
// 还有令牌,领取令牌
tokens -= 1;
return true;
}
}
}
Guava的RateLimiter、阿里巴巴开源的Sentinel以及Netflix的Hystrix都有实现这两种算法的限流器。
单元测试
java
import org.junit.Test;
import static org.junit.Assert.*;
public class TokenBucketDemoTest {
@Test
public void testGrantWithEnoughTokens() {
TokenBucketDemo tokenBucket = new TokenBucketDemo();
tokenBucket.capacity = 10;
tokenBucket.rate = 1;
tokenBucket.tokens = 5; // 设置初始令牌数量为5
// 领取令牌
boolean result = tokenBucket.grant();
assertTrue(result); // 应该允许领取令牌
assertEquals(4, tokenBucket.tokens); // 领取后令牌数量应减少1
// 再次领取令牌
result = tokenBucket.grant();
assertTrue(result); // 应该允许领取令牌
assertEquals(3, tokenBucket.tokens); // 领取后令牌数量应减少1
}
@Test
public void testGrantWithoutEnoughTokens() {
TokenBucketDemo tokenBucket = new TokenBucketDemo();
tokenBucket.capacity = 10;
tokenBucket.rate = 1;
tokenBucket.tokens = 0; // 设置初始令牌数量为0
// 尝试领取令牌
boolean result = tokenBucket.grant();
assertFalse(result); // 应该拒绝领取令牌
assertEquals(0, tokenBucket.tokens); // 令牌数量应该仍然为0
}
@Test
public void testGrantWithMaxCapacity() {
TokenBucketDemo tokenBucket = new TokenBucketDemo();
tokenBucket.capacity = 10;
tokenBucket.rate = 1;
tokenBucket.tokens = 10; // 设置初始令牌数量为最大容量10
// 领取令牌
boolean result = tokenBucket.grant();
assertTrue(result); // 应该允许领取令牌
assertEquals(9, tokenBucket.tokens); // 领取后令牌数量应减少1
}
}
Nginx作为网关层限流
Nginx可以通过ngx_http_limit_req_module
模块实现基本的限流功能,例如限制每秒请求数或每分钟请求数。
shell
http {
limit_req_zone $binary_remote_addr zone=one:10m rate=10r/s;
server {
location /service {
limit_req zone=one burst=20;
proxy_pass http://backend-service;
}
}
}
在上面的配置中,limit_req_zone
指令用于定义一个限制请求的区域,并指定了请求频率为每秒不超过 10 个(rate=10r/s
)。limit_req
指令则用于在 location
配置中应用这个限制请求的区域,burst=20
指定了允许的突发请求数量。
这样配置后,当某个客户端对 /service
接口的请求频率超过每秒 10 个时,Nginx 将会返回 503 错误,拒绝处理这些请求。而在突发情况下,如果请求数量超过了 burst
参数指定的值,Nginx 也会进行排队处理,而不是直接返回错误。
API Gateway服务限流
对于微服务架构,可以在API Gateway层设置全局流量控制,根据服务级别的QPS(每秒查询数)进行限流。
服务内部限流
使用Spring Cloud Alibaba Sentinel或Netflix Hystrix等框架,在服务内部对特定资源、方法或服务调用进行限流,可以设置线程池隔离策略或者信号量隔离策略。
数据库连接池限流
配置合理的数据库连接池大小和策略,确保不会因为过多的数据库连接导致数据库服务器压力过大。
分布式限流
利用Redis或其他分布式存储技术实现分布式限流,保证整个集群的限流效果一致。
熔断降级
结合限流与熔断机制,当服务负载过高时,除了限流还可以选择暂时拒绝部分请求或者将服务降级至备用方案。
自定义限流策略
根据业务需求定制限流规则,比如按照用户ID、IP地址、请求类型等维度进行限流。