10.限流
# 01.漏桶算法
# 1.1 漏桶算法介绍
- 水滴先进入漏桶,漏桶以一定速度向外出水,当水流入速度过大,桶会直接溢出。
- 即Request进入一个固定容量的Queue,若Queue满,则拒绝新的Request,可以阻塞,也可以抛异常。
- 这种模型其实非常类似MQ的思想,利用漏桶削峰填谷,使得Queue的下游具有一个稳定流量。
- 漏桶算法可以控制端口的流量输出速率,从而为网络提供一个稳定的流量。
- golang实现思路
- 使用结构体记录 lastAccessTime(上次访问时间),dropsNum(固定时间访问了多少次)
- 下一次访问计算出这段时间里 又新增了多少次可以访问的次数,把这些新增次数放到 dropsNum中
# 1.2 golang实现漏桶算法
package main
import (
"fmt"
"time"
)
// 桶
type LeakBucket struct {
capacity int // 容量,固定时间语序访问次数
interval time.Duration // 允许访问的时间间隔
dropsNum int // 固定时间访问了多少次
lastAccessTime time.Time // 最近一次的访问时间
}
func (b *LeakBucket) accessControl() bool {
now := time.Now()
pastTime := now.Sub(b.lastAccessTime) // 当前时间距离上次访问时间间隔
leaks := int(float64(pastTime) / float64(b.interval)) // 过去这段时间pastTime可以允许多少访问
// 更新已经访问次数
if leaks > 0 { // 说明这段时间可以有leaks可以访问,但没有用户访问
if b.dropsNum <= leaks { // 固定时间访问次数清零(从头开始统计)
b.dropsNum = 0
} else {
b.dropsNum -= leaks // 已经访问的资源次数减少这段时间新增加的次数(可以接受更多的资源)
}
b.lastAccessTime = now // 更新访问时间
}
fmt.Printf("=====> 当前已经访问 %d ,最多可以访问 %d \n", b.dropsNum, b.capacity)
if b.dropsNum < b.capacity { // 在允许访问之内
b.dropsNum ++
return true
} else {
return false
}
}
func main() {
bucket := &LeakBucket{
capacity: 5,
interval: time.Second,
}
for j := 0; j < 20; j++ {
allowed := bucket.accessControl()
fmt.Printf("已经访问次数=%d allowed=%v j=%v\n", bucket.dropsNum, allowed, j)
time.Sleep(time.Millisecond * 500)
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# 02.令牌桶算法
# 2.1 令牌桶算法介绍
- 系统以一个恒定的速率往桶放入令牌,若有请求需要处理,则从令牌桶里获取令牌,当桶里没有令牌,则拒绝服务。
- 令牌桶算法并不能实际的控制速率,比如,10秒往桶里放入10000个令牌桶,即10秒内只能处理10000个请求,那么qps就是100。
- 但这种模型可以出现1秒内把10000个令牌全部消费完,即qps为10000。
- 所以令牌桶算法实际是限制的平均流速,具体控制的粒度以放令牌的间隔和每次的量来决定。
- 若想要把流速控制的更加稳定,就要缩短间隔时间,Google Guava中的RateLimter就是利用的令牌桶原理。
注:令牌桶算法和漏桶算法是相反的,一个“进水”,一个是“漏水”。
# 2.2 两种算法适用场景
- 漏桶算法与令牌桶算法的区别在于,漏桶算法能够强行限制数据的传输速率,不能够有效地使用网络资源。
- 令牌桶算法能够在限制数据的平均传输速率的同时还允许某种程度的突发传输。
- 因为漏桶的漏出速率是固定的,所以即使网络中没有发生拥塞,漏桶算法也不能使某一个单独的数据流达到端口速率
# 2.3 令牌桶使用
package main
import (
"fmt"
"net/http"
"time"
"golang.org/x/time/rate"
)
func main() {
//r := rate.Every(1 * time.Millisecond)
r := rate.Every(1 * time.Second)
limit := rate.NewLimiter(r, 10)
http.HandleFunc("/", func(writer http.ResponseWriter, request *http.Request) {
if limit.Allow() {
fmt.Printf("请求成功,当前时间:%s\n", time.Now().Format("2006-01-02 15:04:05"))
} else {
fmt.Printf("请求成功,但是被限流了。。。\n")
}
})
_ = http.ListenAndServe(":8081", nil)
}
/*
请求成功,当前时间:2022-01-27 16:16:03
请求成功,但是被限流了。。。
请求成功,当前时间:2022-01-27 16:16:03
请求成功,但是被限流了。。。
*/
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# 3.4 令牌桶算法原理
- 限流器定义
- limit、burst 和 token 是这个限流器中核心的参数,请求并发的大小在这里实现的
type Limiter struct {
mu sync.Mutex // 互斥锁(排他锁)
limit Limit // 放入桶的频率 float64 类型
burst int // 桶的大小
tokens float64 // 令牌 token 当前剩余的数量
last time.Time // 最近取走 token 的时间
lastEvent time.Time // 最近限流事件的时间
}
1
2
3
4
5
6
7
8
2
3
4
5
6
7
8
- 在令牌发放之后,会存储在 Reservation 预约对象中
type Reservation struct {
ok bool // 是否满足条件分配了 token
lim *Limiter // 发送令牌的限流器
tokens int // 发送 token 令牌的数量
timeToAct time.Time // 满足令牌发放的时间
limit Limit // 令牌发放速度
}
1
2
3
4
5
6
7
2
3
4
5
6
7
- 具体源码
func (lim *Limiter) reserveN(now time.Time, n int, maxFutureReserve time.Duration) Reservation {
lim.mu.Lock()
// 首先判断是否放入频率是否为无穷大
// 如果为无穷大,说明暂时不限流
if lim.limit == Inf {
lim.mu.Unlock()
return Reservation{
ok: true,
lim: lim,
tokens: n,
timeToAct: now,
}
}
// 拿到截至 now 时间时
// 可以获取的令牌 tokens 数量及上一次拿走令牌的时间 last
now, last, tokens := lim.advance(now)
// 更新 tokens 数量
tokens -= float64(n)
// 如果 tokens 为负数,代表当前没有 token 放入桶中
// 说明需要等待,计算等待的时间
var waitDuration time.Duration
if tokens < 0 {
waitDuration = lim.limit.durationFromTokens(-tokens)
}
// 计算是否满足分配条件
// 1、需要分配的大小不超过桶的大小
// 2、等待时间不超过设定的等待时长
ok := n <= lim.burst && waitDuration <= maxFutureReserve
// 预处理 reservation
r := Reservation{
ok: ok,
lim: lim,
limit: lim.limit,
}
// 若当前满足分配条件
// 1、设置分配大小
// 2、满足令牌发放的时间 = 当前时间 + 等待时长
if ok {
r.tokens = n
r.timeToAct = now.Add(waitDuration)
}
// 更新 limiter 的值,并返回
if ok {
lim.last = now
lim.tokens = tokens
lim.lastEvent = r.timeToAct
} else {
lim.last = last
}
lim.mu.Unlock()
return r
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# 03.滑动窗口
# 3.1 计数器限流
在一段时间间隔内,对请求进行计数,与阀值进行比较判断是否需要限流,一旦到了时间临界点,将计数器清零
这个就像你去坐车一样,车厢规定了多少个位置,满了就不让上车了,
可以在程序中设置一个变量
count
,当过来一个请求我就将这个数+1
,同时记录请求时间。当下一个请求来的时候判断
count
的计数值是否超过设定的频次,以及当前请求的时间和第一次请求时间是否在1
分钟内。如果在
1
分钟内并且超过设定的频次则证明请求过多,后面的请求就拒绝掉。如果该请求与第一个请求的间隔时间大于计数周期,且
count
值还在限流范围内,就重置count
。
# 3.2 滑动窗口原理
滑动窗口
是针对计数器存在的临界点缺陷,所谓滑动窗口(sliding window)
是一种流量控制技术,这个词出现在tcp
协议中。滑动窗口
把固定时间片进行划分,并且随着时间的流逝,进行移动,固定数量的可以移动的格子,进行计数并判断阀值。- 们用红色的虚线代表一个时间窗口(
一分钟
),每个时间窗口有6
个格子,每个格子是10
秒钟。 - 每过
10
秒钟时间窗口向右移动一格,可以看红色箭头的方向。 - 我们为每个格子都设置一个独立的计数器
counter
,假如一个请求在0:45
访问了那么我们将第五个格子的计数器+1
(也是就是0:40~0:50
) - 在判断限流的时候需要把所有格子的计数加起来和设定的频次进行比较即可。
- 当用户在
0:59
秒钟发送了200
个请求就会被第六个格子的计数器记录+200
- 当下一秒的时候时间窗口向右移动了一个,此时计数器已经记录了该用户发送的
200
个请求 - 所以再发送的话就会触发限流,则拒绝新的请求。
其实计数器就是滑动窗口啊,只不过只有一个格子而已
,所以想让限流做的更精确只需要划分更多的格子就可以了- 格子的数量影响着滑动窗口算法的精度,依然有时间片的概念,无法根本解决临界点问题
上次更新: 2024/3/13 15:35:10