强曰为道
与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧.
文档目录

异步与协程精讲 / 第11章:Erlang 进程 —— Actor 模型的先驱

第11章:Erlang 进程 —— Actor 模型的先驱

11.1 Erlang 的并发哲学

Erlang 由 Joe Armstrong 在 1986 年为爱立信的电信系统设计,其核心设计目标是:

  • 高并发:同时处理数百万个连接
  • 高可用:系统运行数年不停机
  • 容错:部分失败不影响整体
  • 软实时:保证响应时间

Erlang 的格言“Let it crash”(让它崩溃)——与其防御性编程,不如快速失败并由监督者重启。


11.2 轻量级进程

创建进程

%% Erlang
Pid = spawn(fun() ->
    receive
        {From, hello} ->
            From ! {self(), hi},
            io:format("收到 hello~n");
        _ ->
            io:format("收到未知消息~n")
    end
end).

%% 发送消息
Pid ! {self(), hello}.
# Elixir(Erlang 之上的现代语言)
pid = spawn(fn ->
  receive do
    {:hello, sender} ->
      send(sender, {:hi, self()})
      IO.puts("收到 hello")
    _ ->
      IO.puts("收到未知消息")
  end
end)

send(pid, {:hello, self()})

进程 vs OS 线程 vs Goroutine

特性Erlang 进程OS 线程Go goroutine
内存占用~300 字节初始~1MB~2KB 初始
创建时间~1μs~1ms~0.3μs
最大数量数百万数千数百万
调度VM 抢占式(Reduction-based)OS 抢占式Go 运行时协作式
GC每进程独立 GC全局 GC全局 GC
通信消息传递(无共享)共享内存Channel/共享内存

11.3 消息传递

消息发送与接收

%% 发送是异步的 — 立即返回
Pid ! Message.

%% 接收是选择性的 — Pattern Matching
receive
    Pattern1 -> Body1;
    Pattern2 -> Body2;
after
    Timeout -> TimeoutBody
end.

消息队列(Mailbox)

进程的 Mailbox:

┌──────────────────────────────┐
│         进程 B 的 Mailbox     │
│                              │
│  消息3: {update, X=10}       │
│  消息2: {data, [1,2,3]}      │
│  消息1: {hello, Pid_A}       │
│                              │
│  receive 从上到下匹配         │
└──────────────────────────────┘

注意:发送方不会阻塞(异步发送)
接收方可能阻塞(如果没有匹配的消息)

选择性接收

%% 只处理感兴趣的消息,忽略其他
receive
    {reply, Ref, Data} ->
        handle_reply(Ref, Data)
    %% 不匹配 {ping, _} 或其他消息
    %% 它们留在 mailbox 中
after 5000 ->
    timeout
end.

关键优势:选择性接收(Selective Receive)使得 Erlang 进程可以按优先级处理消息,而不像 Go 的 Channel 那样只能按顺序接收。


11.4 调度器

Reduction-based 调度

Erlang VM(BEAM)使用**归约(Reduction)**计数来实现公平调度:

  • 每个进程每次被调度执行一定数量的"归约"(约 4000 个函数调用)
  • 归约用完后,进程被暂停,让出 CPU 给其他进程
  • 不需要显式 yield
调度器:

  进程A: ████░░░░████░░░░████    ← 每 4000 reductions 切换
  进程B: ░░░░████░░░░████░░░░
  进程C: ░░░░░░░░████░░░░████

  确保公平性:CPU 密集型进程不会独占 CPU

调度器数量

%% 默认调度器数量 = CPU 核心数
erlang:system_info(schedulers).       %% 查看调度器数量
erlang:system_info(schedulers_online). %% 查看在线调度器数量

%% VM 会自动处理 CPU 亲和性、负载均衡

11.5 容错机制 — Let It Crash

监督树(Supervision Tree)

              ┌──────────┐
              │ 顶级监督者 │
              └────┬─────┘
         ┌────────┼────────┐
         │        │        │
    ┌────┴───┐┌───┴───┐┌───┴────┐
    │子监督者 ││工作进程││子监督者 │
    └───┬────┘└───────┘└───┬────┘
    ┌───┼───┐          ┌───┼───┐
    │   │   │          │   │   │
   W1  W2  W3        W4  W5  W6

重启策略

%% 三种重启策略
%% one_for_one: 只重启崩溃的那个子进程
%% one_for_all: 重启所有子进程
%% rest_for_one: 重启崩溃进程及其之后启动的进程

init(_) ->
    {ok, {{one_for_one, 5, 10}, [
        {worker1, {worker1, start_link, []},
         permanent, 5000, worker, [worker1]},
        {worker2, {worker2, start_link, []},
         permanent, 5000, worker, [worker2]}
    ]}}.
# Elixir 的监督者
defmodule MyApp.Supervisor do
  use Supervisor

  def start_link(opts) do
    Supervisor.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @impl true
  def init(_opts) do
    children = [
      {MyApp.Worker1, []},
      {MyApp.Worker2, []},
      {MyApp.Worker3, []}
    ]

    Supervisor.init(children, strategy: :one_for_one)
  end
end

重启策略对比

策略行为适用场景
one_for_one只重启崩溃的子进程子进程相互独立
one_for_all重启所有子进程子进程相互依赖
rest_for_one重启崩溃进程及后续进程子进程有启动顺序

11.6 OTP 行为模式(Behaviours)

GenServer — 通用服务器

defmodule Counter do
  use GenServer

  # 客户端 API
  def start_link(initial_value) do
    GenServer.start_link(__MODULE__, initial_value, name: __MODULE__)
  end

  def increment, do: GenServer.cast(__MODULE__, :increment)
  def get_value, do: GenServer.call(__MODULE__, :get_value)

  # 服务端回调
  @impl true
  def init(initial_value), do: {:ok, initial_value}

  @impl true
  def handle_call(:get_value, _from, state) do
    {:reply, state, state}
  end

  @impl true
  def handle_cast(:increment, state) do
    {:noreply, state + 1}
  end
end

# 使用
{:ok, _pid} = Counter.start_link(0)
Counter.increment()
Counter.increment()
Counter.get_value()  # => 2

11.7 分布式 Erlang

%% 启动两个节点
%% node1@host> erl -name node1@192.168.1.1 -setcookie mycookie
%% node2@host> erl -name node2@192.168.1.2 -setcookie mycookie

%% 在 node1 上
Pid = spawn('node2@192.168.1.2', fun() ->
    io:format("我在远程节点运行!~n")
end).

%% 消息传递跨节点透明
Pid ! {hello, self()}.

11.8 业务场景:实时聊天系统

defmodule Chat.Room do
  use GenServer

  def start_link(room_id) do
    GenServer.start_link(__MODULE__, room_id, name: via_tuple(room_id))
  end

  def join(room_id, user_pid), do: GenServer.cast(via_tuple(room_id), {:join, user_pid})
  def send_msg(room_id, user, msg), do: GenServer.cast(via_tuple(room_id), {:msg, user, msg})

  @impl true
  def init(room_id), do: {:ok, %{room_id: room_id, members: MapSet.new()}}

  @impl true
  def handle_cast({:join, user_pid}, state) do
    Process.monitor(user_pid)
    {:noreply, %{state | members: MapSet.put(state.members, user_pid)}}
  end

  @impl true
  def handle_cast({:msg, user, msg}, state) do
    Enum.each(state.members, fn pid ->
      send(pid, {:chat_msg, user, msg})
    end)
    {:noreply, state}
  end

  @impl true
  def handle_info({:DOWN, _, :process, pid, _}, state) do
    {:noreply, %{state | members: MapSet.delete(state.members, pid)}}
  end

  defp via_tuple(room_id), do: {:via, Registry, {Chat.Registry, room_id}}
end

11.9 本章小结

要点说明
轻量级进程~300 字节,数百万级并发
消息传递异步发送,选择性接收,无共享状态
Reduction 调度公平的抢占式调度,无需显式 yield
Let It Crash监督树实现自动恢复
OTPGenServer、Supervisor 等成熟模式
分布式跨节点消息传递透明

下一章预告:C++ 的协程之路——从 Boost.Fiber 到 C++20 Coroutines。


扩展阅读