# Chapter 13: Concurrent Programming

Master Python's threading, multiprocessing, and concurrent programming models.
## 13.1 Concurrency vs. Parallelism
| Concept | Description | Best for |
|---|---|---|
| Concurrency | Multiple tasks take turns making progress | I/O-bound work |
| Parallelism | Multiple tasks execute at the same time | CPU-bound work |
```
Concurrency (single core):
  Thread A: ████░░░░████░░░░
  Thread B: ░░░░████░░░░████

Parallelism (multiple cores):
  Thread A: ████████████
  Thread B: ████████████
```
## 13.2 The GIL (Global Interpreter Lock)

CPython's GIL allows only one thread at a time to execute Python bytecode.
| Impact | Description |
|---|---|
| I/O-bound work | Largely unaffected; threads release the GIL while waiting on I/O |
| CPU-bound work | Severely limited; threads cannot use multiple cores |
| Workarounds | Use multiprocessing or C extensions |
```python
import sys
print(sys._is_gil_enabled())  # Python 3.13+: False means the free-threaded build is active
```
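To see the effect, a rough benchmark (absolute numbers will vary by machine) can compare a CPU-bound countdown run sequentially versus split across two threads; on a GIL-enabled build the threaded version is no faster:

```python
import threading
import time

def count_down(n: int) -> None:
    while n > 0:       # pure-Python CPU work; the GIL serializes it
        n -= 1

N = 2_000_000

# Sequential: two runs back to back
start = time.perf_counter()
count_down(N)
count_down(N)
sequential = time.perf_counter() - start

# Threaded: the same total work in two threads
start = time.perf_counter()
t1 = threading.Thread(target=count_down, args=(N,))
t2 = threading.Thread(target=count_down, args=(N,))
t1.start(); t2.start()
t1.join(); t2.join()
threaded = time.perf_counter() - start

# On a GIL-enabled CPython, threaded is roughly equal to (or slower than)
# sequential, because only one thread runs bytecode at a time.
print(f"sequential: {sequential:.2f}s, threaded: {threaded:.2f}s")
```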
## 13.3 threading (Multithreading)

### 13.3.1 Basic Usage
```python
import threading
import time

def worker(name: str, seconds: float):
    print(f"Thread {name} starting")
    time.sleep(seconds)
    print(f"Thread {name} finished")

# Create threads
t1 = threading.Thread(target=worker, args=("A", 2))
t2 = threading.Thread(target=worker, args=("B", 1))

# Start them
t1.start()
t2.start()

# Wait for completion
t1.join()
t2.join()
print("All threads finished")
```
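Threads can also be marked as daemons: background threads that are killed automatically when the main thread exits, so they never block program shutdown. A minimal sketch:

```python
import threading
import time

def heartbeat() -> None:
    while True:          # runs forever; safe only because it is a daemon
        time.sleep(0.1)

t = threading.Thread(target=heartbeat, daemon=True, name="heartbeat")
t.start()
print(t.name, t.daemon, t.is_alive())  # heartbeat True True
# No join() needed: the daemon dies together with the main thread.
```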
### 13.3.2 Thread Safety
```python
import threading

counter = 0
lock = threading.Lock()

def increment():
    global counter
    for _ in range(100_000):
        with lock:  # acquire the lock
            counter += 1

threads = [threading.Thread(target=increment) for _ in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(f"Counter: {counter}")  # 1000000 (correct)
```
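The lock is needed because `counter += 1` is not atomic: it compiles to separate load, add, and store bytecode instructions, and a thread switch between them can lose an update. The `dis` module makes this visible:

```python
import dis

counter = 0

def increment():
    global counter
    counter += 1

# The read (LOAD_GLOBAL), the addition, and the write (STORE_GLOBAL)
# are distinct instructions; a thread can be preempted between them.
ops = [ins.opname for ins in dis.Bytecode(increment)]
print(ops)
```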
### 13.3.3 Thread Synchronization Primitives
| Primitive | Purpose |
|---|---|
| Lock | Mutual exclusion |
| RLock | Reentrant lock |
| Semaphore | Semaphore (limits concurrency) |
| Event | Event notification |
| Condition | Condition variable |
| Barrier | Barrier (wait for all threads to be ready) |
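As an example of another primitive from the table, a Semaphore caps how many threads may run a section at once. A sketch that limits concurrency to 2 (the `peak` bookkeeping exists only to demonstrate the cap):

```python
import threading
import time

sem = threading.BoundedSemaphore(2)  # at most 2 threads inside at once
active = 0
peak = 0
lock = threading.Lock()

def worker() -> None:
    global active, peak
    with sem:
        with lock:
            active += 1
            peak = max(peak, active)
        time.sleep(0.05)  # simulate work while holding a semaphore slot
        with lock:
            active -= 1

threads = [threading.Thread(target=worker) for _ in range(6)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(f"peak concurrency: {peak}")  # never exceeds 2
```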
```python
import threading

# Producer-consumer pattern
buffer = []
not_empty = threading.Condition()

def producer():
    for i in range(5):
        with not_empty:
            buffer.append(i)
            not_empty.notify()

def consumer():
    for _ in range(5):
        with not_empty:
            while not buffer:
                not_empty.wait()
            item = buffer.pop(0)
            print(f"Consumed: {item}")

t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer)
t1.start(); t2.start()
t1.join(); t2.join()
```
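Where Condition guards shared state, Event is a simpler one-shot broadcast: threads block in `wait()` until some thread calls `set()`. A minimal sketch:

```python
import threading

ready = threading.Event()
results = []

def waiter() -> None:
    ready.wait()           # blocks until the event is set
    results.append("go")

t = threading.Thread(target=waiter)
t.start()
ready.set()                # wakes every thread waiting on the event
t.join()
print(results)  # ['go']
```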
### 13.3.4 ThreadPoolExecutor
```python
from concurrent.futures import ThreadPoolExecutor, as_completed
import time

def fetch_url(url: str) -> str:
    time.sleep(1)  # simulate a network request
    return f"Response from {url}"

urls = [f"https://api.example.com/page/{i}" for i in range(5)]

with ThreadPoolExecutor(max_workers=3) as executor:
    # Option 1: map (results in input order)
    results = list(executor.map(fetch_url, urls))

    # Option 2: submit + as_completed (results as they finish)
    futures = {executor.submit(fetch_url, url): url for url in urls}
    for future in as_completed(futures):
        url = futures[future]
        result = future.result()
        print(f"{url}: {result}")
```
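An exception raised in a worker is captured by its future and re-raised when `result()` is called, so error handling belongs at the collection point. A sketch (the `may_fail` task is hypothetical):

```python
from concurrent.futures import ThreadPoolExecutor

def may_fail(n: int) -> int:
    if n == 3:
        raise ValueError("bad input")
    return n * n

with ThreadPoolExecutor(max_workers=2) as executor:
    futures = [executor.submit(may_fail, n) for n in range(5)]
    results = []
    for f in futures:
        try:
            results.append(f.result(timeout=5))  # re-raises worker exceptions here
        except ValueError as e:
            results.append(f"error: {e}")

print(results)  # [0, 1, 4, 'error: bad input', 16]
```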
## 13.4 multiprocessing (Multiple Processes)

### 13.4.1 Basic Usage
```python
import multiprocessing
import os

def worker(name: str):
    print(f"Process {name} (PID: {os.getpid()})")

if __name__ == "__main__":
    p1 = multiprocessing.Process(target=worker, args=("A",))
    p2 = multiprocessing.Process(target=worker, args=("B",))
    p1.start(); p2.start()
    p1.join(); p2.join()
```
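How child processes are created depends on the platform's start method: `fork` (Unix), `spawn` (the default on Windows and macOS), or `forkserver`. The current method can be inspected at runtime:

```python
import multiprocessing

# Which start methods this platform supports, and which one is active
methods = multiprocessing.get_all_start_methods()
current = multiprocessing.get_start_method()
print(methods, current)
```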
### 13.4.2 Inter-Process Communication
```python
import multiprocessing

def sender(conn):
    conn.send({"msg": "hello"})
    conn.close()

def receiver(conn):
    data = conn.recv()
    print(f"Received: {data}")

if __name__ == "__main__":
    parent_conn, child_conn = multiprocessing.Pipe()
    p1 = multiprocessing.Process(target=sender, args=(child_conn,))
    p2 = multiprocessing.Process(target=receiver, args=(parent_conn,))
    p1.start(); p2.start()
    p1.join(); p2.join()
```
### 13.4.3 Shared Memory
```python
import multiprocessing

def increment(counter):
    for _ in range(1000):
        with counter.get_lock():
            counter.value += 1

if __name__ == "__main__":
    counter = multiprocessing.Value("i", 0)  # "i" = C int
    processes = [multiprocessing.Process(target=increment, args=(counter,))
                 for _ in range(10)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()
    print(f"Counter: {counter.value}")  # 10000
```
### 13.4.4 ProcessPoolExecutor
```python
from concurrent.futures import ProcessPoolExecutor
import math

def compute_heavy(n: int) -> float:
    return sum(math.sin(i) * math.cos(i) for i in range(n))

if __name__ == "__main__":
    numbers = [10**6] * 8
    with ProcessPoolExecutor() as executor:
        results = list(executor.map(compute_heavy, numbers))
    print(f"Sum of results: {sum(results):.2f}")
```
## 13.5 Threads vs. Processes: Choosing
| Feature | Threads | Processes |
|---|---|---|
| Memory | Shared | Separate |
| Creation overhead | Low | High |
| GIL impact | Yes (CPU-bound work is limited) | No |
| Communication | Shared variables | Pipe / Queue / shared memory |
| Best for | I/O-bound tasks | CPU-bound tasks |
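The table above can be condensed into a small helper; `make_executor` is a hypothetical name, and a real codebase would likely size the pools differently:

```python
from concurrent.futures import Executor, ProcessPoolExecutor, ThreadPoolExecutor

def make_executor(io_bound: bool, workers: int = 4) -> Executor:
    # Threads suit I/O-bound work (the GIL is released while waiting);
    # processes suit CPU-bound work (true parallelism across cores).
    if io_bound:
        return ThreadPoolExecutor(max_workers=workers)
    return ProcessPoolExecutor(max_workers=workers)
```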
## 13.6 The queue Module
```python
import queue
import threading

q = queue.Queue(maxsize=10)

def producer():
    for i in range(20):
        q.put(i)
        print(f"Produced: {i}")

def consumer():
    while True:
        item = q.get()
        if item is None:
            break
        print(f"Consumed: {item}")
        q.task_done()

t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer)
t1.start(); t2.start()
t1.join()
q.join()      # wait until every item has been marked done
q.put(None)   # stop signal
t2.join()
```
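The module also provides `queue.LifoQueue` (stack order) and `queue.PriorityQueue`, which releases the entry with the lowest priority number first:

```python
import queue

pq = queue.PriorityQueue()
# Entries are (priority, item) tuples; the smallest priority number wins
pq.put((3, "low"))
pq.put((1, "urgent"))
pq.put((2, "normal"))

order = [pq.get()[1] for _ in range(3)]
print(order)  # ['urgent', 'normal', 'low']
```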
## 13.7 Caveats

🔴 Caution:

- The GIL prevents multiple threads from running CPU-bound tasks in parallel
- On Windows, multiprocessing code must be protected by an `if __name__ == "__main__"` guard
- Threads that share memory must use locks, or race conditions will occur
- Inter-process communication is more expensive than inter-thread communication

💡 Tips:

- Use `ThreadPoolExecutor` for I/O-bound tasks
- Use `ProcessPoolExecutor` for CPU-bound tasks
- Use `as_completed()` to collect results as they finish
- Python 3.13+'s optional free-threaded build removes the GIL limitation
📌 Business scenario:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
import time

@dataclass
class HealthCheck:
    service: str
    status: bool
    latency: float

def check_service(name: str) -> HealthCheck:
    start = time.monotonic()
    time.sleep(0.5)  # simulate a request
    return HealthCheck(name, True, time.monotonic() - start)

def check_all_services(services: list[str]) -> list[HealthCheck]:
    results = []
    with ThreadPoolExecutor(max_workers=10) as executor:
        futures = {executor.submit(check_service, s): s for s in services}
        for future in as_completed(futures):
            results.append(future.result())
    return results

services = ["user-api", "order-api", "payment-api", "notification-api"]
for check in check_all_services(services):
    print(f"{check.service}: {'✅' if check.status else '❌'} ({check.latency:.3f}s)")
```
## 13.8 Further Reading

- The threading module documentation
- The multiprocessing module documentation
- concurrent.futures documentation
- PEP 703 – Making the Global Interpreter Lock Optional in CPython
- *Python 并发编程实战* by Jason Kowalczewski