强曰为道
与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧.
文档目录

异步与协程精讲 / 第16章:异步测试 —— 驯服不确定性

第16章:异步测试 —— 驯服不确定性

16.1 异步测试的挑战

异步代码比同步代码更难测试,主要因为:

挑战描述影响
不确定性执行顺序不可预测测试结果不稳定(flaky tests)
超时测试可能挂起不返回CI 卡住
竞态并发操作的交错难以复现 bug
副作用异步操作的副作用难以验证验证困难
时序依赖依赖时间流逝测试慢且不可靠

16.2 测试策略概览

策略目的工具
超时控制防止测试挂起各语言的超时机制
Mock/Stub隔离外部依赖mock 库
虚拟时间加速时间相关测试时间模拟库
竞态检测发现数据竞争race detector
确定性测试保证结果一致可控调度

16.3 超时控制

Go

func TestAsyncOperation(t *testing.T) {
    done := make(chan bool)

    go func() {
        result := asyncOperation()
        // 验证结果
        if result != expected {
            t.Errorf("期望 %v, 得到 %v", expected, result)
        }
        done <- true
    }()

    select {
    case <-done:
        // 测试通过
    case <-time.After(5 * time.Second):
        t.Fatal("测试超时")
    }
}

// 使用 context 控制超时
func TestWithContext(t *testing.T) {
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    result, err := asyncOperationWithContext(ctx)
    if err != nil {
        t.Fatalf("错误: %v", err)
    }
    if result != expected {
        t.Errorf("期望 %v, 得到 %v", expected, result)
    }
}

Python

import pytest
import asyncio

# 方式一:pytest-asyncio
@pytest.mark.asyncio
async def test_fetch_data():
    # 自动管理事件循环
    result = await fetch_data("https://api.example.com")
    assert result["status"] == "ok"

# 方式二:设置超时
@pytest.mark.asyncio
@pytest.mark.timeout(5)  # 5 秒超时
async def test_with_timeout():
    result = await slow_operation()
    assert result is not None

# 方式三:asyncio.wait_for
@pytest.mark.asyncio
async def test_with_wait_for():
    result = await asyncio.wait_for(
        slow_operation(),
        timeout=5.0
    )
    assert result is not None

JavaScript (Jest)

// Jest 超时控制
test('fetch data', async () => {
    const data = await fetchData('/api/test');
    expect(data.status).toBe('ok');
}, 10000); // 10 秒超时

// 使用 AbortController
test('fetch with abort', async () => {
    const controller = new AbortController();
    setTimeout(() => controller.abort(), 5000);

    try {
        await fetch('/api/slow', { signal: controller.signal });
    } catch (err) {
        expect(err.name).toBe('AbortError');
    }
});

16.4 Mock 与 Stub

异步 Mock 的挑战

# 同步 Mock 很简单
def test_sync():
    with mock.patch('module.api_call', return_value={'ok': True}):
        result = my_function()
        assert result['ok']

# 异步 Mock 需要特殊处理
@pytest.mark.asyncio
async def test_async():
    with mock.patch('module.async_api_call', 
                    new_callable=mock.AsyncMock,
                    return_value={'ok': True}):
        result = await my_async_function()
        assert result['ok']

Python async Mock

from unittest.mock import AsyncMock, patch
import pytest

# 创建异步 Mock
mock_db = AsyncMock()
mock_db.query.return_value = [{'id': 1, 'name': 'test'}]

@pytest.mark.asyncio
async def test_get_user():
    user = await get_user(mock_db, user_id=1)
    assert user['name'] == 'test'
    mock_db.query.assert_awaited_once_with('SELECT * FROM users WHERE id = 1')

# 模拟异常
mock_db.query.side_effect = ConnectionError("数据库连接失败")

@pytest.mark.asyncio
async def test_get_user_db_error():
    with pytest.raises(ConnectionError):
        await get_user(mock_db, user_id=1)

Go 接口 Mock

// 定义接口
type UserRepo interface {
    GetUser(ctx context.Context, id int) (*User, error)
}

// 实现 Mock
type MockUserRepo struct {
    GetUserFunc func(ctx context.Context, id int) (*User, error)
}

func (m *MockUserRepo) GetUser(ctx context.Context, id int) (*User, error) {
    return m.GetUserFunc(ctx, id)
}

// 测试
func TestGetUser(t *testing.T) {
    mockRepo := &MockUserRepo{
        GetUserFunc: func(ctx context.Context, id int) (*User, error) {
            return &User{ID: id, Name: "测试用户"}, nil
        },
    }

    svc := NewUserService(mockRepo)
    user, err := svc.GetUser(context.Background(), 1)
    
    assert.NoError(t, err)
    assert.Equal(t, "测试用户", user.Name)
}

JavaScript Mock (Jest)

// Mock 异步函数
jest.mock('./api');

test('fetch user', async () => {
    api.getUser.mockResolvedValue({ id: 1, name: 'test' });
    
    const user = await service.getUser(1);
    expect(user.name).toBe('test');
    expect(api.getUser).toHaveBeenCalledWith(1);
});

test('fetch user error', async () => {
    api.getUser.mockRejectedValue(new Error('网络错误'));
    
    await expect(service.getUser(1)).rejects.toThrow('网络错误');
});

16.5 竞态检测

Go Race Detector

# 使用 -race 标志运行测试
go test -race ./...

# 示例:检测竞态条件
func TestRaceCondition(t *testing.T) {
    counter := 0
    var wg sync.WaitGroup

    for i := 0; i < 100; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            counter++ // 这里有竞态!
        }()
    }
    wg.Wait()
    // -race 会检测到竞态并报错
}

Rust 的安全保证

// Rust 编译器在编译期阻止大多数数据竞争
use std::sync::{Arc, Mutex};

#[tokio::test]
async fn test_concurrent() {
    let counter = Arc::new(Mutex::new(0));
    let mut handles = vec![];

    for _ in 0..100 {
        let counter = Arc::clone(&counter);
        handles.push(tokio::spawn(async move {
            let mut num = counter.lock().unwrap();
            *num += 1;
        }));
    }

    for handle in handles {
        handle.await.unwrap();
    }

    assert_eq!(*counter.lock().unwrap(), 100);
}

16.6 确定性测试

虚拟时间

# Python — 使用 freezegun 模拟时间
from freezegun import freeze_time
import asyncio

@freeze_time("2024-01-01 12:00:00")
@pytest.mark.asyncio
async def test_cache_expiry():
    cache = TTLCache(ttl=60)
    await cache.set("key", "value")
    
    # 时间前进 30 秒 — 仍然有效
    with freeze_time("2024-01-01 12:00:30"):
        assert await cache.get("key") == "value"
    
    # 时间前进 61 秒 — 已过期
    with freeze_time("2024-01-01 12:01:01"):
        assert await cache.get("key") is None

Go — 时间模拟

// 使用 clock 接口模拟时间
type Clock interface {
    Now() time.Time
    After(d time.Duration) <-chan time.Time
}

type MockClock struct {
    now time.Time
}

func (c *MockClock) Now() time.Time {
    return c.now
}

func (c *MockClock) After(d time.Duration) <-chan time.Time {
    ch := make(chan time.Time, 1)
    ch <- c.now.Add(d) // 立即返回
    return ch
}

func (c *MockClock) Advance(d time.Duration) {
    c.now = c.now.Add(d)
}

// 使用模拟时钟测试
func TestCacheExpiry(t *testing.T) {
    clock := &MockClock{now: time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC)}
    cache := NewTTLCache(clock, 60*time.Second)
    
    cache.Set("key", "value")
    assert.Equal(t, "value", cache.Get("key"))
    
    clock.Advance(61 * time.Second)
    assert.Nil(t, cache.Get("key"))
}

16.7 集成测试

容器化依赖

import pytest
import testcontainers.postgres

@pytest.fixture(scope="session")
def postgres():
    with testcontainers.postgres.PostgresContainer("postgres:15") as pg:
        yield pg

@pytest.mark.asyncio
async def test_user_repo(postgres):
    repo = UserRepo(postgres.get_connection_url())
    await repo.create_table()
    
    user = await repo.create_user("test", "test@example.com")
    assert user.name == "test"
    
    fetched = await repo.get_user(user.id)
    assert fetched.email == "test@example.com"

16.8 测试最佳实践

实践说明
总是设置超时防止测试挂起
隔离外部依赖使用 Mock 隔离网络、数据库
使用 Race DetectorGo -race,Rust 编译器检查
模拟时间时间相关测试用虚拟时间
避免 sleep用信号量/通道代替 time.sleep()
测试边界条件超时、取消、网络错误
幂等性测试验证重试安全性
并发压力测试高并发下验证正确性

16.9 业务场景:测试支付服务

@pytest.fixture
def payment_service():
    mock_gateway = AsyncMock()
    mock_gateway.charge.return_value = {"status": "success", "transaction_id": "tx_123"}
    return PaymentService(gateway=mock_gateway)

@pytest.mark.asyncio
async def test_payment_success(payment_service):
    mock_gateway = payment_service.gateway
    
    result = await payment_service.charge(user_id=1, amount=100)
    
    assert result.status == "success"
    mock_gateway.charge.assert_awaited_once_with(user_id=1, amount=100)

@pytest.mark.asyncio
async def test_payment_timeout(payment_service):
    payment_service.gateway.charge.side_effect = asyncio.TimeoutError()
    
    with pytest.raises(PaymentTimeoutError):
        await payment_service.charge(user_id=1, amount=100)

@pytest.mark.asyncio
async def test_payment_retry(payment_service):
    # 前两次失败,第三次成功
    payment_service.gateway.charge.side_effect = [
        ConnectionError("网络错误"),
        ConnectionError("网络错误"),
        {"status": "success", "transaction_id": "tx_123"},
    ]
    
    result = await payment_service.charge_with_retry(user_id=1, amount=100, retries=3)
    assert result.status == "success"
    assert payment_service.gateway.charge.await_count == 3

16.10 本章小结

要点说明
超时控制每个异步测试都要有超时
Mock异步函数需要 AsyncMock
竞态检测Go -race,Rust 编译器
确定性测试虚拟时间、可控调度
集成测试容器化依赖
最佳实践隔离依赖、测试边界条件、幂等性

下一章预告:异步服务如何容器化部署?资源限制和性能调优有哪些技巧?


扩展阅读