强曰为道
与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧.
文档目录

异步与协程精讲 / 第15章:背压机制 —— 流控的艺术

第15章:背压机制 —— 流控的艺术

15.1 什么是背压?

背压(Backpressure)是一种下游向上游传递压力的流控机制。当消费者(下游)的处理速度跟不上生产者(上游)的产生速度时,消费者需要有一种方式告诉生产者:“慢一点,我处理不过来了。”

没有背压时会怎样?

生产者(快) → → → → → → → → 消费者(慢)
   1000 msg/s                    100 msg/s

           ↓ ↓ ↓ ↓ ↓ ↓ ↓ ↓
       缓冲区不断增长...
           ↓ ↓ ↓ ↓ ↓ ↓ ↓ ↓
       内存溢出(OOM)!💥

有背压时

生产者(调整为 100 msg/s) → → → 消费者(100 msg/s)
        ↑
        │ 反压信号:"我处理不过来了"
        │
      消费者

15.2 推送式 vs 拉取式

推送式(Push-based)

生产者 ──主动发送──► 消费者

特点:
- 生产者控制速率
- 消费者被动接收
- 快生产者 + 慢消费者 → 溢出

代表:
- Node.js EventEmitter
- Kafka 生产者
- HTTP/2 Server Push

拉取式(Pull-based)

消费者 ──主动请求──► 生产者

特点:
- 消费者控制速率
- 生产者被动响应
- 天然背压:消费者不拉取,生产者就不生产

代表:
- Go Channel(发送方阻塞直到接收方准备好)
- Kafka 消费者
- Iterable/Iterator 模式

对比

特性 推送式 拉取式
速率控制 生产者主导 消费者主导
背压实现 需要额外机制 天然支持
延迟 低(立即推送) 略高(需请求)
复杂度 需要流控逻辑 简单
适用场景 实时通知 数据管道

15.3 各语言的背压机制

Go Channel(天然拉取式)

// Go Channel 天然支持背压
// 当缓冲区满时,发送方阻塞

func producer(ch chan<- int) {
    for i := 0; ; i++ {
        ch <- i  // 如果缓冲区满,这里会阻塞 → 天然背压
        fmt.Printf("生产: %d\n", i)
    }
}

func consumer(ch <-chan int) {
    for v := range ch {
        time.Sleep(100 * time.Millisecond)  // 慢消费者
        fmt.Printf("消费: %d\n", v)
    }
}

func main() {
    ch := make(chan int, 10)  // 缓冲区大小 10
    go producer(ch)
    consumer(ch)
}

Python asyncio(Semaphore 限流)

import asyncio

async def bounded_pipeline(source, max_in_flight: int = 10):
    semaphore = asyncio.Semaphore(max_in_flight)
    
    async def process_item(item):
        async with semaphore:  # 限制并发数
            result = await expensive_operation(item)
            return result
    
    tasks = [process_item(item) async for item in source]
    return await asyncio.gather(*tasks)

JavaScript(可读流)

const { Readable, Transform, Writable } = require('stream');

// Node.js Stream 内置背压
const readable = new Readable({
    read(size) {
        // 当消费者消费完后,这里会被调用
        // 生产更多数据
    }
});

const writable = new Writable({
    write(chunk, encoding, callback) {
        // 如果写入速度跟不上
        // writable.write() 返回 false
        // readable 自动暂停
        slowDatabaseInsert(chunk, callback);
    }
});

readable.pipe(writable);
// Stream.pipe() 自动处理背压

Rust(异步 Stream)

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    // 带缓冲的通道,支持背压
    let (tx, mut rx) = mpsc::channel(100);

    // 生产者
    tokio::spawn(async move {
        for i in 0..10000 {
            tx.send(i).await.unwrap();  // 缓冲区满时阻塞
        }
    });

    // 慢消费者
    while let Some(value) = rx.recv().await {
        tokio::time::sleep(Duration::from_millis(100)).await;
        println!("处理: {}", value);
    }
}

15.4 缓冲策略

策略 行为 内存占用 数据安全 适用场景
无缓冲 生产者直接等待消费者 最低 不丢失 精确处理
有界缓冲 固定大小缓冲区 可控 不丢失 大多数场景
无界缓冲 无限增长 危险 不丢失 短暂突发
丢弃新数据 满时丢弃最新 固定 可能丢失 实时指标
丢弃旧数据 满时丢弃最旧 固定 可能丢失 日志、事件流
采样 按比例丢弃 固定 丢失部分 监控、统计

丢弃策略实现

// 丢弃新数据(Drop Newest)
func dropNewestProducer(ch chan<- int, value int) bool {
    select {
    case ch <- value:
        return true  // 发送成功
    default:
        return false  // 缓冲区满,丢弃
    }
}

// 丢弃旧数据(Drop Oldest)
func dropOldestProducer(ch chan int, value int) {
    select {
    case ch <- value:
        return  // 发送成功
    default:
        select {
        case <-ch:  // 丢弃一个旧数据
        default:
        }
        ch <- value  // 再尝试发送
    }
}

15.5 动态背压

在实际系统中,背压阈值应该是动态调整的:

class AdaptiveRateLimiter:
    """自适应速率限制器"""
    
    def __init__(self, initial_rate: float = 100.0):
        self.rate = initial_rate
        self.min_rate = 10.0
        self.max_rate = 10000.0
        self.success_count = 0
        self.error_count = 0
        self.window_size = 100
    
    def on_success(self):
        self.success_count += 1
        self._adjust()
    
    def on_error(self):
        self.error_count += 1
        self._adjust()
    
    def _adjust(self):
        total = self.success_count + self.error_count
        if total >= self.window_size:
            error_rate = self.error_count / total
            if error_rate > 0.1:  # 错误率 > 10%,降低速率
                self.rate = max(self.min_rate, self.rate * 0.8)
            elif error_rate < 0.01:  # 错误率 < 1%,提高速率
                self.rate = min(self.max_rate, self.rate * 1.1)
            self.success_count = 0
            self.error_count = 0
    
    async def acquire(self):
        interval = 1.0 / self.rate
        await asyncio.sleep(interval)

15.6 业务场景:日志收集系统

场景

一个日志收集系统,日志产生速度可能瞬间飙升(如电商大促),但写入存储的速度是有限的。

import asyncio
from collections import deque

class LogCollector:
    def __init__(self, max_buffer: int = 10000, flush_interval: float = 1.0):
        self.buffer = deque(maxlen=max_buffer)  # 有界缓冲,丢弃旧数据
        self.max_buffer = max_buffer
        self.flush_interval = flush_interval
        self.lock = asyncio.Lock()
    
    async def ingest(self, log_entry: dict):
        """接收日志(可能非常快)"""
        async with self.lock:
            if len(self.buffer) >= self.max_buffer:
                dropped = self.buffer.popleft()  # 丢弃最旧的
                print(f"缓冲区满,丢弃旧日志: {dropped.get('id')}")
            self.buffer.append(log_entry)
    
    async def flush_loop(self):
        """定期刷写到存储"""
        while True:
            await asyncio.sleep(self.flush_interval)
            async with self.lock:
                if not self.buffer:
                    continue
                batch = list(self.buffer)
                self.buffer.clear()
            
            try:
                await self.write_to_storage(batch)
            except Exception as e:
                print(f"刷写失败: {e}")
                # 可以重新放回缓冲区或写入死信队列
    
    async def write_to_storage(self, batch: list[dict]):
        """批量写入存储(有速率限制)"""
        await asyncio.sleep(0.1)  # 模拟写入延迟
        print(f"写入 {len(batch)} 条日志")

15.7 Reactive Streams 规范

Reactive Streams 是一个跨语言的背压标准,定义了四个接口:

Publisher<T> ──────► Subscriber<T>
     │                    ▲
     │   onNext(T)        │
     │ ──────────────────► │
     │                    │
     │   request(n)       │
     │ ◄────────────────── │
     │                    │
     │   onError(e)       │
     │ ──────────────────► │
     │                    │
     │   onComplete()     │
     │ ──────────────────► │
接口 方法 作用
Publisher subscribe(Subscriber) 订阅
Subscriber onNext(T) 接收数据
Subscriber request(n) 请求 n 个数据(背压)
Subscription cancel() 取消订阅

15.8 本章小结

要点 说明
背压定义 下游向上游传递压力
推送式 生产者主导,需要额外流控
拉取式 消费者主导,天然背压
缓冲策略 无缓冲、有界、无界、丢弃
动态调整 根据错误率自适应调整速率
Reactive Streams 跨语言背压标准

下一章预告:异步代码的测试比同步代码更复杂——如何处理超时、竞态和确定性?


扩展阅读