强曰为道
与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧.
文档目录

Erlang/OTP 完全指南 / 25 - 实战项目

第 25 章:实战项目 — 消息队列、聊天服务器、实时系统

本章通过三个完整的实战项目,综合运用前面章节所学的 Erlang/OTP 知识。


25.1 项目一:轻量级消息队列

25.1.1 需求

  • 支持创建多个队列
  • 生产者发送消息,消费者接收消息
  • 支持持久化(ETS + 文件;下文示例代码仅用 ETS 记录统计信息,文件持久化留作扩展练习)
  • 支持多消费者竞争消费

25.1.2 核心模块

%% src/mini_mq.erl
-module(mini_mq).
-behaviour(gen_server).

%% API
-export([start_link/0, create_queue/1, publish/2, subscribe/2,
         ack/2, list_queues/0]).

%% Callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).

%% Per-queue data. Declared before #state{} because the state's type
%% annotation refers to this record.
-record(queue, {
    %% Queue name; also the key under which the record is stored in #state.queues.
    name :: binary(),
    %% Published entries, stored as {Id, Payload, Acked} tuples.
    messages = queue:new() :: queue:queue(term()),
    %% Pids subscribed to this queue.
    consumers = [] :: [pid()],
    %% Id assigned to the next published message.
    next_id = 1 :: pos_integer()
}).

%% gen_server state of the queue manager.
-record(state, {
    %% All known queues, keyed by name.
    queues = #{} :: #{binary() => #queue{}},
    %% Not read or written by the callbacks in this module; kept so the
    %% record layout stays unchanged.
    consumers = #{} :: #{binary() => [pid()]}
}).

%% ===== API =====

%% Starts the queue manager and registers it locally under the module name.
start_link() ->
    ServerName = {local, ?MODULE},
    gen_server:start_link(ServerName, ?MODULE, [], []).

%% Creates a queue with the given name (synchronous call to the manager).
-spec create_queue(binary()) -> ok | {error, already_exists}.
create_queue(QueueName) ->
    Request = {create_queue, QueueName},
    gen_server:call(?MODULE, Request).

%% Publishes a message to the named queue (synchronous call to the manager).
-spec publish(binary(), term()) -> ok | {error, queue_not_found}.
publish(QueueName, Message) ->
    Request = {publish, QueueName, Message},
    gen_server:call(?MODULE, Request).

%% Registers ConsumerPid as a consumer of the named queue.
-spec subscribe(binary(), pid()) -> ok | {error, queue_not_found}.
subscribe(QueueName, ConsumerPid) ->
    Request = {subscribe, QueueName, ConsumerPid},
    gen_server:call(?MODULE, Request).

%% Acknowledges a message. Sent as a cast, so it returns ok without
%% waiting for the manager to process it.
-spec ack(binary(), pos_integer()) -> ok.
ack(QueueName, MsgId) ->
    Notice = {ack, QueueName, MsgId},
    gen_server:cast(?MODULE, Notice).

%% Returns the names of all queues known to the manager.
-spec list_queues() -> [binary()].
list_queues() ->
    Request = list_queues,
    gen_server:call(?MODULE, Request).

%% ===== Callbacks =====

%% gen_server init callback: creates the public named ETS table `mq_stats`
%% (used by the publish/ack handlers for counters) and starts with an
%% empty state.
init([]) ->
    TableOptions = [set, public, named_table],
    ets:new(mq_stats, TableOptions),
    InitialState = #state{},
    {ok, InitialState}.

%% Synchronous requests handled by the queue manager.
%%
%%   {create_queue, Name}          -> ok | {error, already_exists}
%%   {publish, QueueName, Message} -> ok | {error, queue_not_found}
%%   {subscribe, QueueName, Pid}   -> ok | {error, queue_not_found}
%%   list_queues                   -> [binary()]
handle_call({create_queue, Name}, _From, #state{queues = Queues} = State) ->
    case maps:is_key(Name, Queues) of
        true ->
            {reply, {error, already_exists}, State};
        false ->
            NewQueues = Queues#{Name => #queue{name = Name}},
            {reply, ok, State#state{queues = NewQueues}}
    end;

handle_call({publish, QueueName, Message}, _From, #state{queues = Queues} = State) ->
    case maps:find(QueueName, Queues) of
        {ok, #queue{messages = Msgs, next_id = Id, consumers = Consumers} = Q} ->
            %% Deliver to the consumer at the head of the list, then move it
            %% to the back so successive messages are spread round-robin
            %% instead of always reaching the same subscriber.
            NewConsumers =
                case Consumers of
                    [Consumer | Rest] ->
                        Consumer ! {new_message, QueueName, Id, Message},
                        Rest ++ [Consumer];
                    [] ->
                        []
                end,
            NewQ = Q#queue{
                messages = queue:in({Id, Message, false}, Msgs),
                next_id = Id + 1,
                consumers = NewConsumers
            },
            %% Bump the per-queue "published" counter, creating it on first use.
            ets:update_counter(mq_stats, {QueueName, published}, 1, {{QueueName, published}, 0}),
            {reply, ok, State#state{queues = Queues#{QueueName => NewQ}}};
        error ->
            {reply, {error, queue_not_found}, State}
    end;

handle_call({subscribe, QueueName, ConsumerPid}, _From, #state{queues = Queues} = State) ->
    case maps:find(QueueName, Queues) of
        {ok, #queue{consumers = Consumers} = Q} ->
            case lists:member(ConsumerPid, Consumers) of
                true ->
                    %% Already subscribed: avoid a duplicate list entry and a
                    %% second monitor on the same process.
                    {reply, ok, State};
                false ->
                    %% The monitor lets handle_info/2 drop the consumer when it dies.
                    monitor(process, ConsumerPid),
                    NewQ = Q#queue{consumers = [ConsumerPid | Consumers]},
                    {reply, ok, State#state{queues = Queues#{QueueName => NewQ}}}
            end;
        error ->
            {reply, {error, queue_not_found}, State}
    end;

handle_call(list_queues, _From, #state{queues = Queues} = State) ->
    {reply, maps:keys(Queues), State}.

%% Handles {ack, QueueName, MsgId}: drops the acknowledged entry from the
%% queue (so acknowledged messages do not accumulate forever) and bumps the
%% per-queue "acked" counter. Acks for unknown queues are ignored.
handle_cast({ack, QueueName, MsgId}, #state{queues = Queues} = State) ->
    case maps:find(QueueName, Queues) of
        {ok, #queue{messages = Msgs} = Q} ->
            %% Entries are {Id, Payload, Acked}; keep everything except the
            %% acknowledged id.
            Remaining = queue:filter(fun({Id, _Payload, _Acked}) -> Id =/= MsgId end, Msgs),
            ets:update_counter(mq_stats, {QueueName, acked}, 1, {{QueueName, acked}, 0}),
            NewQueues = Queues#{QueueName => Q#queue{messages = Remaining}},
            {noreply, State#state{queues = NewQueues}};
        error ->
            {noreply, State}
    end.

%% Handles raw messages sent to the queue manager.
%%
%% A 'DOWN' message comes from the monitors set up on subscribe: the dead
%% consumer is removed from every queue's consumer list. Any other message
%% is dropped so a stray send cannot crash the server with function_clause.
handle_info({'DOWN', _Ref, process, Pid, _Reason}, #state{queues = Queues} = State) ->
    NewQueues = maps:map(
        fun(_Name, #queue{consumers = Consumers} = Q) ->
            Q#queue{consumers = lists:delete(Pid, Consumers)}
        end,
        Queues
    ),
    {noreply, State#state{queues = NewQueues}};
handle_info(_Unexpected, State) ->
    {noreply, State}.

%% gen_server terminate callback: performs no explicit cleanup.
terminate(_, _) ->
    ok.

25.1.3 消费者进程

%% src/mq_consumer.erl
-module(mq_consumer).
-export([start_link/2, loop/1]).

%% Spawns a linked consumer loop for QueueName and subscribes it to the queue.
%%
%% Returns {ok, Pid} on success. If the subscription is rejected (for
%% example {error, queue_not_found}), the freshly spawned consumer is stopped
%% and the error is returned instead of handing back a consumer that would
%% never receive anything.
start_link(QueueName, Handler) ->
    Pid = spawn_link(?MODULE, loop, [#{queue => QueueName, handler => Handler}]),
    case mini_mq:subscribe(QueueName, Pid) of
        ok ->
            {ok, Pid};
        {error, _Reason} = Error ->
            %% Unlink first so the consumer's exit does not reach the caller.
            unlink(Pid),
            Pid ! stop,
            Error
    end.

%% Consumer receive loop.
%%
%% For each {new_message, QueueName, MsgId, Message} addressed to this
%% consumer's queue, runs Handler(Message) and then acknowledges the message.
%% `stop` ends the loop with ok. Anything else is discarded so unmatched
%% messages do not pile up in the mailbox.
loop(#{queue := QueueName, handler := Handler} = State) ->
    receive
        {new_message, QueueName, MsgId, Message} ->
            Handler(Message),
            mini_mq:ack(QueueName, MsgId),
            loop(State);
        stop ->
            ok;
        _Unexpected ->
            loop(State)
    end.

25.1.4 使用示例

%% Start the queue manager (registered locally as mini_mq)
mini_mq:start_link().

%% Create a queue
mini_mq:create_queue(<<"orders">>).

%% Subscribe and consume: the handler is called with each delivered message
Handler = fun(Msg) -> io:format("Processing: ~p~n", [Msg]) end,
{ok, Consumer} = mq_consumer:start_link(<<"orders">>, Handler).

%% Publish a message
mini_mq:publish(<<"orders">>, #{order_id => 1, amount => 99.99}).

25.2 项目二:聊天服务器

25.2.1 架构

客户端 ──WebSocket──→ Cowboy ──→ Chat Room GenServer
                                      │
                              ┌───────┼───────┐
                              ↓       ↓       ↓
                           User1   User2   User3

25.2.2 聊天室 GenServer

%% src/chat_room.erl
-module(chat_room).
-behaviour(gen_server).

-export([start_link/1, join/2, leave/2, send_message/3,
         get_users/1, get_history/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).

%% gen_server state of a single chat room.
-record(state, {
    %% Room name, as passed to init/1.
    name :: binary(),
    %% Joined users keyed by their process, with display name and join time.
    users = #{} :: #{pid() => #{name => binary(), joined_at => integer()}},
    %% Recent messages, newest first (reversed when handed out by get_history).
    history = [] :: [map()],
    %% Upper bound on the number of history entries kept.
    max_history = 100 :: pos_integer()
}).

%% Starts a chat room process and adds it to the pg group {chat_rooms, Name}.
%%
%% `pg` is a process-group module, not a name registry: it does not provide
%% the register_name/whereis_name callbacks that a {via, Module, Name} server
%% name needs. The room is therefore started unnamed and joined to the group
%% afterwards, so it can be found with pg:get_members({chat_rooms, Name}).
%%
%% Requires the default pg scope to be running (e.g. pg:start_link/0 or the
%% kernel `start_pg` setting).
start_link(Name) ->
    case gen_server:start_link(?MODULE, Name, []) of
        {ok, Pid} = Started ->
            ok = pg:join({chat_rooms, Name}, Pid),
            Started;
        Other ->
            Other
    end.

%% Joins the calling process to the room under UserName (synchronous).
join(RoomPid, UserName) ->
    Caller = self(),
    gen_server:call(RoomPid, {join, Caller, UserName}).

%% Asks the room to remove the calling process (asynchronous cast).
leave(RoomPid, UserName) ->
    Caller = self(),
    gen_server:cast(RoomPid, {leave, Caller, UserName}).

%% Posts a chat message to the room on behalf of UserName (asynchronous cast).
send_message(RoomPid, UserName, Message) ->
    Post = {message, UserName, Message},
    gen_server:cast(RoomPid, Post).

%% Returns the names of the users currently in the room.
get_users(RoomPid) ->
    Request = get_users,
    gen_server:call(RoomPid, Request).

%% Returns the stored message history, oldest message first.
get_history(RoomPid) ->
    Request = get_history,
    gen_server:call(RoomPid, Request).

%% gen_server init callback: an empty room that only knows its name.
init(Name) ->
    Room = #state{name = Name},
    {ok, Room}.

%% Synchronous requests handled by a chat room.
%%
%%   {join, Pid, UserName} -> ok          monitors Pid, announces the join to
%%                                        the users already present, then
%%                                        records the newcomer
%%   get_users             -> [Name]
%%   get_history           -> [Message]   oldest first
handle_call({join, Pid, UserName}, _From, #state{users = Present} = State) ->
    monitor(process, Pid),
    Profile = #{name => UserName, joined_at => erlang:system_time(second)},
    broadcast_to_users(Present, {user_joined, UserName}),
    {reply, ok, State#state{users = Present#{Pid => Profile}}};

handle_call(get_users, _From, #state{users = Users} = State) ->
    Names = lists:map(
        fun({_Pid, Profile}) -> maps:get(name, Profile) end,
        maps:to_list(Users)
    ),
    {reply, Names, State};

handle_call(get_history, _From, #state{history = NewestFirst} = State) ->
    OldestFirst = lists:reverse(NewestFirst),
    {reply, OldestFirst, State}.

%% Asynchronous requests handled by a chat room.
%%
%%   {leave, Pid, UserName}       removes Pid and tells the remaining users.
%%                                A leave from a process that never joined is
%%                                ignored, so no bogus user_left is broadcast.
%%                                The name announced is the one stored at join
%%                                time, matching the 'DOWN' handling.
%%   {message, UserName, Message} broadcasts the message to every user and
%%                                prepends it to the bounded history.
handle_cast({leave, Pid, _UserName}, #state{users = Users} = State) ->
    case maps:take(Pid, Users) of
        {#{name := StoredName}, Remaining} ->
            broadcast_to_users(Remaining, {user_left, StoredName}),
            {noreply, State#state{users = Remaining}};
        error ->
            {noreply, State}
    end;

handle_cast({message, UserName, Message}, #state{users = Users, history = History, max_history = Max} = State) ->
    Msg = #{
        from => UserName,
        message => Message,
        timestamp => erlang:system_time(second)
    },
    broadcast_to_users(Users, {chat, UserName, Message}),
    %% History is kept newest-first and truncated to the Max most recent entries.
    NewHistory = lists:sublist([Msg | History], Max),
    {noreply, State#state{history = NewHistory}}.

%% Handles raw messages sent to the room.
%%
%% A 'DOWN' message comes from the monitor set up on join: if the process is
%% still listed it is removed and the remaining users are told it left.
%% Any other message is dropped so a stray send cannot crash the room.
handle_info({'DOWN', _Ref, process, Pid, _Reason}, #state{users = Users} = State) ->
    case maps:find(Pid, Users) of
        {ok, #{name := UserName}} ->
            NewUsers = maps:remove(Pid, Users),
            broadcast_to_users(NewUsers, {user_left, UserName}),
            {noreply, State#state{users = NewUsers}};
        error ->
            {noreply, State}
    end;
handle_info(_Unexpected, State) ->
    {noreply, State}.

%% Internal helper: sends Msg to every process that is a key of the Users map.
broadcast_to_users(Users, Msg) ->
    Recipients = maps:keys(Users),
    lists:foreach(fun(Recipient) -> Recipient ! Msg end, Recipients).

25.2.3 WebSocket Handler

%% src/chat_ws_handler.erl
-module(chat_ws_handler).
-export([init/2, websocket_init/1, websocket_handle/2,
         websocket_info/2, terminate/3]).

%% Cowboy handler entry point: reads the `name` and `room` query parameters
%% (defaulting to <<"anonymous">> and <<"general">>) and upgrades the
%% connection to WebSocket.
%%
%% cowboy_req:qs_val/3 does not exist in the Cowboy 2.x API that the rest of
%% this handler targets, so the query string is read with match_qs/2, which
%% returns a map keyed by field name.
%%
%% NOTE(review): State must be a map for the update below to work, i.e. the
%% route's handler options are assumed to be a map such as #{} — confirm
%% against the router configuration.
init(Req, State) ->
    #{name := UserName, room := RoomName} =
        cowboy_req:match_qs(
            [{name, [], <<"anonymous">>}, {room, [], <<"general">>}],
            Req
        ),
    {cowboy_websocket, Req, State#{
        user_name => UserName,
        room_name => RoomName
    }}.

%% Runs in the WebSocket process after the upgrade: finds the room in the
%% {chat_rooms, RoomName} pg group, or asks chat_room_sup to start one when
%% the group is empty, joins it, and remembers the room pid in the state.
websocket_init(#{user_name := UserName, room_name := RoomName} = State) ->
    Group = {chat_rooms, RoomName},
    RoomPid =
        case pg:get_members(Group) of
            [] ->
                {ok, Started} = chat_room_sup:start_room(RoomName),
                Started;
            [Existing | _] ->
                Existing
        end,
    chat_room:join(RoomPid, UserName),
    {ok, State#{room_pid => RoomPid}}.

%% Handles frames coming from the client: a text frame is forwarded to the
%% room as a chat message when the state holds both user name and room pid;
%% every other frame is ignored. Nothing is sent back directly.
websocket_handle(Frame, State) ->
    case {Frame, State} of
        {{text, Text}, #{user_name := UserName, room_pid := RoomPid}} ->
            chat_room:send_message(RoomPid, UserName, Text);
        _Other ->
            ok
    end,
    {[], State}.

%% Handles Erlang messages delivered to the WebSocket process. Room
%% broadcasts are JSON-encoded with jsx and pushed to the client as one text
%% frame; anything else produces no frame.
websocket_info({chat, UserName, Message}, State) ->
    Payload = #{
        type => <<"message">>,
        from => UserName,
        message => Message,
        timestamp => erlang:system_time(second)
    },
    {[{text, jsx:encode(Payload)}], State};

websocket_info({user_joined, UserName}, State) ->
    Payload = #{
        type => <<"system">>,
        event => <<"user_joined">>,
        user => UserName
    },
    {[{text, jsx:encode(Payload)}], State};

websocket_info({user_left, UserName}, State) ->
    Payload = #{
        type => <<"system">>,
        event => <<"user_left">>,
        user => UserName
    },
    {[{text, jsx:encode(Payload)}], State};

websocket_info(_Other, State) ->
    {[], State}.

%% Called when the connection ends: leaves the room if the state shows that
%% one was joined; otherwise there is nothing to undo.
terminate(_Reason, _Req, State) ->
    case State of
        #{user_name := UserName, room_pid := RoomPid} ->
            chat_room:leave(RoomPid, UserName),
            ok;
        _NotJoined ->
            ok
    end.

25.3 项目三:实时监控系统

25.3.1 需求

  • 监控多个服务器的 CPU、内存、网络指标
  • 指标存储在 ETS 中
  • 支持阈值告警
  • WebSocket 推送实时数据

25.3.2 指标收集器

%% src/metrics_collector.erl
-module(metrics_collector).
-behaviour(gen_server).

-export([start_link/0, report/2, get_metrics/1, subscribe/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).

%% Starts the collector and registers it locally under the module name.
start_link() ->
    ServerName = {local, ?MODULE},
    gen_server:start_link(ServerName, ?MODULE, [], []).

%% Reports a metrics sample for Host (asynchronous cast to the collector).
-spec report(atom(), map()) -> ok.
report(Host, Metrics) ->
    Sample = {report, Host, Metrics},
    gen_server:cast(?MODULE, Sample).

%% Returns the stored history for Host, read directly from the
%% `metrics_table` ETS table; an unknown host yields [].
-spec get_metrics(atom()) -> [map()].
get_metrics(Host) ->
    Rows = ets:lookup(metrics_table, Host),
    case Rows of
        [] -> [];
        [{Host, History}] -> History
    end.

%% Subscribes the calling process to alerts (asynchronous cast).
subscribe() ->
    Caller = self(),
    gen_server:cast(?MODULE, {subscribe, Caller}).

%% gen_server init callback: creates the two public named ETS tables,
%% schedules a `check_alerts` message every 5000 ms, and returns the initial
%% state: an empty subscriber set plus the alert thresholds.
init([]) ->
    TableOptions = [set, public, named_table],
    ets:new(metrics_table, TableOptions),
    ets:new(subscribers_table, TableOptions),

    timer:send_interval(5000, check_alerts),

    Thresholds = #{
        cpu_usage => 90,
        memory_usage => 85,
        disk_usage => 95
    },
    {ok, #{subscribers => sets:new(), thresholds => Thresholds}}.

%% Asynchronous requests handled by the collector.
%%
%%   {report, Host, Metrics}  stamps the sample with the current time, stores
%%                            it at the front of the host's history (capped
%%                            at 100 entries) and checks it against the
%%                            thresholds
%%   {subscribe, Pid}         monitors Pid and adds it to the subscriber set
handle_cast({report, Host, Metrics}, State) ->
    Stamped = Metrics#{timestamp => erlang:system_time(second)},
    History =
        case ets:lookup(metrics_table, Host) of
            [{Host, Previous}] -> lists:sublist([Stamped | Previous], 100);
            [] -> [Stamped]
        end,
    ets:insert(metrics_table, {Host, History}),
    check_thresholds(Host, Metrics, State),
    {noreply, State};

handle_cast({subscribe, Pid}, #{subscribers := Current} = State) ->
    monitor(process, Pid),
    {noreply, State#{subscribers => sets:add_element(Pid, Current)}}.

%% Handles raw messages sent to the collector.
%%
%%   check_alerts  periodic tick: re-checks the most recent sample of every
%%                 host. Rows without any sample are skipped; the table is
%%                 public, so its contents are not fully under this
%%                 process's control.
%%   'DOWN'        a monitored subscriber died: drop it from the set.
%%   anything else is ignored so a stray message cannot crash the server.
handle_info(check_alerts, State) ->
    lists:foreach(
        fun
            ({Host, [Latest | _]}) -> check_thresholds(Host, Latest, State);
            (_NoSample) -> ok
        end,
        ets:tab2list(metrics_table)
    ),
    {noreply, State};

handle_info({'DOWN', _Ref, process, Pid, _Reason}, #{subscribers := Subs} = State) ->
    {noreply, State#{subscribers => sets:del_element(Pid, Subs)}};

handle_info(_Unexpected, State) ->
    {noreply, State}.

%% Internal helper: compares Metrics with each configured threshold and
%% broadcasts an alert map to the subscribers for every metric whose value
%% exceeds its limit. A metric missing from Metrics counts as 0.
%%
%% The is_number/1 guard matters: in Erlang's term order every atom, binary
%% or list compares greater than any number, so without it a non-numeric
%% value such as `undefined` would always raise an alert.
check_thresholds(Host, Metrics, #{thresholds := Thresholds, subscribers := Subs}) ->
    maps:foreach(fun(Key, Limit) ->
        case maps:get(Key, Metrics, 0) of
            Value when is_number(Value), Value > Limit ->
                Alert = #{
                    host => Host,
                    metric => Key,
                    value => Value,
                    threshold => Limit,
                    timestamp => erlang:system_time(second)
                },
                broadcast_alert(Subs, Alert);
            _ ->
                ok
        end
    end, Thresholds).

%% Sends {alert, Alert} to every pid in the subscriber set and returns ok.
%%
%% The stdlib `sets` module has no foreach/2, so the set is converted to a
%% list and walked with lists:foreach/2.
broadcast_alert(Subs, Alert) ->
    lists:foreach(
        fun(Pid) -> Pid ! {alert, Alert} end,
        sets:to_list(Subs)
    ).

25.3.3 告警处理器

%% src/alert_handler.erl
-module(alert_handler).
-export([start_link/0, loop/0]).

%% Spawns the linked alert loop and returns {ok, Pid}.
%%
%% metrics_collector:subscribe/0 registers the process that calls it, so the
%% subscription is made from inside the spawned process. Calling it here in
%% the parent would deliver alerts to the caller instead of to the loop.
start_link() ->
    Pid = spawn_link(fun() ->
        metrics_collector:subscribe(),
        loop()
    end),
    {ok, Pid}.

%% Alert receive loop: logs every {alert, Map} as a warning, returns ok on
%% `stop`, and discards anything else so unmatched messages do not pile up
%% in the mailbox.
loop() ->
    receive
        {alert, #{host := Host, metric := Metric, value := Value, threshold := Threshold}} ->
            logger:warning("ALERT: ~p on ~p = ~p (threshold: ~p)",
                          [Metric, Host, Value, Threshold]),
            %% Extension point: send an e-mail, a Slack notification, etc.
            loop();
        stop ->
            ok;
        _Unexpected ->
            loop()
    end.

25.4 项目总结

25.4.1 技术栈回顾

| 项目 | 核心技术 |
|------|----------|
| 消息队列 | GenServer, ETS, Monitor, 消息传递 |
| 聊天服务器 | Cowboy WebSocket, pg 模块, Supervisor |
| 监控系统 | ETS, Timer, 广播模式 |

25.4.2 OTP 设计模式总结

| 模式 | 用途 |
|------|------|
| GenServer | 有状态服务 |
| Supervisor | 进程监控和重启 |
| Application | 应用生命周期 |
| ETS | 高速共享存储 |
| pg | 进程组管理 |
| Monitor/Link | 进程监控 |

25.5 进一步学习

25.5.1 推荐项目

| 项目 | 难度 | 学习目标 |
|------|------|----------|
| HTTP Proxy | 中等 | TCP Socket, Binary 处理 |
| 分布式 KV 存储 | | 分布式一致性, Mnesia |
| IoT 平台 | | 百万连接, MQTT |
| 游戏服务器 | | 实时状态同步 |
| Job Scheduler | 中等 | 定时任务, 优先级队列 |

25.5.2 开源项目参考

| 项目 | 说明 |
|------|------|
| RabbitMQ | 消息队列 |
| EMQX | MQTT Broker |
| CouchDB | 文档数据库 |
| Phoenix | Elixir Web 框架 |
| Ejabberd | XMPP 服务器 |

25.6 恭喜!

你已经完成了 Erlang/OTP 完全指南的全部 25 章!

回顾你的学习旅程:

✅ 第 1-6 章:语言基础(变量、模式匹配、函数)
✅ 第 7-8 章:数据结构(列表、元组、Map)
✅ 第 9-12 章:并发与 OTP(进程、GenServer、Supervisor)
✅ 第 13-15 章:存储与网络(ETS、Mnesia、Socket)
✅ 第 16-21 章:工程化(错误处理、测试、部署、性能)
✅ 第 22-25 章:进阶与实战(NIF、Web、最佳实践、实战项目)

“The best way to learn Erlang is to write Erlang.” — Joe Armstrong


25.7 扩展阅读


上一章:24 - 最佳实践 返回目录:Erlang/OTP 完全指南