一、Erlang消息队列的过载困境

想象一下你正在经营一家网红奶茶店。突然某天有个美食博主推荐了你家产品,瞬间涌入的订单让店员手忙脚乱——这就是Erlang进程可能遇到的场景。每个Erlang进程都有自己的邮箱(消息队列),当消息到达速度超过处理能力时,队列就会像奶茶店的订单一样堆积如山。

典型的过载场景包括:

  • 突发流量(DDoS攻击或促销活动)
  • 下游处理阻塞(如数据库响应变慢)
  • 消息处理耗时突增(复杂计算任务)
%% 过载进程示例
handle_info(Message, State) ->
    %% 模拟耗时操作
    timer:sleep(1000),
    %% 处理消息
    {noreply, State}.

这个简单的handler每个消息都要处理1秒钟,当消息到达速度超过1条/秒时,队列就会无限增长。就像奶茶店每位顾客都要现场现磨咖啡豆,队伍自然会排到马路对面。

二、流量控制的四大武器库

2.1 背压(Back Pressure)

就像奶茶店在门口挂"今日已售罄"的牌子,Erlang可以通过反向通知实现流量控制:

%% 背压实现示例
handle_call(Request, From, State) ->
    case overloaded() of
        true ->
            %% 返回busy信号
            {reply, {error, busy}, State};
        false ->
            %% 正常处理
            {reply, ok, State#state{pending = [From|State#state.pending]}}
    end.

overloaded() ->
    %% 根据队列长度判断是否过载
    {message_queue_len, Len} = process_info(self(), message_queue_len),
    Len > 100.

2.2 漏桶算法(Leaky Bucket)

想象一个底部有洞的水桶,无论倒入多快,流出速度都是恒定的:

%% 漏桶算法实现
start_leaky_bucket(Rate) ->
    spawn(fun() -> 
        %% 每100ms处理Rate条消息
        timer:send_interval(100, tick),
        loop(Rate, 0)
    end).

loop(Rate, Count) ->
    receive
        {enqueue, Msg} when Count < Rate ->
            %% 转发消息
            destination ! Msg,
            loop(Rate, Count + 1);
        {enqueue, _Msg} ->
            %% 限流丢弃
            loop(Rate, Count);
        tick ->
            %% 重置计数器
            loop(Rate, 0)
    end.

2.3 动态优先级调整

就像医院急诊分诊台,重要消息可以插队:

%% 优先级队列示例
handle_info({high_priority, Msg}, State) ->
    %% 紧急消息立即处理
    handle_emergency(Msg),
    {noreply, State};
handle_info({low_priority, Msg}, State) ->
    %% 普通消息进入队列
    {noreply, State#state{queue = queue:in(Msg, State#state.queue)}}.

2.4 进程池扩容

当单进程处理不过来时,可以像奶茶店开临时窗口:

%% 动态进程池示例
start_pool(Size) ->
    [spawn_worker() || _ <- lists:seq(1, Size)].

spawn_worker() ->
    spawn_link(fun() ->
        receive
            {work, Fun} -> 
                Fun(),
                spawn_worker()  %% 完成后重建worker
        end
    end).

dispatch(Fun) ->
    case whereis(worker_pool) of
        undefined -> start_pool(10);
        _ -> ok
    end,
    [Pid ! {work, Fun} || Pid <- pg:get_members(worker_pool)].

三、实战:电商秒杀系统优化

假设我们有个秒杀系统,原始版本是这样的:

%% 原始秒杀处理
seckill(Pid, UserId, ItemId) ->
    Pid ! {seckill, UserId, ItemId}.

handle_info({seckill, UserId, ItemId}, State) ->
    case check_inventory(ItemId) of
        {ok, Count} when Count > 0 ->
            decrease_inventory(ItemId),
            grant_item(UserId, ItemId);
        _ ->
            notify_sold_out(UserId)
    end,
    {noreply, State}.

优化后的版本结合了多种流量控制策略:

%% 优化后的秒杀处理
seckill(UserId, ItemId) ->
    %% 先经过速率限制层
    leaky_bucket ! {enqueue, {UserId, ItemId}}.

%% 漏桶进程
init_bucket() ->
    spawn(fun() ->
        %% 每秒最多处理1000请求
        timer:send_interval(100, tick),
        loop(0)
    end).

loop(Count) ->
    receive
        {enqueue, Request} when Count < 10 ->  %% 每100ms处理10个
            seckill_worker ! Request,
            loop(Count + 1);
        {enqueue, _} ->
            %% 记录被限流的请求
            stats:increment(dropped_requests),
            loop(Count);
        tick ->
            loop(0)
    end.

%% 工作进程池
start_workers() ->
    [spawn_link(fun worker/0) || _ <- lists:seq(1, 100)].

worker() ->
    receive
        {UserId, ItemId} ->
            %% 使用熔断机制保护数据库
            case fuse:ask(inventory_db, 5000) of
                ok ->
                    handle_seckill(UserId, ItemId);
                blown ->
                    notify_service_unavailable(UserId)
            end
    end,
    worker().

四、技术选型与注意事项

4.1 各种方案的适用场景

  • 背压控制:适合上下游有明确调用关系的场景,如微服务调用
  • 漏桶算法:适合需要严格限制速率的场景,如API调用限制
  • 令牌桶算法(未示例):适合允许突发流量的场景
  • 进程池:适合可以并行处理的无状态任务

4.2 常见陷阱

  1. 消息堆积监控缺失

    %% 应该定期检查消息队列长度
    monitor_queue() ->
        {message_queue_len, Len} = process_info(self(), message_queue_len),
        Len > WarningThreshold andalso alert().
    
  2. 流量控制引发死锁

    %% 错误示例:两个进程互相等待
    proc_a() ->
        proc_b ! request,
        receive response -> ok end.
    
    proc_b() ->
        proc_a ! request,
        receive response -> ok end.
    
  3. 忽视系统级限制: Erlang虚拟机本身有最大进程数限制(默认32,768),需要调整启动参数:

    erl +P 500000  # 设置最大进程数
    

4.3 性能调优指标

  • 消息队列平均长度
  • 消息处理延迟分布
  • 进程挂起时间比例
  • 垃圾回收频率

可以通过recon工具获取这些指标:

recon:proc_count(message_queue_len, 5).  %% 查看消息队列最长的5个进程

五、总结与最佳实践

经过多年实战,我总结了Erlang流量控制的"三要三不要"原则:

三要

  1. 要在系统设计初期考虑流量控制
  2. 要实施多层次的防御(应用层+系统层)
  3. 要建立完善的监控告警机制

三不要

  1. 不要依赖客户端行为(必须服务端强制控制)
  2. 不要单纯增加硬件资源(要先优化处理逻辑)
  3. 不要在过载时完全拒绝服务(建议优雅降级)

最后分享一个实用的流量控制模板:

-module(smart_worker).
-export([start_link/0, handle/2]).

start_link() ->
    proc_lib:start_link(?MODULE, init, []).

init() ->
    %% 注册到进程组
    pg:join(workers, self()),
    %% 设置初始状态
    State = #{
        rate_limit => 100,  %% 每秒最大处理量
        last_check => os:system_time(millisecond),
        current_count => 0
    },
    loop(State).

loop(State) ->
    Now = os:system_time(millisecond),
    {NewState, Msgs} = check_rate_limit(State, Now),
    process_messages(Msgs),
    loop(NewState).

check_rate_limit(#{rate_limit := Limit} = State, Now) ->
    %% 实现令牌桶算法
    ...

记住,好的流量控制就像优秀的交通管制,既不会让道路空置,也不会造成拥堵。希望这些方案能帮助你构建更健壮的Erlang系统!