1. 为什么需要任务队列管理?
想象你正在运营一个快递分拣中心。每天有成千上万的包裹(任务)涌入,分拣员(工作线程)需要有条不紊地处理这些包裹。如果直接让分拣员争抢包裹,现场必定混乱不堪。任务队列就像传送带系统,让包裹按顺序流动,分拣员从固定位置取件,这就是并发场景下队列管理的核心价值。
最近团队遇到一个典型案例:日志处理系统在百万级请求下频繁崩溃。经排查发现,日志写入线程直接操作文件导致IO阻塞,工作线程与IO线程相互等待形成死锁。引入任务队列后,系统吞吐量提升了8倍,CPU利用率从90%降至60%。
2. 典型应用场景
2.1 日志异步写入系统
// 技术栈:.NET 6 + BlockingCollection
public class LogQueueManager
{
private readonly BlockingCollection<string> _logQueue = new();
private readonly CancellationTokenSource _cts = new();
// 启动消费者线程(示例仅展示核心逻辑)
public void StartConsumers(int workerCount)
{
for (int i = 0; i < workerCount; i++)
{
Task.Run(() =>
{
foreach (var log in _logQueue.GetConsumingEnumerable(_cts.Token))
{
File.AppendAllText("app.log", $"{DateTime.Now:HH:mm:ss} - {log}\n");
}
});
}
}
// 生产者方法
public void EnqueueLog(string message)
{
if (!_logQueue.TryAdd(message, 1000)) // 1秒超时
{
// 队列满时的降级策略
Console.WriteLine($"日志队列已满,丢弃日志:{message}");
}
}
}
2.2 电商秒杀系统
// 技术栈:.NET 7 + Channel
public class SpikeQueueService
{
private readonly Channel<OrderRequest> _orderChannel;
private const int MaxQueueLength = 5000;
public SpikeQueueService()
{
// 创建有界通道防止内存溢出
var options = new BoundedChannelOptions(MaxQueueLength)
{
FullMode = BoundedChannelFullMode.DropOldest // 队列满时丢弃最旧请求
};
_orderChannel = Channel.CreateBounded<OrderRequest>(options);
}
// 下单请求入队
public async ValueTask EnqueueOrderAsync(OrderRequest request)
{
while (await _orderChannel.Writer.WaitToWriteAsync())
{
if (_orderChannel.Writer.TryWrite(request))
{
return;
}
}
}
// 订单处理消费者
public async Task ProcessOrdersAsync(CancellationToken token)
{
await foreach (var order in _orderChannel.Reader.ReadAllAsync(token))
{
try
{
// 库存校验与扣减逻辑
if (await InventoryService.ReduceStockAsync(order.SkuId))
{
await OrderService.CreateOrderAsync(order);
}
}
catch (Exception ex)
{
Logger.LogError(ex, "订单处理失败");
}
}
}
}
3. 三大实现方案对比
3.1 BlockingCollection方案
// 经典生产者-消费者模式实现
public class BlockingQueue<T>
{
private readonly BlockingCollection<T> _collection = new();
// 生产方法(支持超时控制)
public bool TryAdd(T item, int timeoutMs)
{
return _collection.TryAdd(item, timeoutMs);
}
// 消费方法(阻塞式)
public IEnumerable<T> GetConsumingItems()
{
return _collection.GetConsumingEnumerable();
}
}
优点:实现简单,自带阻塞机制
缺点:不支持异步操作,吞吐量有限
3.2 Channel方案
// 异步队列最佳实践
public class AsyncQueue<T>
{
private readonly Channel<T> _channel = Channel.CreateUnbounded<T>();
public async ValueTask EnqueueAsync(T item)
{
await _channel.Writer.WriteAsync(item);
}
public IAsyncEnumerable<T> DequeueAsync()
{
return _channel.Reader.ReadAllAsync();
}
}
优点:高性能异步支持,内存控制灵活
缺点:需要手动触发消费
3.3 TPL Dataflow方案
// 复杂流程控制示例
public class DataflowPipeline
{
private readonly TransformBlock<InputData, ProcessedData> _processor;
private readonly ActionBlock<ProcessedData> _persister;
public DataflowPipeline()
{
_processor = new TransformBlock<InputData, ProcessedData>(data =>
{
// 耗时计算操作
return HeavyCompute(data);
}, new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = Environment.ProcessorCount
});
_persister = new ActionBlock<ProcessedData>(async data =>
{
await SaveToDatabaseAsync(data);
}, new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 1 // 数据库写入保持顺序
});
_processor.LinkTo(_persister, new DataflowLinkOptions { PropagateCompletion = true });
}
public void Post(InputData data) => _processor.Post(data);
}
优点:支持复杂流水线,细粒度控制
缺点:学习曲线陡峭
4. 关键技术对比表
特性 | BlockingCollection | Channel | TPL Dataflow |
---|---|---|---|
异步支持 | ❌ | ✅ | ✅ |
背压控制 | 手动实现 | 内置 | 内置 |
内存管理 | 无限制 | 可配置 | 可配置 |
线程模型 | 同步阻塞 | 异步非阻塞 | 混合模式 |
复杂流程支持 | ❌ | ❌ | ✅ |
学习难度 | 简单 | 中等 | 较高 |
5. 避坑指南与最佳实践
5.1 线程安全三大原则
- 使用并发集合代替锁机制
// 错误示范:使用List加锁
private readonly List<string> _list = new();
private readonly object _lock = new();
public void AddItem(string item)
{
lock (_lock)
{
_list.Add(item);
}
}
// 正确示范:使用ConcurrentQueue
private readonly ConcurrentQueue<string> _queue = new();
public void AddItem(string item)
{
_queue.Enqueue(item);
}
- 资源释放的优雅关闭
public async Task StopAsync()
{
_channel.Writer.Complete(); // 停止接收新任务
await _processingTask; // 等待现有任务完成
_database.CloseConnection();// 释放资源
}
- 异常处理的防御策略
public async Task ProcessItemsAsync()
{
await foreach (var item in _channel.Reader.ReadAllAsync())
{
try
{
await ProcessItemAsync(item);
}
catch (TransientException ex)
{
// 重试机制
await RetryPolicy.ExecuteAsync(() => ProcessItemAsync(item));
}
catch (FatalException ex)
{
// 熔断机制
_circuitBreaker.Trip();
break;
}
}
}
6. 性能优化实战技巧
- 批量处理提升吞吐量
private readonly BatchBlock<string> _batchBlock = new(100);
private readonly ActionBlock<string[]> _saveBlock;
public BulkProcessor()
{
_saveBlock = new ActionBlock<string[]>(async batch =>
{
await Database.BulkInsertAsync(batch);
});
_batchBlock.LinkTo(_saveBlock);
}
public void Enqueue(string item) => _batchBlock.Post(item);
- 动态调节工作线程
public class ElasticWorkerPool
{
private int _currentWorkers;
private readonly int _maxWorkers = Environment.ProcessorCount * 2;
public void AdjustWorkers()
{
var queueLength = GetQueueLength();
var targetWorkers = Math.Min(
_maxWorkers,
(int)Math.Ceiling(queueLength / 50.0)
);
while (_currentWorkers < targetWorkers)
{
StartNewWorker();
_currentWorkers++;
}
}
}
7. 总结与展望
任务队列管理如同交通控制系统,需要根据场景选择合适方案。对于简单场景,BlockingCollection即可满足需求;高并发异步场景优选Channel;复杂流水线则选择TPL Dataflow。建议在开发初期就建立队列监控体系,记录队列长度、处理延迟等关键指标。
未来可以探索与Kafka等消息队列的集成,实现分布式队列管理。同时关注.NET 8即将推出的PriorityQueue增强功能,为优先级队列场景提供更优解。