Rate limiting in Go

Background

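A service that calls downstream components ran out of memory (OOM). Investigation showed the cause was a sudden QPS spike on one downstream component.
The in-house network framework is rather crude: it takes no responsibility for its own memory usage (there is a memory-management module, but its logic places no limit on memory use).
Every request gets its own read buffer and write buffer, the number of requests is unbounded, and when memory runs out the process simply OOMs and is taken down by Linux.
Hence the need to add rate limiting (papering over the cracks).
Looked at from a higher level, the real question becomes: should a network framework manage memory at all?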

Purpose

Limit the request rate to protect the service from overload.

Common rate-limiting methods

Fixed window, sliding window, leaky bucket, token bucket

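Token bucket:
A bucket of fixed size. The system adds tokens to it at a constant rate and stops adding while the bucket is full.
As long as tokens remain in the bucket, requests can keep taking them. When none remain, a request either waits until a token is added or fails immediately.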

This time I am studying the token bucket

golang.org/x/time/rate

Implementation

// A Limiter controls how frequently events are allowed to happen.
// Note: Limiter caps how often events may happen.
// 
// It implements a "token bucket" of size b, initially full and refilled
// at rate r tokens per second.
// Note: the bucket has size b, starts full, and is refilled at r tokens per second.
// 
// Informally, in any large enough time interval, the Limiter limits the
// rate to r tokens per second, with a maximum burst size of b events.
// 
// As a special case, if r == Inf (the infinite rate), b is ignored.
// See https://en.wikipedia.org/wiki/Token_bucket for more about token buckets.
// So r can even be set to Inf?
// 
// The zero value is a valid Limiter, but it will reject all events.
// Use NewLimiter to create non-zero Limiters.
// A zero-value Limiter that rejects everything? Neat.
//
// Limiter has three main methods, Allow, Reserve, and Wait.
// Most callers should use Wait.
// Three methods: Allow, Reserve, Wait. Each call consumes one token.
//
// Each of the three methods consumes a single token.
// They differ in their behavior when no token is available.
// If no token is available, Allow returns false.
// Allow: non-blocking
// 
// If no token is available, Reserve returns a reservation for a future token
// and the amount of time the caller must wait before using it.
// Reserve can even book ahead? Nice: it also reports how long to wait if you insist on using the token.
// 
// If no token is available, Wait blocks until one can be obtained
// or its associated context.Context is canceled.
// Wait: blocking
//
// The methods AllowN, ReserveN, and WaitN consume n tokens.
// Note: the N variants (AllowN, ReserveN, WaitN) consume n tokens instead of one.
type Limiter struct {
	mu     sync.Mutex
	// events per second, i.e. the event rate
	limit  Limit
	// maximum number of tokens a single call (Allow, Reserve, Wait) can consume
	// a higher burst value allows more events to happen at once
	burst  int
	
	tokens float64
	
	// last is the last time the limiter's tokens field was updated
	// i.e. the time of the previous update to the tokens field
	last time.Time
	// lastEvent is the latest time of a rate-limited event (past or future)
	// i.e. the time of the most recent rate-limited event (past or future)
	lastEvent time.Time
}

// A zero Limit allows no events.
type Limit float64

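The struct alone is enough to see how to design a rough rate limiter.
There is no complex data structure: just a float64 token count and two timestamps, guarded by a sync.Mutex (not an atomic counter).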

Usage

// NewLimiter returns a new Limiter that allows events up to rate r and permits
// bursts of at most b tokens.
// rate r, with bursts of up to b at once
func NewLimiter(r Limit, b int) *Limiter {
	return &Limiter{
		limit: r,
		burst: b,
	}
}

Besides passing a Limit directly, the rate for NewLimiter can also be given as an interval between events, via Every

func Every(interval time.Duration) Limit {
	if interval <= 0 {
		return Inf
	}
	return 1 / Limit(interval.Seconds())
}

Allow, AllowN, Reserve, ReserveN, Wait, WaitN
All of them end up calling reserveN

// Allow reports whether an event may happen now.
func (lim *Limiter) Allow() bool {
	return lim.AllowN(time.Now(), 1)
}

// AllowN reports whether n events may happen at time t.
// Use this method if you intend to drop / skip events that exceed the rate limit.
// Otherwise use Reserve or Wait.
func (lim *Limiter) AllowN(t time.Time, n int) bool {
	return lim.reserveN(t, n, 0).ok
}

// Reserve is shorthand for ReserveN(time.Now(), 1).
func (lim *Limiter) Reserve() *Reservation {
	return lim.ReserveN(time.Now(), 1)
}

// ReserveN returns a Reservation that indicates how long the caller must wait before n events happen.
// The Limiter takes this Reservation into account when allowing future events.
// The returned Reservation’s OK() method returns false if n exceeds the Limiter's burst size.
// Usage example:
//
//	r := lim.ReserveN(time.Now(), 1)
//	if !r.OK() {
//	  // Not allowed to act! Did you remember to set lim.burst to be > 0 ?
//	  return
//	}
//	time.Sleep(r.Delay())
//	Act()
//
// Use this method if you wish to wait and slow down in accordance with the rate limit without dropping events.
// If you need to respect a deadline or cancel the delay, use Wait instead.
// To drop or skip events exceeding rate limit, use Allow instead.
func (lim *Limiter) ReserveN(t time.Time, n int) *Reservation {
	r := lim.reserveN(t, n, InfDuration)
	return &r
}

// Wait is shorthand for WaitN(ctx, 1).
func (lim *Limiter) Wait(ctx context.Context) (err error) {
	return lim.WaitN(ctx, 1)
}

// WaitN blocks until lim permits n events to happen.
// It returns an error if n exceeds the Limiter's burst size, the Context is
// canceled, or the expected wait time exceeds the Context's Deadline.
// The burst limit is ignored if the rate limit is Inf.
func (lim *Limiter) WaitN(ctx context.Context, n int) (err error) {
	// The test code calls lim.wait with a fake timer generator.
	// This is the real timer generator.
	newTimer := func(d time.Duration) (<-chan time.Time, func() bool, func()) {
		timer := time.NewTimer(d)
		return timer.C, timer.Stop, func() {}
	}

	return lim.wait(ctx, n, time.Now(), newTimer)
}

// wait is the internal implementation of WaitN.
func (lim *Limiter) wait(ctx context.Context, n int, t time.Time, newTimer func(d time.Duration) (<-chan time.Time, func() bool, func())) error {
	lim.mu.Lock()
	burst := lim.burst
	limit := lim.limit
	lim.mu.Unlock()

	// with rate limiting in effect, a single request is asking for more tokens than burst
	if n > burst && limit != Inf {
		return fmt.Errorf("rate: Wait(n=%d) exceeds limiter's burst %d", n, burst)
	}
	// Check if ctx is already cancelled
	// a non-blocking way to check whether ctx has already ended
	select {
	case <-ctx.Done():
		return ctx.Err()
	default:
		// the default branch keeps this select from blocking
	}
	// Determine wait limit
	waitLimit := InfDuration
	
	// it even honors the ctx Deadline, nice
	if deadline, ok := ctx.Deadline(); ok {
		// t is the current time, so deadline-t is the longest we are allowed to wait
		waitLimit = deadline.Sub(t)
	}
	// Reserve
	r := lim.reserveN(t, n, waitLimit)
	if !r.ok {
		return fmt.Errorf("rate: Wait(n=%d) would exceed context deadline", n)
	}
	// Wait if necessary
	// DelayFrom returns the duration for which the reservation holder must wait
	// before taking the reserved action.  Zero duration means act immediately.
	// InfDuration means the limiter cannot grant the tokens requested in this
	// Reservation within the maximum wait time.
	// Note: a zero delay means act immediately, no waiting on the limiter
	// InfDuration means the requested tokens cannot be granted within the maximum wait time
	delay := r.DelayFrom(t)
	if delay == 0 {
		return nil
	}
	
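	// newTimer creates a timer; it must be stopped once we are done, otherwise timers pile up in the system
	// ch: channel of the delay timer
	// stop: function that cancels the timer
	// advance: a hook that is only used in tests, nice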
	ch, stop, advance := newTimer(delay)
	defer stop()
	advance() // only has an effect when testing

	// wait for whichever comes first
	select {
	case <-ch:
		// We can proceed.
		return nil
	case <-ctx.Done():
		// Context was canceled before we could proceed.  Cancel the
		// reservation, which may permit other events to proceed sooner.
		r.Cancel()
		return ctx.Err()
	}
}

Source code study

With everything around pushing to cash in quickly, I could not even muster the patience to study lim.reserveN. Sad.

lim.reserveN

Questions

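Is it single-instance or distributed?

It limits the rate of resource access or operations within a single instance, so it is single-instance rate limiting.

Distributed rate limiting involves sharing and synchronizing state across processes or machines, and usually needs extra infrastructure, such as a distributed cache (for example Redis) or a database, to keep the limiter state consistent.
The golang.org/x/time/rate package has no built-in support for this.

To rate-limit in a distributed environment, consider a centralized store that synchronizes limiter state across nodes,
or some other distributed rate-limiting strategy. This brings some complexity,
because shared state has to be managed and the usual distributed-systems problems have to be handled,
such as network partitions, latency fluctuations, and race conditions while synchronizing state.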