1. 为什么需要任务队列管理?

想象你正在运营一个快递分拣中心。每天有成千上万的包裹(任务)涌入,分拣员(工作线程)需要有条不紊地处理这些包裹。如果直接让分拣员争抢包裹,现场必定混乱不堪。任务队列就像传送带系统,让包裹按顺序流动,分拣员从固定位置取件,这就是并发场景下队列管理的核心价值。

最近团队遇到一个典型案例:日志处理系统在百万级请求下频繁崩溃。经排查发现,日志写入线程直接操作文件导致IO阻塞,工作线程与IO线程相互等待形成死锁。引入任务队列后,系统吞吐量提升了8倍,CPU利用率从90%降至60%。

2. 典型应用场景

2.1 日志异步写入系统

// 技术栈:.NET 6 + BlockingCollection
public class LogQueueManager
{
    private readonly BlockingCollection<string> _logQueue = new();
    private readonly CancellationTokenSource _cts = new();

    // 启动消费者线程(示例仅展示核心逻辑)
    public void StartConsumers(int workerCount)
    {
        for (int i = 0; i < workerCount; i++)
        {
            Task.Run(() =>
            {
                foreach (var log in _logQueue.GetConsumingEnumerable(_cts.Token))
                {
                    File.AppendAllText("app.log", $"{DateTime.Now:HH:mm:ss} - {log}\n");
                }
            });
        }
    }

    // 生产者方法
    public void EnqueueLog(string message)
    {
        if (!_logQueue.TryAdd(message, 1000)) // 1秒超时
        {
            // 队列满时的降级策略
            Console.WriteLine($"日志队列已满,丢弃日志:{message}");
        }
    }
}

2.2 电商秒杀系统

// 技术栈:.NET 7 + Channel
public class SpikeQueueService
{
    private readonly Channel<OrderRequest> _orderChannel;
    private const int MaxQueueLength = 5000;

    public SpikeQueueService()
    {
        // 创建有界通道防止内存溢出
        var options = new BoundedChannelOptions(MaxQueueLength)
        {
            FullMode = BoundedChannelFullMode.DropOldest // 队列满时丢弃最旧请求
        };
        _orderChannel = Channel.CreateBounded<OrderRequest>(options);
    }

    // 下单请求入队
    public async ValueTask EnqueueOrderAsync(OrderRequest request)
    {
        while (await _orderChannel.Writer.WaitToWriteAsync())
        {
            if (_orderChannel.Writer.TryWrite(request))
            {
                return;
            }
        }
    }

    // 订单处理消费者
    public async Task ProcessOrdersAsync(CancellationToken token)
    {
        await foreach (var order in _orderChannel.Reader.ReadAllAsync(token))
        {
            try
            {
                // 库存校验与扣减逻辑
                if (await InventoryService.ReduceStockAsync(order.SkuId))
                {
                    await OrderService.CreateOrderAsync(order);
                }
            }
            catch (Exception ex)
            {
                Logger.LogError(ex, "订单处理失败");
            }
        }
    }
}

3. 三大实现方案对比

3.1 BlockingCollection方案

// 经典生产者-消费者模式实现
public class BlockingQueue<T>
{
    private readonly BlockingCollection<T> _collection = new();

    // 生产方法(支持超时控制)
    public bool TryAdd(T item, int timeoutMs)
    {
        return _collection.TryAdd(item, timeoutMs);
    }

    // 消费方法(阻塞式)
    public IEnumerable<T> GetConsumingItems()
    {
        return _collection.GetConsumingEnumerable();
    }
}

优点:实现简单,自带阻塞机制
缺点:不支持异步操作,吞吐量有限

3.2 Channel方案

// 异步队列最佳实践
public class AsyncQueue<T>
{
    private readonly Channel<T> _channel = Channel.CreateUnbounded<T>();

    public async ValueTask EnqueueAsync(T item)
    {
        await _channel.Writer.WriteAsync(item);
    }

    public IAsyncEnumerable<T> DequeueAsync()
    {
        return _channel.Reader.ReadAllAsync();
    }
}

优点:高性能异步支持,内存控制灵活
缺点:需要手动触发消费

3.3 TPL Dataflow方案

// 复杂流程控制示例
public class DataflowPipeline
{
    private readonly TransformBlock<InputData, ProcessedData> _processor;
    private readonly ActionBlock<ProcessedData> _persister;

    public DataflowPipeline()
    {
        _processor = new TransformBlock<InputData, ProcessedData>(data =>
        {
            // 耗时计算操作
            return HeavyCompute(data);
        }, new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = Environment.ProcessorCount
        });

        _persister = new ActionBlock<ProcessedData>(async data =>
        {
            await SaveToDatabaseAsync(data);
        }, new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 1 // 数据库写入保持顺序
        });

        _processor.LinkTo(_persister, new DataflowLinkOptions { PropagateCompletion = true });
    }

    public void Post(InputData data) => _processor.Post(data);
}

优点:支持复杂流水线,细粒度控制
缺点:学习曲线陡峭

4. 关键技术对比表

特性 BlockingCollection Channel TPL Dataflow
异步支持
背压控制 手动实现 内置 内置
内存管理 无限制 可配置 可配置
线程模型 同步阻塞 异步非阻塞 混合模式
复杂流程支持
学习难度 简单 中等 较高

5. 避坑指南与最佳实践

5.1 线程安全三大原则

  1. 使用并发集合代替锁机制
// 错误示范:使用List加锁
private readonly List<string> _list = new();
private readonly object _lock = new();

public void AddItem(string item)
{
    lock (_lock)
    {
        _list.Add(item);
    }
}

// 正确示范:使用ConcurrentQueue
private readonly ConcurrentQueue<string> _queue = new();

public void AddItem(string item)
{
    _queue.Enqueue(item);
}
  1. 资源释放的优雅关闭
public async Task StopAsync()
{
    _channel.Writer.Complete(); // 停止接收新任务
    await _processingTask;      // 等待现有任务完成
    _database.CloseConnection();// 释放资源
}
  1. 异常处理的防御策略
public async Task ProcessItemsAsync()
{
    await foreach (var item in _channel.Reader.ReadAllAsync())
    {
        try
        {
            await ProcessItemAsync(item);
        }
        catch (TransientException ex)
        {
            // 重试机制
            await RetryPolicy.ExecuteAsync(() => ProcessItemAsync(item));
        }
        catch (FatalException ex)
        {
            // 熔断机制
            _circuitBreaker.Trip();
            break;
        }
    }
}

6. 性能优化实战技巧

  1. 批量处理提升吞吐量
private readonly BatchBlock<string> _batchBlock = new(100);
private readonly ActionBlock<string[]> _saveBlock;

public BulkProcessor()
{
    _saveBlock = new ActionBlock<string[]>(async batch =>
    {
        await Database.BulkInsertAsync(batch);
    });

    _batchBlock.LinkTo(_saveBlock);
}

public void Enqueue(string item) => _batchBlock.Post(item);
  1. 动态调节工作线程
public class ElasticWorkerPool
{
    private int _currentWorkers;
    private readonly int _maxWorkers = Environment.ProcessorCount * 2;

    public void AdjustWorkers()
    {
        var queueLength = GetQueueLength();
        var targetWorkers = Math.Min(
            _maxWorkers, 
            (int)Math.Ceiling(queueLength / 50.0)
        );

        while (_currentWorkers < targetWorkers)
        {
            StartNewWorker();
            _currentWorkers++;
        }
    }
}

7. 总结与展望

任务队列管理如同交通控制系统,需要根据场景选择合适方案。对于简单场景,BlockingCollection即可满足需求;高并发异步场景优选Channel;复杂流水线则选择TPL Dataflow。建议在开发初期就建立队列监控体系,记录队列长度、处理延迟等关键指标。

未来可以探索与Kafka等消息队列的集成,实现分布式队列管理。同时关注.NET 8即将推出的PriorityQueue增强功能,为优先级队列场景提供更优解。