1. 当Task组合变成毛线团时
作为ASP.NET Core开发者,我们经常需要处理这样的场景:用户注册时需要同时写入数据库、发送欢迎邮件、更新统计报表,这三个操作既需要保证执行顺序又要处理异常。当这样的任务组合越来越复杂时,代码就会像被猫咪玩过的毛线团一样纠缠不清。
最近在重构一个电商系统时,我遇到了一个包含12个异步任务的订单处理流程。最初的实现用了三层嵌套的ContinueWith,读代码时就像在迷宫里找出口。这迫使我开始系统性地研究Task组合的优化方案。
2. 解构Task组合的四种武器
2.1 基础款:ContinueWith接力赛
// ASP.NET Core 6.0 Web API示例
public async Task<IActionResult> ProcessOrder()
{
// 第一阶段:验证库存
var checkTask = _inventoryService.CheckStockAsync(orderItems)
.ContinueWith(async firstTask => {
if (!firstTask.Result) throw new InsufficientStockException();
// 第二阶段:创建支付记录
var paymentId = await _paymentService.CreatePaymentAsync(totalAmount);
// 第三阶段:扣减库存
await _inventoryService.DeductStockAsync(orderItems);
return paymentId;
}, TaskContinuationOptions.OnlyOnRanToCompletion);
try {
var result = await checkTask;
return Ok(new { PaymentId = result });
} catch (Exception ex) {
_logger.LogError(ex, "订单处理失败");
return StatusCode(500);
}
}
优点:清晰的执行顺序控制,适合简单线性流程
缺点:多层嵌套降低可读性,异常处理复杂化
2.2 并行加速器:WhenAll全家桶
public async Task UpdateUserProfile(User user)
{
// 并行执行三个独立操作
var avatarTask = _storageService.UploadAvatarAsync(user.Avatar);
var historyTask = _historyService.MigrateUserHistoryAsync(user.Id);
var auditTask = _auditService.CreateAuditLogAsync(user);
await Task.WhenAll(avatarTask, historyTask, auditTask);
// 统一处理结果
var storageUrl = await avatarTask;
var successCount = await historyTask;
var auditId = await auditTask;
await _notificationService.SendCompleteEmail(user.Email);
}
适用场景:无依赖的并行任务,如数据预处理、多服务通知
注意事项:单个任务失败会导致AggregateException,建议配合ExceptionHandler使用
2.3 智能管家:Polly策略组合
// 安装NuGet包 Polly.Contrib.WaitAndRetry
public async Task ProcessFinancialReport()
{
var retryPolicy = Policy
.Handle<TimeoutException>()
.WaitAndRetryAsync(Backoff.DecorrelatedJitterBackoffV2(
medianFirstRetryDelay: TimeSpan.FromSeconds(1),
retryCount: 3));
var circuitBreaker = Policy
.Handle<HttpRequestException>()
.CircuitBreakerAsync(3, TimeSpan.FromMinutes(1));
var combinedPolicy = Policy.WrapAsync(retryPolicy, circuitBreaker);
await combinedPolicy.ExecuteAsync(async () => {
var dataTask = _legacyService.FetchFinancialDataAsync();
var convertTask = dataTask.ContinueWith(d =>
_converter.TransformToModernFormat(d.Result));
await Task.WhenAll(convertTask, _cacheService.RefreshAsync());
});
}
技术亮点:
- 指数退避重试策略缓解瞬时故障
- 熔断机制防止级联故障
- 策略组合实现弹性调用
2.4 流水线大师:TPL Dataflow
// 安装NuGet包 System.Threading.Tasks.Dataflow
public async Task DataProcessingPipeline()
{
var bufferBlock = new BufferBlock<RawData>();
var transformBlock = new TransformBlock<RawData, ProcessedData>(data =>
_processor.TransformAsync(data), new ExecutionDataflowBlockOptions {
MaxDegreeOfParallelism = 4
});
var batchBlock = new BatchBlock<ProcessedData>(100);
var actionBlock = new ActionBlock<ProcessedData[]>(batch =>
_repository.BulkInsertAsync(batch));
transformBlock.LinkTo(batchBlock);
batchBlock.LinkTo(actionBlock);
// 启动数据流
foreach (var data in _dataSource.GetStreamingData())
{
await bufferBlock.SendAsync(data);
}
bufferBlock.Complete();
await actionBlock.Completion;
}
核心优势:
- 可视化的工作流设计
- 精确控制并行度
- 内置背压支持
3. 技术选型指南针
3.1 应用场景矩阵
场景特征 | 推荐方案 |
---|---|
简单顺序执行 | ContinueWith链式调用 |
并行无依赖任务 | WhenAll全家桶 |
需要弹性策略 | Polly策略组合 |
数据流处理 | TPL Dataflow流水线 |
3.2 性能对比备忘录
- ContinueWith:适合轻量级链式调用,但深度嵌套会影响可维护性
- WhenAll:并行效率高,但资源占用与任务数正相关
- Polly:增加约15%的CPU开销,换取系统稳定性提升
- Dataflow:内存占用较高,适合大数据量吞吐场景
4. 避坑指南:那些年我们踩过的雷
4.1 异常处理黑洞
当使用WhenAll时,很多开发者会忽略异常处理的特殊性:
// 错误示例
try {
await Task.WhenAll(task1, task2);
} catch (Exception ex) {
// 这里只能捕获第一个异常!
}
// 正确做法
var allTasks = Task.WhenAll(task1, task2);
try {
await allTasks;
} catch {
foreach (var ex in allTasks.Exception.InnerExceptions) {
// 处理所有异常
}
}
4.2 上下文陷阱
在ASP.NET Core中,默认的Task调度会捕获执行上下文。当使用ConfigureAwait(false)
时要注意:
async Task UpdateCache()
{
// 正确使用ConfigureAwait
var data = await _remoteService.GetDataAsync()
.ConfigureAwait(false);
// 此处已不在原始上下文
ProcessData(data);
// 需要恢复上下文时
await _dbContext.SaveChangesAsync()
.ConfigureAwait(true);
}
4.3 资源泄漏监控
长期运行的Task可能引发内存泄漏,建议配合Diagnostic工具:
// 安装NuGet包 Microsoft.Extensions.Diagnostics.HealthChecks
services.AddHealthChecks()
.AddCheck<AsyncTaskHealthCheck>("async_tasks");
// 自定义健康检查
public class AsyncTaskHealthCheck : IHealthCheck
{
public Task<HealthCheckResult> CheckHealthAsync(
HealthCheckContext context,
CancellationToken cancellationToken = default)
{
var leakCount = TaskTracker.GetLeakedTasksCount();
return leakCount > 0
? Task.FromResult(HealthCheckResult.Degraded($"发现{leakCount}个僵尸任务"))
: Task.FromResult(HealthCheckResult.Healthy());
}
}
5. 终极组合技实战
将Polly与Dataflow结合的电商订单处理系统:
public class OrderPipeline
{
private readonly TransformBlock<Order, OrderResult> _processingBlock;
public OrderPipeline()
{
var retryPolicy = Policy<OrderResult>
.Handle<TimeoutException>()
.RetryAsync(2);
_processingBlock = new TransformBlock<Order, OrderResult>(async order =>
await retryPolicy.ExecuteAsync(async () => {
// 并行执行校验
var validationTasks = new[]
{
_fraudService.ValidateAsync(order),
_inventoryService.CheckStockAsync(order.Items)
};
await Task.WhenAll(validationTasks);
// 创建支付记录
var paymentTask = _paymentService.CreateAsync(order.Total);
// 扣减库存
var deductTask = _inventoryService.DeductStockAsync(order.Items);
await Task.WhenAll(paymentTask, deductTask);
return new OrderResult(order.Id, paymentTask.Result);
}), new ExecutionDataflowBlockOptions {
MaxDegreeOfParallelism = 10
});
}
public async Task ProcessBatch(IEnumerable<Order> orders)
{
foreach (var order in orders)
{
await _processingBlock.SendAsync(order);
}
_processingBlock.Complete();
await _processingBlock.Completion;
}
}
这个实现方案具有:
- 自动重试机制
- 可控的并行度
- 批量处理能力
- 完善的错误隔离
6. 总结与展望
通过本文的多个实战示例,我们看到了ASP.NET Core中处理复杂Task组合的不同武器库。就像瑞士军刀有不同的工具应对不同场景,开发者需要根据具体需求选择合适的方案:
- 对于简单链式调用,ContinueWith仍然是最佳选择
- 需要弹性能力时,Polly的策略组合是不二之选
- 大数据量处理场景,TPL Dataflow的流水线设计优势明显
- WhenAll在并行无依赖任务中继续保持统治地位
未来随着.NET 8的发布,我们可能会看到更强大的Parallel.ForEachAsync等新特性。但无论工具如何演进,掌握核心的任务组合原理,保持代码的可读性和可维护性,才是应对复杂异步编程的终极解决方案。