限流算法主要有如下几种:
/**
* The minimalistic token-bucket implementation
*/
public class MinimalisticTokenBucket {
private final long capacity;
private final double refillTokensPerOneMillis;
private double availableTokens;
private long lastRefillTimestamp;
/**
* Creates token-bucket with specified capacity and refill rate equals to refillTokens/refillPeriodMillis
*/
public MinimalisticTokenBucket(long capacity, long refillTokens, long refillPeriodMillis) {
this.capacity = capacity;
this.refillTokensPerOneMillis = (double) refillTokens / (double) refillPeriodMillis;
this.availableTokens = capacity;
this.lastRefillTimestamp = System.currentTimeMillis();
}
synchronized public boolean tryConsume(int numberTokens) {
refill();
if (availableTokens < numberTokens) {
return false;
} else {
availableTokens -= numberTokens;
return true;
}
}
private void refill() {
long currentTimeMillis = System.currentTimeMillis();
if (currentTimeMillis > lastRefillTimestamp) {
long millisSinceLastRefill = currentTimeMillis - lastRefillTimestamp;
double refill = millisSinceLastRefill * refillTokensPerOneMillis;
this.availableTokens = Math.min(capacity, availableTokens + refill);
this.lastRefillTimestamp = currentTimeMillis;
}
}
private static final class Selftest {
public static void main(String[] args) {
// 100 tokens per 1 second
MinimalisticTokenBucket limiter = new MinimalisticTokenBucket(100, 100, 1000);
long startMillis = System.currentTimeMillis();
long consumed = 0;
while (System.currentTimeMillis() - startMillis < 10000) {
if (limiter.tryConsume(1)) {
consumed++;
}
}
System.out.println(consumed);
}
}
}
以上是bucket4j给出的一个简单实现,用于理解token bucket算法
这个算法没有采用线程去refill token,因为bucket太多的话,线程太多,耗cpu
这个算法没有存储每个period使用的token,设计了lastRefillTimestamp字段,用于计算需要填充的token
每次tryConsume的时候,方法内部首先调用refill,根据设定的速度以及时间差计算这个时间段需要补充的token,更新availableTokens以及lastRefillTimestamp
之后限流判断,就是判断availableTokens与请求的numberTokens
package main
import (
"log"
)
type ConnLimiter struct {
concurrentConn int
bucket chan int
}
// go 无原生构造函数,必须手动定义并实现
func NewConnLimiter(cc int) *ConnLimiter {
return &ConnLimiter {
concurrentConn: cc,
bucket: make(chan int, cc),
}
}
func (cl *ConnLimiter) GetConn() bool {
if len(cl.bucket) >= cl.concurrentConn {
log.Printf("Reached the rate limitation.")
return false
}
cl.bucket <- 1
return true
}
func (cl *ConnLimiter) ReleaseConn() {
c :=<- cl.bucket // 释放写进去的token
log.Printf("New connction coming: %d", c)
}
令牌桶算法,其核心是想通过限流器,必须拿到令牌。
只要我们能够限制发放令牌的速率,那么就能控制流速:
b 其实是burst的简写,意义是限流器允许的最大突发流量。比如b=10,而且令牌桶中的令牌已满,此时限流器允许10个请求同时通过限流器,这只是突发流量,这10个请求会带走10个令牌,所以后续流量只能按照速率 r 通过限流器。
如何实现呢?基于生产者-消费者模式?
设计看上去很完美,实现也简单,若并发量不大,这没有什么问题。可使用限流大部分都是高并发场景,而且系统压力已经临近极限了,此时这个实现就有问题了。
问题出在定时器,高并发下,系统压力已临近极限,定时器精度误差会很大,定时器本身还会创建调度线程,对系统性能影响极大。
所以Guava没有使用定时器,它是如何实现的呢?
Guava的令牌桶算法关键是记录并动态计算下一令牌的发放时间。
假设令牌桶的容量为 b=1,限流速率 r = 1个请求/s。如下所示,若当前令牌桶无令牌,下一个令牌的发放时间是在第3s,而在第2s时,有个线程T1请求令牌,此时该如何处理?
由于下一个令牌产生的时间是第4s,所以线程T2要等待2s,才能获取到令牌,同时由于T2预占第4s令牌,所以下一令牌产生时间还要增加1s
所以我们只需要记录一个下一令牌产生的时间,并动态更新它。
依然假设令牌桶的容量是1。关键是reserve()方法,这个方法会为请求令牌的线程预分配令牌,同时返回该线程能够获取令牌的时间。其实现逻辑就是上面提到的:如果线程请求令牌的时间在下一令牌产生时间之后,那么该线程立刻就能够获取令牌;反之,如果请求时间在下一令牌产生时间之前,那么该线程是在下一令牌产生的时间获取令牌。由于此时下一令牌已经被该线程预占,所以下一令牌产生的时间需要加上1秒。
3 小结token bucket算法,是基于QPS来限流,其简单的实现,就是计算单位时间补充token的速率,然后每次tryConsume的时候根据速率修正availableTokens。
参考
联系客服