一、为什么需要分布式锁

在微服务架构中,多个服务实例可能同时操作共享资源(比如数据库某条记录)。举个生活中的例子:就像超市里最后一瓶老干妈,如果收银系统没有锁机制,两个收银员可能同时标记"已售出",结果就是超卖。分布式锁的核心目标是保证在分布式环境下,同一时刻只有一个服务能执行关键操作。

技术栈说明:本文所有示例均基于 .NET Core 6 + C# 实现

二、基于数据库的分布式锁方案

1. 悲观锁实现

// 使用SQLServer的排他锁示例
public async Task<bool> UpdateWithPessimisticLock(int productId)
{
    using var connection = new SqlConnection(_config.GetConnectionString("Default"));
    await connection.OpenAsync();
    
    // 开启事务并加锁
    using var transaction = await connection.BeginTransactionAsync();
    try
    {
        // 关键点:WITH (UPDLOCK) 表示获取更新锁
        var sql = @"SELECT Quantity FROM Products WITH (UPDLOCK) 
                    WHERE ProductId = @productId";
        var quantity = await connection.QueryFirstOrDefaultAsync<int>(
            sql, new { productId }, transaction);
            
        if (quantity > 0)
        {
            await connection.ExecuteAsync(
                @"UPDATE Products SET Quantity = Quantity - 1 
                  WHERE ProductId = @productId",
                new { productId }, transaction);
                
            await transaction.CommitAsync();
            return true;
        }
        return false;
    }
    catch
    {
        await transaction.RollbackAsync();
        throw;
    }
}

优点:实现简单,强一致性保证
缺点:性能较差(锁表影响并发),数据库连接耗尽风险

2. 乐观锁实现

// 使用版本号控制的乐观锁
public async Task<bool> UpdateWithOptimisticLock(int productId, int originalVersion)
{
    using var connection = new SqlConnection(_config.GetConnectionString("Default"));
    
    // 通过版本号校验
    var affectedRows = await connection.ExecuteAsync(
        @"UPDATE Products 
          SET Quantity = Quantity - 1, 
              Version = Version + 1 
          WHERE ProductId = @productId 
          AND Version = @originalVersion",
        new { productId, originalVersion });
        
    return affectedRows > 0;
}

适用场景:读多写少,冲突概率低的场景

三、基于Redis的分布式锁方案

1. 基础SETNX实现

// 使用StackExchange.Redis实现
public async Task<bool> AcquireLockAsync(IDatabase redisDb, string lockKey, string lockValue, TimeSpan expiry)
{
    // 关键参数:NX(不存在才设置), EX(过期时间)
    return await redisDb.StringSetAsync(
        lockKey, 
        lockValue, 
        expiry, 
        When.NotExists);
}

// 释放锁时需验证value防止误删
public async Task ReleaseLockAsync(IDatabase redisDb, string lockKey, string lockValue)
{
    var luaScript = @"
        if redis.call('GET', KEYS[1]) == ARGV[1] then
            return redis.call('DEL', KEYS[1])
        else
            return 0
        end";
        
    await redisDb.ScriptEvaluateAsync(luaScript, 
        new { KEYS = new RedisKey[] { lockKey }, 
              ARGV = new RedisValue[] { lockValue } });
}

注意事项

  • 必须设置过期时间防止死锁
  • 锁值需唯一(推荐Guid+线程ID)
  • 建议实现自动续期机制

2. RedLock算法增强版

// 使用RedLock.Net库
public async Task<RedLock> AcquireRedLockAsync(string resource)
{
    var endPoint = new List<RedLockEndPoint> 
    { 
        new DnsEndPoint("redis1.example.com", 6379),
        new DnsEndPoint("redis2.example.com", 6379)
    };
    
    var factory = RedLockFactory.Create(endPoint);
    return await factory.CreateLockAsync(
        resource, 
        TimeSpan.FromSeconds(30),
        TimeSpan.FromSeconds(10),
        TimeSpan.FromSeconds(1));
}

优势:更高的可靠性(N/2+1节点共识)
代价:实现复杂度上升,性能下降约30%

四、基于ZooKeeper的分布式锁

// 使用ZooKeeperNetEx库
public class ZooKeeperLock : IDisposable
{
    private readonly ZooKeeper _zk;
    private string _lockPath;
    
    public async Task<bool> AcquireAsync(string lockName)
    {
        _lockPath = await _zk.CreateAsync(
            $"/locks/{lockName}-", 
            Array.Empty<byte>(),
            Ids.OPEN_ACL_UNSAFE,
            CreateMode.EphemeralSequential);
            
        // 获取所有兄弟节点
        var siblings = await _zk.GetChildrenAsync("/locks", false);
        var orderedNodes = siblings
            .Where(p => p.StartsWith(lockName))
            .OrderBy(p => p).ToList();
            
        // 判断自己是否是最小节点
        var currentIndex = orderedNodes.IndexOf(_lockPath.Split('/').Last());
        return currentIndex == 0;
    }
    
    public void Dispose()
    {
        _zk.DeleteAsync(_lockPath).Wait();
    }
}

特点

  • 通过临时顺序节点实现公平锁
  • 自带Watch机制可实现阻塞等待
  • 适合长事务场景

五、方案选型决策树

  1. 数据库锁:遗留系统改造,事务一致性要求极高
  2. Redis锁:高并发短事务,允许偶尔失效(秒杀场景)
  3. ZooKeeper锁:需要严格顺序执行的长事务(订单结算)

六、避坑指南

  1. 锁粒度要适中:太细导致频繁争用,太粗降低并发
  2. 必须处理锁超时:建议设置自动续期(如Redisson的WatchDog)
  3. 避免递归获取锁:容易导致死锁
  4. 监控锁等待时间:超过200ms就需要预警

七、最佳实践示例

// 综合Redis锁+Polly重试的完整示例
public async Task<string> ProcessWithRetryLock(string businessKey)
{
    var policy = Policy.Handle<Exception>()
        .WaitAndRetryAsync(3, retryAttempt => 
            TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)));
            
    return await policy.ExecuteAsync(async () => 
    {
        var lockValue = Guid.NewGuid().ToString();
        if (!await AcquireLockAsync(redisDb, businessKey, lockValue, TimeSpan.FromSeconds(30)))
            throw new LockTimeoutException();
            
        try 
        {
            // 业务处理
            return await _service.DoBusiness(businessKey);
        }
        finally 
        {
            await ReleaseLockAsync(redisDb, businessKey, lockValue);
        }
    });
}

八、未来演进方向

  1. 混合锁策略:Redis+数据库双重校验
  2. 分片锁优化:按业务键哈希到不同节点
  3. 无锁化设计:改用事件溯源(Event Sourcing)模式

在实际项目中,没有银弹方案。最近我们有个电商项目,在压测时发现Redis锁在跨机房场景下存在时钟漂移问题,最终采用RedLock+本地缓存降级的混合方案才解决。建议大家根据实际业务特点做技术选型,记住:适合的才是最好的。