Erlang/OTP 完全指南 / 25 - 实战项目
第 25 章:实战项目 — 消息队列、聊天服务器、实时监控系统
本章通过三个完整的实战项目,综合运用前面章节所学的 Erlang/OTP 知识。
25.1 项目一:轻量级消息队列
25.1.1 需求
- 支持创建多个队列
- 生产者发送消息,消费者接收消息
- 支持持久化(ETS + 文件;下方示例代码仅用 ETS 记录统计数据,文件持久化留作练习)
- 支持多消费者竞争消费
25.1.2 核心模块
%% src/mini_mq.erl
%%
%% A minimal in-memory message queue server. Queues live in the gen_server
%% state; per-queue counters ({QueueName, published} / {QueueName, acked}) live
%% in the public named ETS table `mq_stats`. A message stays queued until a
%% consumer acks it. Nothing is written to disk.
-module(mini_mq).
-behaviour(gen_server).

%% API
-export([start_link/0, create_queue/1, publish/2, subscribe/2,
         ack/2, list_queues/0]).
%% Callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).

%% A single named queue.
%% `messages` holds {MsgId, Payload, Delivered} tuples until they are acked;
%% Delivered is false while no consumer has been sent the message yet.
%% `consumers` is rotated on every delivery, giving round-robin dispatch.
-record(queue, {
    name :: binary(),
    messages = queue:new() :: queue:queue({pos_integer(), term(), boolean()}),
    consumers = [] :: [pid()],
    next_id = 1 :: pos_integer()
}).

%% #queue{} must be defined above to be usable as a field type here.
-record(state, {
    queues = #{} :: #{binary() => #queue{}}
}).

%% ===== API =====

-spec start_link() -> {ok, pid()} | ignore | {error, term()}.
start_link() ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

%% Create an empty queue.
-spec create_queue(binary()) -> ok | {error, already_exists}.
create_queue(QueueName) ->
    gen_server:call(?MODULE, {create_queue, QueueName}).

%% Publish a message; it is sent to the next consumer in round-robin order,
%% or kept until a consumer subscribes.
-spec publish(binary(), term()) -> ok | {error, queue_not_found}.
publish(QueueName, Message) ->
    gen_server:call(?MODULE, {publish, QueueName, Message}).

%% Subscribe ConsumerPid. It receives {new_message, QueueName, MsgId, Message}.
-spec subscribe(binary(), pid()) -> ok | {error, queue_not_found}.
subscribe(QueueName, ConsumerPid) ->
    gen_server:call(?MODULE, {subscribe, QueueName, ConsumerPid}).

%% Acknowledge (and remove) a message. Unknown or repeated ids are ignored.
-spec ack(binary(), pos_integer()) -> ok.
ack(QueueName, MsgId) ->
    gen_server:cast(?MODULE, {ack, QueueName, MsgId}).

%% Names of all existing queues.
-spec list_queues() -> [binary()].
list_queues() ->
    gen_server:call(?MODULE, list_queues).

%% ===== Callbacks =====

init([]) ->
    %% ETS table for message statistics: {{QueueName, Counter}, Count}.
    mq_stats = ets:new(mq_stats, [set, public, named_table]),
    {ok, #state{}}.

handle_call({create_queue, Name}, _From, #state{queues = Queues} = State) ->
    case maps:is_key(Name, Queues) of
        true ->
            {reply, {error, already_exists}, State};
        false ->
            NewQueues = Queues#{Name => #queue{name = Name}},
            {reply, ok, State#state{queues = NewQueues}}
    end;
handle_call({publish, QueueName, Message}, _From, #state{queues = Queues} = State) ->
    case maps:find(QueueName, Queues) of
        {ok, #queue{messages = Msgs, next_id = Id, consumers = Consumers} = Q} ->
            {Delivered, Rotated} = dispatch(QueueName, Id, Message, Consumers),
            NewQ = Q#queue{messages = queue:in({Id, Message, Delivered}, Msgs),
                           consumers = Rotated,
                           next_id = Id + 1},
            bump_stat(QueueName, published),
            {reply, ok, State#state{queues = Queues#{QueueName := NewQ}}};
        error ->
            {reply, {error, queue_not_found}, State}
    end;
handle_call({subscribe, QueueName, ConsumerPid}, _From, #state{queues = Queues} = State) ->
    case maps:find(QueueName, Queues) of
        {ok, #queue{consumers = Consumers, messages = Msgs} = Q} ->
            NewConsumers =
                case lists:member(ConsumerPid, Consumers) of
                    true ->
                        %% Already subscribed: avoid a second monitor / entry.
                        Consumers;
                    false ->
                        _ = monitor(process, ConsumerPid),
                        Consumers ++ [ConsumerPid]
                end,
            %% Messages published while nobody was subscribed were never sent;
            %% hand them to the new consumer now.
            NewMsgs = deliver_pending(QueueName, ConsumerPid, Msgs),
            NewQ = Q#queue{consumers = NewConsumers, messages = NewMsgs},
            {reply, ok, State#state{queues = Queues#{QueueName := NewQ}}};
        error ->
            {reply, {error, queue_not_found}, State}
    end;
handle_call(list_queues, _From, #state{queues = Queues} = State) ->
    {reply, maps:keys(Queues), State}.

handle_cast({ack, QueueName, MsgId}, #state{queues = Queues} = State) ->
    case maps:find(QueueName, Queues) of
        {ok, #queue{messages = Msgs} = Q} ->
            case lists:keytake(MsgId, 1, queue:to_list(Msgs)) of
                {value, _Acked, Rest} ->
                    bump_stat(QueueName, acked),
                    NewQ = Q#queue{messages = queue:from_list(Rest)},
                    {noreply, State#state{queues = Queues#{QueueName := NewQ}}};
                false ->
                    %% Unknown or already-acked id.
                    {noreply, State}
            end;
        error ->
            {noreply, State}
    end.

handle_info({'DOWN', _, process, Pid, _}, #state{queues = Queues} = State) ->
    %% Drop the dead consumer from every queue. Messages it received but never
    %% acked stay queued (marked delivered) and are not redelivered.
    NewQueues = maps:map(fun(_Name, #queue{consumers = C} = Q) ->
                                 Q#queue{consumers = lists:delete(Pid, C)}
                         end, Queues),
    {noreply, State#state{queues = NewQueues}};
handle_info(_Unexpected, State) ->
    {noreply, State}.

terminate(_Reason, _State) ->
    ok.

%% ===== Internal =====

%% Send the message to the consumer at the head of the list and move it to the
%% tail, so successive messages are spread round-robin. Returns whether the
%% message was sent and the rotated consumer list.
dispatch(_QueueName, _Id, _Message, []) ->
    {false, []};
dispatch(QueueName, Id, Message, [Consumer | Rest]) ->
    Consumer ! {new_message, QueueName, Id, Message},
    {true, Rest ++ [Consumer]}.

%% Send every not-yet-delivered message to ConsumerPid and mark it delivered.
deliver_pending(QueueName, ConsumerPid, Msgs) ->
    Entries = lists:map(
                fun({Id, Payload, false}) ->
                        ConsumerPid ! {new_message, QueueName, Id, Payload},
                        {Id, Payload, true};
                   (Entry) ->
                        Entry
                end, queue:to_list(Msgs)),
    queue:from_list(Entries).

%% Atomically increment the {QueueName, Counter} statistic, creating it at 0.
bump_stat(QueueName, Counter) ->
    Key = {QueueName, Counter},
    _ = ets:update_counter(mq_stats, Key, 1, {Key, 0}),
    ok.
25.1.3 消费者进程
%% src/mq_consumer.erl
%%
%% A consumer process for mini_mq: calls Handler(Message) for every delivered
%% message of one queue, then acks it.
-module(mq_consumer).
-export([start_link/2, loop/1]).

%% Spawn a linked consumer and subscribe it to QueueName.
%% If the subscription fails (e.g. {error, queue_not_found}), the spawned
%% process is stopped and the error is returned instead of an idle pid.
-spec start_link(binary(), fun((term()) -> term())) -> {ok, pid()} | {error, term()}.
start_link(QueueName, Handler) ->
    Pid = spawn_link(?MODULE, loop, [#{queue => QueueName, handler => Handler}]),
    case mini_mq:subscribe(QueueName, Pid) of
        ok ->
            {ok, Pid};
        {error, _} = Error ->
            unlink(Pid),
            Pid ! stop,
            Error
    end.

%% Receive loop. `stop` ends it normally. Any other message is discarded so it
%% cannot pile up in the mailbox.
loop(#{queue := QueueName, handler := Handler} = State) ->
    receive
        {new_message, QueueName, MsgId, Message} ->
            %% Process first, ack afterwards: a crash in Handler leaves the
            %% message unacked.
            Handler(Message),
            mini_mq:ack(QueueName, MsgId),
            loop(State);
        stop ->
            ok;
        _Unexpected ->
            loop(State)
    end.
25.1.4 使用示例
%% Start the queue server. start_link links it to the calling shell process,
%% so an error that crashes the shell evaluator can take the server down too.
mini_mq:start_link().
%% Create a queue.
mini_mq:create_queue(<<"orders">>).
%% Subscribe a consumer whose handler prints each message it receives.
Handler = fun(Msg) -> io:format("Processing: ~p~n", [Msg]) end,
{ok, Consumer} = mq_consumer:start_link(<<"orders">>, Handler).
%% Publish a message; the consumer runs Handler on it and then acks it.
mini_mq:publish(<<"orders">>, #{order_id => 1, amount => 99.99}).
25.2 项目二:聊天服务器
25.2.1 架构
客户端 ──WebSocket──→ Cowboy ──→ Chat Room GenServer
                                         │
                                 ┌───────┼───────┐
                                 ↓       ↓       ↓
                               User1   User2   User3
25.2.2 聊天室 GenServer
%% src/chat_room.erl
%%
%% One chat room per process. Members are the pids that called join/2; every
%% event (join, leave, chat message) is sent to them as a plain Erlang message.
%% The newest `max_history` chat messages are kept (stored newest first).
-module(chat_room).
-behaviour(gen_server).

-export([start_link/1, join/2, leave/2, send_message/3,
         get_users/1, get_history/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).

-record(state, {
    name :: binary(),
    users = #{} :: #{pid() => #{name => binary(), joined_at => integer()}},
    history = [] :: [map()],
    max_history = 100 :: pos_integer()
}).

%% Start an unregistered room process that becomes discoverable through the pg
%% group {chat_rooms, Name}. pg is a process-group module, not a `{via, M, N}`
%% registry (it has no register_name/2 etc.), so the room joins the group in
%% init/1 instead. Requires the default pg scope to be running (pg:start_link/0
%% in the supervision tree, or the kernel `start_pg` parameter).
-spec start_link(binary()) -> {ok, pid()} | ignore | {error, term()}.
start_link(Name) ->
    gen_server:start_link(?MODULE, Name, []).

%% Join the calling process to the room under UserName.
-spec join(pid(), binary()) -> ok.
join(RoomPid, UserName) ->
    gen_server:call(RoomPid, {join, self(), UserName}).

%% Remove the calling process from the room.
-spec leave(pid(), binary()) -> ok.
leave(RoomPid, UserName) ->
    gen_server:cast(RoomPid, {leave, self(), UserName}).

%% Broadcast a chat message to every member and record it in the history.
-spec send_message(pid(), binary(), term()) -> ok.
send_message(RoomPid, UserName, Message) ->
    gen_server:cast(RoomPid, {message, UserName, Message}).

-spec get_users(pid()) -> [binary()].
get_users(RoomPid) ->
    gen_server:call(RoomPid, get_users).

%% History oldest first.
-spec get_history(pid()) -> [map()].
get_history(RoomPid) ->
    gen_server:call(RoomPid, get_history).

init(Name) ->
    %% pg removes the member automatically when this process exits.
    ok = pg:join({chat_rooms, Name}, self()),
    {ok, #state{name = Name}}.

handle_call({join, Pid, UserName}, _From, #state{users = Users} = State) ->
    _ = monitor(process, Pid),
    NewUsers = Users#{Pid => #{name => UserName, joined_at => erlang:system_time(second)}},
    %% Announce to the existing members only, not to the newcomer itself.
    broadcast_to_users(Users, {user_joined, UserName}),
    {reply, ok, State#state{users = NewUsers}};
handle_call(get_users, _From, #state{users = Users} = State) ->
    UserNames = [maps:get(name, U) || {_, U} <- maps:to_list(Users)],
    {reply, UserNames, State};
handle_call(get_history, _From, #state{history = History} = State) ->
    {reply, lists:reverse(History), State}.

handle_cast({leave, Pid, _UserName}, State) ->
    {noreply, remove_user(Pid, State)};
handle_cast({message, UserName, Message},
            #state{users = Users, history = History, max_history = Max} = State) ->
    Msg = #{
        from => UserName,
        message => Message,
        timestamp => erlang:system_time(second)
    },
    broadcast_to_users(Users, {chat, UserName, Message}),
    NewHistory = lists:sublist([Msg | History], Max),
    {noreply, State#state{history = NewHistory}}.

handle_info({'DOWN', _, process, Pid, _}, State) ->
    {noreply, remove_user(Pid, State)};
handle_info(_Unexpected, State) ->
    {noreply, State}.

%% Internal functions

%% Remove Pid from the members and tell the others who left, using the stored
%% name. A leave for a non-member (e.g. an explicit leave followed by the
%% member's 'DOWN') is a no-op. The join-time monitor is not removed; its
%% later 'DOWN' also hits the no-op branch.
remove_user(Pid, #state{users = Users} = State) ->
    case maps:take(Pid, Users) of
        {#{name := UserName}, Remaining} ->
            broadcast_to_users(Remaining, {user_left, UserName}),
            State#state{users = Remaining};
        error ->
            State
    end.

broadcast_to_users(Users, Msg) ->
    maps:foreach(fun(Pid, _) -> Pid ! Msg end, Users).
25.2.3 WebSocket Handler
%% src/chat_ws_handler.erl
%%
%% Cowboy 2 WebSocket handler: one process per connected client. The client
%% chooses its user name and room through the query string (?name=..&room=..).
%% Room events arrive as Erlang messages and are pushed to the client as JSON.
-module(chat_ws_handler).
-export([init/2, websocket_init/1, websocket_handle/2,
         websocket_info/2, terminate/3]).

%% Upgrade the HTTP request to a WebSocket.
%% cowboy_req:qs_val/3 belonged to Cowboy 1.x and does not exist in Cowboy 2
%% (whose init/2 + websocket_init/1 callbacks this module implements);
%% match_qs/2 parses the query string and applies the defaults.
%% NOTE(review): assumes the route's initial state is a map — confirm in the router.
init(Req, State) ->
    #{name := UserName, room := RoomName} =
        cowboy_req:match_qs([{name, [], <<"anonymous">>},
                             {room, [], <<"general">>}], Req),
    {cowboy_websocket, Req, State#{
        user_name => UserName,
        room_name => RoomName
    }}.

%% Runs in the WebSocket process: find or create the room, then join it so that
%% this process receives the room's broadcasts.
websocket_init(#{user_name := UserName, room_name := RoomName} = State) ->
    RoomPid = find_or_start_room(RoomName),
    ok = chat_room:join(RoomPid, UserName),
    {[], State#{room_pid => RoomPid}}.

%% Text frames from the client are chat messages for the room.
websocket_handle({text, Msg}, #{user_name := UserName, room_pid := RoomPid} = State) ->
    chat_room:send_message(RoomPid, UserName, Msg),
    {[], State};
websocket_handle(_Frame, State) ->
    {[], State}.

%% Room events -> JSON text frames.
websocket_info({chat, UserName, Message}, State) ->
    push(#{
        type => <<"message">>,
        from => UserName,
        message => Message,
        timestamp => erlang:system_time(second)
    }, State);
websocket_info({user_joined, UserName}, State) ->
    push(#{
        type => <<"system">>,
        event => <<"user_joined">>,
        user => UserName
    }, State);
websocket_info({user_left, UserName}, State) ->
    push(#{
        type => <<"system">>,
        event => <<"user_left">>,
        user => UserName
    }, State);
websocket_info(_Info, State) ->
    {[], State}.

terminate(_Reason, _Req, #{user_name := UserName, room_pid := RoomPid}) ->
    chat_room:leave(RoomPid, UserName),
    ok;
terminate(_, _, _) ->
    ok.

%% Internal functions

%% Return the first room registered in pg under {chat_rooms, RoomName},
%% starting one through chat_room_sup when the group is empty.
%% NOTE(review): two clients connecting at the same moment can both see an
%% empty group and start two rooms with the same name; serialize creation
%% (e.g. inside chat_room_sup) if that matters.
find_or_start_room(RoomName) ->
    case pg:get_members({chat_rooms, RoomName}) of
        [Pid | _] ->
            Pid;
        [] ->
            {ok, Pid} = chat_room_sup:start_room(RoomName),
            Pid
    end.

%% Encode Json and send it to the client as one text frame.
push(Json, State) ->
    {[{text, jsx:encode(Json)}], State}.
25.3 项目三:实时监控系统
25.3.1 需求
- 监控多个服务器的 CPU、内存、网络指标
- 指标存储在 ETS 中
- 支持阈值告警
- WebSocket 推送实时数据
25.3.2 指标收集器
%% src/metrics_collector.erl
%%
%% Collects per-host metric samples into the public named ETS table
%% `metrics_table` (newest first, at most ?MAX_HISTORY per host) and sends
%% {alert, Map} to subscribed processes whenever a numeric metric exceeds its
%% threshold: on every report, and again for every host's newest sample every
%% ?CHECK_INTERVAL_MS milliseconds.
-module(metrics_collector).
-behaviour(gen_server).

-export([start_link/0, report/2, get_metrics/1, subscribe/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).

%% Number of samples kept per host.
-define(MAX_HISTORY, 100).
%% Period of the background threshold re-check.
-define(CHECK_INTERVAL_MS, 5000).

start_link() ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

%% Report a metrics sample for Host (stored with a `timestamp` key added).
-spec report(atom(), map()) -> ok.
report(Host, Metrics) ->
    gen_server:cast(?MODULE, {report, Host, Metrics}).

%% Stored samples for Host, newest first; [] for an unknown host.
%% Reads ETS directly, without going through the server.
-spec get_metrics(atom()) -> [map()].
get_metrics(Host) ->
    case ets:lookup(metrics_table, Host) of
        [{Host, Data}] -> Data;
        [] -> []
    end.

%% Subscribe the calling process to {alert, Map} messages.
-spec subscribe() -> ok.
subscribe() ->
    gen_server:cast(?MODULE, {subscribe, self()}).

init([]) ->
    metrics_table = ets:new(metrics_table, [set, public, named_table]),
    schedule_check(),
    {ok, #{
        subscribers => sets:new(),
        thresholds => #{
            cpu_usage => 90,
            memory_usage => 85,
            disk_usage => 95
        }
    }}.

%% The server has no synchronous API; answer instead of crashing.
handle_call(_Request, _From, State) ->
    {reply, {error, unknown_call}, State}.

handle_cast({report, Host, Metrics}, State) ->
    Entry = Metrics#{timestamp => erlang:system_time(second)},
    NewHistory = lists:sublist([Entry | get_metrics(Host)], ?MAX_HISTORY),
    true = ets:insert(metrics_table, {Host, NewHistory}),
    check_thresholds(Host, Metrics, State),
    {noreply, State};
handle_cast({subscribe, Pid}, #{subscribers := Subs} = State) ->
    case sets:is_element(Pid, Subs) of
        true ->
            %% Already subscribed: do not stack another monitor.
            {noreply, State};
        false ->
            _ = monitor(process, Pid),
            {noreply, State#{subscribers => sets:add_element(Pid, Subs)}}
    end.

handle_info(check_alerts, State) ->
    %% Re-check the newest sample of every host. A host that stops reporting
    %% while above a threshold keeps alerting on every tick.
    lists:foreach(fun({Host, [Latest | _]}) ->
                          check_thresholds(Host, Latest, State)
                  end, ets:tab2list(metrics_table)),
    schedule_check(),
    {noreply, State};
handle_info({'DOWN', _, process, Pid, _}, #{subscribers := Subs} = State) ->
    {noreply, State#{subscribers => sets:del_element(Pid, Subs)}};
handle_info(_Unexpected, State) ->
    {noreply, State}.

%% Internal functions

schedule_check() ->
    _ = erlang:send_after(?CHECK_INTERVAL_MS, self(), check_alerts),
    ok.

%% Alert on every threshold whose metric is present and numerically above the
%% limit. The is_number/1 guard matters: in Erlang term order any non-number
%% (atom, binary, ...) compares greater than every number.
check_thresholds(Host, Metrics, #{thresholds := Thresholds, subscribers := Subs}) ->
    maps:foreach(
      fun(Key, Limit) ->
              case maps:get(Key, Metrics, undefined) of
                  Value when is_number(Value), Value > Limit ->
                      Alert = #{
                          host => Host,
                          metric => Key,
                          value => Value,
                          threshold => Limit,
                          timestamp => erlang:system_time(second)
                      },
                      broadcast_alert(Subs, Alert);
                  _ ->
                      ok
              end
      end, Thresholds).

broadcast_alert(Subs, Alert) ->
    sets:foreach(fun(Pid) -> Pid ! {alert, Alert} end, Subs).
25.3.3 告警处理器
%% src/alert_handler.erl
%%
%% Logs every alert published by metrics_collector.
-module(alert_handler).
-export([start_link/0, loop/0]).

%% Start the handler process, linked to the caller.
%% metrics_collector:subscribe/0 subscribes self(), so it must be called from
%% inside the spawned process; calling it in start_link/0 itself would
%% subscribe the caller instead of the handler loop.
start_link() ->
    Pid = spawn_link(fun() ->
                             ok = metrics_collector:subscribe(),
                             loop()
                     end),
    {ok, Pid}.

loop() ->
    receive
        {alert, #{host := Host, metric := Metric, value := Value, threshold := Threshold}} ->
            logger:warning("ALERT: ~p on ~p = ~p (threshold: ~p)",
                           [Metric, Host, Value, Threshold]),
            %% Further actions (e-mail, Slack notification, ...) can go here.
            loop();
        stop ->
            ok;
        _Unexpected ->
            %% Discard anything else so it cannot accumulate in the mailbox.
            loop()
    end.
25.4 项目总结
25.4.1 技术栈回顾
| 项目 | 核心技术 |
|---|---|
| 消息队列 | GenServer, ETS, Monitor, 消息传递 |
| 聊天服务器 | Cowboy WebSocket, pg 模块, Supervisor |
| 监控系统 | ETS, Timer, 广播模式 |
25.4.2 OTP 设计模式总结
| 模式 | 用途 |
|---|---|
| GenServer | 有状态服务 |
| Supervisor | 进程监控和重启 |
| Application | 应用生命周期 |
| ETS | 高速共享存储 |
| pg | 进程组管理 |
| Monitor/Link | 进程监控 |
25.5 进一步学习
25.5.1 推荐项目
| 项目 | 难度 | 学习目标 |
|---|---|---|
| HTTP Proxy | 中等 | TCP Socket, Binary 处理 |
| 分布式 KV 存储 | 难 | 分布式一致性, Mnesia |
| IoT 平台 | 难 | 百万连接, MQTT |
| 游戏服务器 | 难 | 实时状态同步 |
| Job Scheduler | 中等 | 定时任务, 优先级队列 |
25.5.2 开源项目参考
| 项目 | 说明 |
|---|---|
| RabbitMQ | 消息队列 |
| EMQX | MQTT Broker |
| CouchDB | 文档数据库 |
| Phoenix | Elixir Web 框架 |
| Ejabberd | XMPP 服务器 |
25.6 恭喜!
你已经完成了 Erlang/OTP 完全指南的全部 25 章!
回顾你的学习旅程:
✅ 第 1-6 章:语言基础(变量、模式匹配、函数)
✅ 第 7-8 章:数据结构(列表、元组、Map)
✅ 第 9-12 章:并发与 OTP(进程、GenServer、Supervisor)
✅ 第 13-15 章:存储与网络(ETS、Mnesia、Socket)
✅ 第 16-21 章:工程化(错误处理、测试、部署、性能)
✅ 第 22-25 章:进阶与实战(NIF、Web、最佳实践、实战项目)
“The best way to learn Erlang is to write Erlang.” — Joe Armstrong
25.7 扩展阅读
- 📖 Programming Erlang (2nd Edition) — Joe Armstrong
- 📖 Learn You Some Erlang — 免费在线教程
- 📖 Erlang/OTP 官方文档
- 📖 Designing for Scalability with Erlang/OTP — Francesco Cesarini
- 📖 Elixir in Action — Saša Jurić
上一章:24 - 最佳实践 | 返回目录:Erlang/OTP 完全指南