一、为什么需要大文件上传并发控制

在开发文件上传功能时,小文件上传通常不会遇到太大问题,但一旦涉及大文件(比如几百MB甚至几个GB的文件),事情就变得复杂了。如果直接使用单线程上传,不仅速度慢,而且一旦网络波动导致上传失败,用户就得从头再来,体验极差。

这时候,多线程分块上传就成了一个不错的解决方案。它可以把大文件切成多个小块,让多个线程同时上传,最后在服务器端合并。这样做的好处是:

  1. 提高上传速度:多个线程并行传输,充分利用带宽。
  2. 增强稳定性:即使某个分块上传失败,只需重传该分块,而不是整个文件。
  3. 降低服务器压力:分块上传可以更好地控制资源占用,避免单个大文件占用过多内存或CPU。

但问题来了——如何控制并发线程数? 线程太少,上传速度上不去;线程太多,又可能导致客户端或服务器资源耗尽。这就是我们今天要讨论的重点。


二、C#/.NET 实现多线程分块上传

在.NET环境下,我们可以利用HttpClient结合Task来实现多线程上传。下面是一个完整的示例,展示如何分块上传文件并动态调整线程数。

示例1:基础分块上传实现

using System;
using System.IO;
using System.Net.Http;
using System.Threading.Tasks;

public class FileUploader
{
    private readonly HttpClient _httpClient;
    private readonly int _maxConcurrentThreads; // 最大并发线程数
    private readonly int _chunkSize; // 每个分块的大小(字节)

    public FileUploader(int maxConcurrentThreads = 4, int chunkSize = 1024 * 1024) // 默认1MB分块
    {
        _httpClient = new HttpClient();
        _maxConcurrentThreads = maxConcurrentThreads;
        _chunkSize = chunkSize;
    }

    // 上传文件的核心方法
    public async Task UploadFileAsync(string filePath, string uploadUrl)
    {
        var fileInfo = new FileInfo(filePath);
        var totalChunks = (int)Math.Ceiling((double)fileInfo.Length / _chunkSize);

        // 使用SemaphoreSlim控制并发数
        var semaphore = new SemaphoreSlim(_maxConcurrentThreads);

        var tasks = new List<Task>();
        for (int chunkIndex = 0; chunkIndex < totalChunks; chunkIndex++)
        {
            await semaphore.WaitAsync(); // 等待信号量,控制并发

            tasks.Add(Task.Run(async () =>
            {
                try
                {
                    // 读取当前分块的数据
                    var startPos = chunkIndex * _chunkSize;
                    var buffer = new byte[Math.Min(_chunkSize, (int)(fileInfo.Length - startPos))];
                    using (var fs = new FileStream(filePath, FileMode.Open, FileAccess.Read))
                    {
                        fs.Seek(startPos, SeekOrigin.Begin);
                        await fs.ReadAsync(buffer, 0, buffer.Length);
                    }

                    // 上传分块
                    var content = new ByteArrayContent(buffer);
                    await _httpClient.PostAsync($"{uploadUrl}?chunk={chunkIndex}", content);
                }
                finally
                {
                    semaphore.Release(); // 释放信号量
                }
            }));
        }

        await Task.WhenAll(tasks); // 等待所有分块上传完成
    }
}

代码解析:

  1. 分块逻辑:文件被分成固定大小的块(默认1MB),每个块独立上传。
  2. 并发控制:使用SemaphoreSlim限制同时运行的线程数,避免资源耗尽。
  3. 错误处理:虽然示例中未展示,但在实际项目中,应该对每个分块的上传结果进行检查,失败时进行重试。

三、动态调整线程数以优化性能

固定线程数可能无法适应所有场景。比如:

  • 用户网络带宽较大时,可以增加线程数以加快上传。
  • 服务器负载较高时,应减少线程数避免拖垮服务。

示例2:基于带宽的动态线程调整

public class AdaptiveFileUploader : FileUploader
{
    private readonly int _minThreads;
    private readonly int _maxThreads;
    private long _totalUploadedBytes;
    private DateTime _lastSpeedCheckTime;

    public AdaptiveFileUploader(int minThreads = 2, int maxThreads = 8) 
        : base(minThreads) // 初始线程数设为最小值
    {
        _minThreads = minThreads;
        _maxThreads = maxThreads;
    }

    public new async Task UploadFileAsync(string filePath, string uploadUrl)
    {
        _totalUploadedBytes = 0;
        _lastSpeedCheckTime = DateTime.Now;

        // 启动一个后台任务,动态调整线程数
        var adjustTask = Task.Run(async () =>
        {
            while (true)
            {
                await Task.Delay(5000); // 每5秒检查一次
                AdjustConcurrency();
            }
        });

        await base.UploadFileAsync(filePath, uploadUrl);
        adjustTask.Dispose(); // 上传完成后停止调整
    }

    private void AdjustConcurrency()
    {
        var elapsed = (DateTime.Now - _lastSpeedCheckTime).TotalSeconds;
        var speed = _totalUploadedBytes / elapsed / 1024; // KB/s

        // 根据当前速度调整线程数
        if (speed < 500 && _maxConcurrentThreads < _maxThreads)
        {
            _maxConcurrentThreads++; // 速度低,增加线程
        }
        else if (speed > 1000 && _maxConcurrentThreads > _minThreads)
        {
            _maxConcurrentThreads--; // 速度高,减少线程
        }

        _lastSpeedCheckTime = DateTime.Now;
        _totalUploadedBytes = 0;
    }
}

优化点:

  1. 带宽监测:通过计算已上传字节数和时间,估算当前上传速度。
  2. 动态调整:根据速度动态增减线程数,平衡速度和稳定性。

四、注意事项与最佳实践

  1. 分块大小选择

    • 太小(如100KB)会导致请求过多,增加开销。
    • 太大(如10MB)可能失去分块的意义。推荐1MB~5MB。
  2. 服务器端合并

    • 分块上传后,服务器需要按顺序合并文件。可以用FileStream依次写入。
  3. 异常处理

    • 网络波动可能导致分块上传失败,需实现自动重试机制。
  4. 资源释放

    • HttpClient应复用而非频繁创建,避免Socket耗尽。
  5. 安全考虑

    • 验证分块序号,防止恶意请求导致文件错乱。

五、总结

通过多线程分块上传,我们可以显著提升大文件上传的效率和可靠性。关键在于:

  • 合理控制并发数,避免资源竞争。
  • 动态调整策略,适应不同网络环境。
  • 完善的错误处理,确保上传过程的健壮性。

在.NET中,借助HttpClientSemaphoreSlimTask,我们可以轻松实现这一功能。如果你有更复杂的需求(比如断点续传),可以在现有基础上进一步扩展。