强曰为道
与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧.
文档目录

异步与协程精讲 / 第15章:背压机制 —— 流控的艺术

第15章:背压机制 —— 流控的艺术

15.1 什么是背压?

背压(Backpressure)是一种下游向上游传递压力的流控机制。当消费者(下游)的处理速度跟不上生产者(上游)的产生速度时,消费者需要有一种方式告诉生产者:“慢一点,我处理不过来了。”

没有背压时会怎样?

生产者(快) → → → → → → → → 消费者(慢)
   1000 msg/s                    100 msg/s

           ↓ ↓ ↓ ↓ ↓ ↓ ↓ ↓
       缓冲区不断增长...
           ↓ ↓ ↓ ↓ ↓ ↓ ↓ ↓
       内存溢出(OOM)!💥

有背压时

生产者(调整为 100 msg/s) → → → 消费者(100 msg/s)
        ↑
        │ 反压信号:"我处理不过来了"
        │
      消费者

15.2 推送式 vs 拉取式

推送式(Push-based)

生产者 ──主动发送──► 消费者

特点:
- 生产者控制速率
- 消费者被动接收
- 快生产者 + 慢消费者 → 溢出

代表:
- Node.js EventEmitter
- Kafka 生产者
- HTTP/2 Server Push

拉取式(Pull-based)

消费者 ──主动请求──► 生产者

特点:
- 消费者控制速率
- 生产者被动响应
- 天然背压:消费者不拉取,生产者就不生产

代表:
- Go Channel(发送方阻塞直到接收方准备好)
- Kafka 消费者
- Iterable/Iterator 模式

对比

特性推送式拉取式
速率控制生产者主导消费者主导
背压实现需要额外机制天然支持
延迟低(立即推送)略高(需请求)
复杂度需要流控逻辑简单
适用场景实时通知数据管道

15.3 各语言的背压机制

Go Channel(天然拉取式)

// Go Channel 天然支持背压
// 当缓冲区满时,发送方阻塞

func producer(ch chan<- int) {
    for i := 0; ; i++ {
        ch <- i  // 如果缓冲区满,这里会阻塞 → 天然背压
        fmt.Printf("生产: %d\n", i)
    }
}

func consumer(ch <-chan int) {
    for v := range ch {
        time.Sleep(100 * time.Millisecond)  // 慢消费者
        fmt.Printf("消费: %d\n", v)
    }
}

func main() {
    ch := make(chan int, 10)  // 缓冲区大小 10
    go producer(ch)
    consumer(ch)
}

Python asyncio(Semaphore 限流)

import asyncio

async def bounded_pipeline(source, max_in_flight: int = 10):
    semaphore = asyncio.Semaphore(max_in_flight)
    
    async def process_item(item):
        async with semaphore:  # 限制并发数
            result = await expensive_operation(item)
            return result
    
    tasks = [process_item(item) async for item in source]
    return await asyncio.gather(*tasks)

JavaScript(可读流)

const { Readable, Transform, Writable } = require('stream');

// Node.js Stream 内置背压
const readable = new Readable({
    read(size) {
        // 当消费者消费完后,这里会被调用
        // 生产更多数据
    }
});

const writable = new Writable({
    write(chunk, encoding, callback) {
        // 如果写入速度跟不上
        // writable.write() 返回 false
        // readable 自动暂停
        slowDatabaseInsert(chunk, callback);
    }
});

readable.pipe(writable);
// Stream.pipe() 自动处理背压

Rust(异步 Stream)

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    // 带缓冲的通道,支持背压
    let (tx, mut rx) = mpsc::channel(100);

    // 生产者
    tokio::spawn(async move {
        for i in 0..10000 {
            tx.send(i).await.unwrap();  // 缓冲区满时阻塞
        }
    });

    // 慢消费者
    while let Some(value) = rx.recv().await {
        tokio::time::sleep(Duration::from_millis(100)).await;
        println!("处理: {}", value);
    }
}

15.4 缓冲策略

策略行为内存占用数据安全适用场景
无缓冲生产者直接等待消费者最低不丢失精确处理
有界缓冲固定大小缓冲区可控不丢失大多数场景
无界缓冲无限增长危险不丢失短暂突发
丢弃新数据满时丢弃最新固定可能丢失实时指标
丢弃旧数据满时丢弃最旧固定可能丢失日志、事件流
采样按比例丢弃固定丢失部分监控、统计

丢弃策略实现

// 丢弃新数据(Drop Newest)
func dropNewestProducer(ch chan<- int, value int) bool {
    select {
    case ch <- value:
        return true  // 发送成功
    default:
        return false  // 缓冲区满,丢弃
    }
}

// 丢弃旧数据(Drop Oldest)
func dropOldestProducer(ch chan int, value int) {
    select {
    case ch <- value:
        return  // 发送成功
    default:
        select {
        case <-ch:  // 丢弃一个旧数据
        default:
        }
        ch <- value  // 再尝试发送
    }
}

15.5 动态背压

在实际系统中,背压阈值应该是动态调整的:

class AdaptiveRateLimiter:
    """自适应速率限制器"""
    
    def __init__(self, initial_rate: float = 100.0):
        self.rate = initial_rate
        self.min_rate = 10.0
        self.max_rate = 10000.0
        self.success_count = 0
        self.error_count = 0
        self.window_size = 100
    
    def on_success(self):
        self.success_count += 1
        self._adjust()
    
    def on_error(self):
        self.error_count += 1
        self._adjust()
    
    def _adjust(self):
        total = self.success_count + self.error_count
        if total >= self.window_size:
            error_rate = self.error_count / total
            if error_rate > 0.1:  # 错误率 > 10%,降低速率
                self.rate = max(self.min_rate, self.rate * 0.8)
            elif error_rate < 0.01:  # 错误率 < 1%,提高速率
                self.rate = min(self.max_rate, self.rate * 1.1)
            self.success_count = 0
            self.error_count = 0
    
    async def acquire(self):
        interval = 1.0 / self.rate
        await asyncio.sleep(interval)

15.6 业务场景:日志收集系统

场景

一个日志收集系统,日志产生速度可能瞬间飙升(如电商大促),但写入存储的速度是有限的。

import asyncio
from collections import deque

class LogCollector:
    def __init__(self, max_buffer: int = 10000, flush_interval: float = 1.0):
        self.buffer = deque(maxlen=max_buffer)  # 有界缓冲,丢弃旧数据
        self.max_buffer = max_buffer
        self.flush_interval = flush_interval
        self.lock = asyncio.Lock()
    
    async def ingest(self, log_entry: dict):
        """接收日志(可能非常快)"""
        async with self.lock:
            if len(self.buffer) >= self.max_buffer:
                dropped = self.buffer.popleft()  # 丢弃最旧的
                print(f"缓冲区满,丢弃旧日志: {dropped.get('id')}")
            self.buffer.append(log_entry)
    
    async def flush_loop(self):
        """定期刷写到存储"""
        while True:
            await asyncio.sleep(self.flush_interval)
            async with self.lock:
                if not self.buffer:
                    continue
                batch = list(self.buffer)
                self.buffer.clear()
            
            try:
                await self.write_to_storage(batch)
            except Exception as e:
                print(f"刷写失败: {e}")
                # 可以重新放回缓冲区或写入死信队列
    
    async def write_to_storage(self, batch: list[dict]):
        """批量写入存储(有速率限制)"""
        await asyncio.sleep(0.1)  # 模拟写入延迟
        print(f"写入 {len(batch)} 条日志")

15.7 Reactive Streams 规范

Reactive Streams 是一个跨语言的背压标准,定义了四个接口:

Publisher<T> ──────► Subscriber<T>
     │                    ▲
     │   onNext(T)        │
     │ ──────────────────► │
     │                    │
     │   request(n)       │
     │ ◄────────────────── │
     │                    │
     │   onError(e)       │
     │ ──────────────────► │
     │                    │
     │   onComplete()     │
     │ ──────────────────► │
接口方法作用
Publishersubscribe(Subscriber)订阅
SubscriberonNext(T)接收数据
Subscriberrequest(n)请求 n 个数据(背压)
Subscriptioncancel()取消订阅

15.8 本章小结

要点说明
背压定义下游向上游传递压力
推送式生产者主导,需要额外流控
拉取式消费者主导,天然背压
缓冲策略无缓冲、有界、无界、丢弃
动态调整根据错误率自适应调整速率
Reactive Streams跨语言背压标准

下一章预告:异步代码的测试比同步代码更复杂——如何处理超时、竞态和确定性?


扩展阅读