在开发过程中,我们经常会遇到需要上传大文件到 S3 存储服务的情况。为了提高上传效率,多线程上传是个不错的选择,但同时也要控制好线程数和资源占用。下面就来详细聊聊怎么在 C#/.NET 里实现 S3 大文件上传的并发控制。

一、应用场景

在很多实际项目中,大文件上传是很常见的需求。比如视频网站,用户上传高清视频,这些视频文件往往很大;还有企业级的文件管理系统,员工上传大型的项目文档等。在这些场景下,如果使用单线程上传,速度会非常慢,用户体验也不好。而多线程上传可以充分利用网络带宽和系统资源,加快上传速度。

二、技术优缺点

优点

  • 速度快:多线程上传可以同时处理多个文件块的上传,大大提高了上传效率。就好比搬一堆砖头,一个人搬肯定慢,多个人一起搬就快多了。
  • 资源利用充分:可以充分利用系统的 CPU 和网络带宽资源。当一个线程在等待网络响应时,其他线程可以继续工作。

缺点

  • 资源占用高:如果线程数设置不合理,会占用过多的系统资源,导致系统性能下降。比如,开了太多线程,CPU 就会忙不过来,其他程序可能就会卡顿。
  • 并发控制复杂:需要处理好线程之间的同步和协调,避免出现数据混乱的问题。

三、实现步骤

1. 准备工作

首先,你需要安装 AWS SDK for .NET,这个可以通过 NuGet 包管理器来安装。在 Visual Studio 里,右键点击项目,选择“管理 NuGet 包”,搜索 AWSSDK.S3 并安装。

2. 示例代码

下面是一个完整的示例代码,展示了如何实现 S3 大文件的多线程上传,并控制线程数:

// 技术栈:C#/.NET
using Amazon;
using Amazon.S3;
using Amazon.S3.Model;
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

class S3MultipartUploader
{
    private readonly AmazonS3Client s3Client;
    private readonly string bucketName;
    private readonly string key;
    private readonly string filePath;
    private readonly int maxThreads;

    public S3MultipartUploader(string accessKey, string secretKey, string bucketName, string key, string filePath, int maxThreads)
    {
        // 创建 S3 客户端
        this.s3Client = new AmazonS3Client(accessKey, secretKey, RegionEndpoint.USWest2);
        this.bucketName = bucketName;
        this.key = key;
        this.filePath = filePath;
        this.maxThreads = maxThreads;
    }

    public async Task UploadFileAsync()
    {
        // 初始化多部分上传
        var initiateRequest = new InitiateMultipartUploadRequest
        {
            BucketName = bucketName,
            Key = key
        };
        var initiateResponse = await s3Client.InitiateMultipartUploadAsync(initiateRequest);
        string uploadId = initiateResponse.UploadId;

        // 分割文件为多个部分
        var partSize = 5 * 1024 * 1024; // 每个部分 5MB
        var fileLength = new FileInfo(filePath).Length;
        var partCount = (int)Math.Ceiling((double)fileLength / partSize);

        // 存储每个部分的上传结果
        var uploadTasks = new List<Task<UploadPartResponse>>();
        var semaphore = new SemaphoreSlim(maxThreads);

        using (var fileStream = new FileStream(filePath, FileMode.Open, FileAccess.Read))
        {
            for (int i = 0; i < partCount; i++)
            {
                await semaphore.WaitAsync();

                var partNumber = i + 1;
                var offset = i * partSize;
                var length = Math.Min(partSize, fileLength - offset);

                var uploadRequest = new UploadPartRequest
                {
                    BucketName = bucketName,
                    Key = key,
                    UploadId = uploadId,
                    PartNumber = partNumber,
                    FilePosition = offset,
                    InputStream = fileStream,
                    PartSize = length
                };

                var task = s3Client.UploadPartAsync(uploadRequest).ContinueWith(t =>
                {
                    semaphore.Release();
                    return t.Result;
                });

                uploadTasks.Add(task);
            }

            // 等待所有部分上传完成
            var responses = await Task.WhenAll(uploadTasks);

            // 完成多部分上传
            var completeRequest = new CompleteMultipartUploadRequest
            {
                BucketName = bucketName,
                Key = key,
                UploadId = uploadId,
                PartETags = new List<PartETag>()
            };

            foreach (var response in responses)
            {
                completeRequest.PartETags.Add(new PartETag(response.PartNumber, response.ETag));
            }

            await s3Client.CompleteMultipartUploadAsync(completeRequest);
        }
    }
}

class Program
{
    static async Task Main()
    {
        string accessKey = "YOUR_ACCESS_KEY";
        string secretKey = "YOUR_SECRET_KEY";
        string bucketName = "YOUR_BUCKET_NAME";
        string key = "YOUR_FILE_KEY";
        string filePath = "PATH_TO_YOUR_FILE";
        int maxThreads = 5; // 最大线程数

        var uploader = new S3MultipartUploader(accessKey, secretKey, bucketName, key, filePath, maxThreads);
        await uploader.UploadFileAsync();
        Console.WriteLine("文件上传完成");
    }
}

代码解释

  • 初始化 S3 客户端:在 S3MultipartUploader 类的构造函数里,创建了一个 AmazonS3Client 对象,用于和 S3 服务进行交互。
  • 初始化多部分上传:调用 InitiateMultipartUploadAsync 方法,开始一个多部分上传任务,并获取上传 ID。
  • 分割文件:将大文件分割成多个 5MB 的部分,方便多线程上传。
  • 控制线程数:使用 SemaphoreSlim 来控制并发线程数。SemaphoreSlim 就像一个门卫,只允许指定数量的线程同时进入。
  • 上传每个部分:每个部分的上传是一个异步任务,通过 UploadPartAsync 方法上传。
  • 完成上传:当所有部分都上传完成后,调用 CompleteMultipartUploadAsync 方法,完成整个文件的上传。

四、注意事项

  • 线程数设置:线程数不是越多越好。要根据系统资源和网络带宽来合理设置。如果线程数太多,会导致系统资源紧张,上传速度反而会变慢。
  • 异常处理:在上传过程中,可能会出现网络异常、文件损坏等问题。要对这些异常进行捕获和处理,确保上传的稳定性。
  • 文件分割大小:每个部分的大小也需要合理设置。太小会增加请求次数,太大会增加单个请求的失败风险。

五、文章总结

通过多线程上传大文件到 S3 可以显著提高上传效率,但需要合理控制线程数和资源占用。在实现过程中,要注意线程同步、异常处理等问题。使用 SemaphoreSlim 可以方便地控制并发线程数,避免资源过度占用。同时,要根据实际情况调整文件分割大小和线程数,以达到最佳的上传效果。