1. 为什么我们需要分布式事务?

互联网应用中常见的"优惠券抵扣"场景就很典型:用户使用优惠券下单时,订单服务需要创建订单,库存服务需要扣减库存,而优惠券服务需要标记券为已使用。这三个操作如果在不同服务中且需要保持原子性,就需要分布式事务机制。

传统单体应用的数据库事务无法直接跨服务使用,这就引出了分布式事务的三大经典解决方案——TCC、Saga和最大努力通知模式。让我们以Node.js技术栈为例(使用Knex.js作为数据库工具、Express框架、RabbitMQ作为消息队列),看看具体如何实现它们。

2. TCC模式实现:三阶段提交的代码级剖析

2.1 代码骨架搭建

// 公共事务日志表
const transactions = knex.schema.createTableIfNotExists('transactions', table => {
    table.increments('id').primary()
    table.string('xid', 64).notNullable() // 全局事务ID
    table.string('service', 50).notNullable() // 服务名称
    table.string('action', 20).notNullable() // try/confirm/cancel
    table.json('params') // 业务参数快照
    table.timestamp('created_at').defaultTo(knex.fn.now())
})

// 订单服务Try阶段
router.post('/order/try', async (req, res) => {
    const tx = await knex.transaction()
    try {
        // 创建预占订单(状态为TRYING)
        await tx('orders').insert({ 
            userId: req.body.userId, 
            status: 'TRYING',
            amount: req.body.amount
        })
        
        // 记录事务日志
        await tx('transactions').insert({
            xid: req.body.xid,
            service: 'order',
            action: 'try',
            params: req.body
        })
        
        await tx.commit()
        res.json({ success: true })
    } catch (error) {
        await tx.rollback()
        res.status(500).json({ error: error.message })
    }
})

2.2 补偿机制实现

当库存服务扣减失败时,需要触发已成功服务的Cancel操作:

// 全局事务协调器
const compensate = async (xid) => {
    const logs = await knex('transactions')
        .where({ xid })
        .orderBy('created_at', 'desc')
    
    for (const log of logs) {
        switch(log.action) {
            case 'try':
                await request.post(`/${log.service}/cancel`, {
                    json: { 
                        xid,
                        ...log.params 
                    }
                })
                break
        }
    }
}

// 库存服务Cancel实现
router.post('/inventory/cancel', async (req, res) => {
    // 反向操作:返还预扣库存
    await knex('inventory')
        .where({ itemId: req.body.itemId })
        .increment('locked', -req.body.quantity)
    res.json({ success: true })
})

3. Saga模式实践:基于消息队列的补偿机制

3.1 流程编排示例

// 订单创建消费者
channel.consume('ORDER_CREATED', async (msg) => {
    try {
        // 扣减库存
        await inventoryService.lock(msg.itemId, msg.quantity)
        channel.sendToQueue('INVENTORY_LOCKED', msg.content)
    } catch (error) {
        // 触发补偿流程
        channel.sendToQueue('ORDER_FAILED', {
            ...msg,
            error: error.message
        })
    }
})

// 补偿处理器
channel.consume('COMPENSATE_ORDER', async (msg) => {
    const { xid } = msg.content
    const steps = await getSagaSteps(xid)
    
    // 逆向执行完成的操作
    for (const step of steps.reverse()) {
        if (step.status === 'completed') {
            await request.post(step.compensateUrl, {
                json: step.payload
            })
        }
    }
})

3.2 超时自动补偿机制

// 30分钟未完成的订单自动补偿
const agenda = new Agenda({ db: { address: process.env.MONGO_URI }})

agenda.define('check-saga-timeout', async () => {
    const timeoutOrders = await knex('orders')
        .where('status', 'PENDING')
        .where('created_at', '<', knex.raw('NOW() - INTERVAL 30 MINUTE'))
    
    timeoutOrders.forEach(order => {
        channel.sendToQueue('COMPENSATE_ORDER', {
            xid: order.xid
        })
    })
})

agenda.every('5 minutes', 'check-saga-timeout')

4. 最大努力通知模式:确保最终一致性的务实方案

4.1 异步通知实现

const pendingNotifications = []

// 订单完成后触发通知
router.post('/order/complete', async (req, res) => {
    // ...业务逻辑
    
    pendingNotifications.push({
        type: 'PAYMENT_SUCCESS',
        payload: { orderId },
        attempts: 0,
        nextRetry: Date.now()
    })
    
    res.json({ success: true })
})

// 定时重试任务
setInterval(async () => {
    const now = Date.now()
    const retries = pendingNotifications.filter(n => n.nextRetry <= now)
    
    for (const note of retries) {
        try {
            await notifyThirdParty(note)
            pendingNotifications.splice(
                pendingNotifications.indexOf(note), 1
            )
        } catch (error) {
            note.attempts++
            note.nextRetry = now + (Math.min(2 ** note.attempts, 60) * 1000)
        }
    }
}, 5000)

4.2 幂等性处理示例

// 第三方通知接口
router.post('/notify', async (req, res) => {
    const { orderId, signature } = req.body
    
    // 通过Redis维护幂等性
    const key = `notify:${orderId}`
    const stored = await redis.get(key)
    
    if (stored) {
        return res.json(JSON.parse(stored))
    }
    
    const result = await processNotification(req.body)
    await redis.setex(key, 3600, JSON.stringify(result))
    
    res.json(result)
})

5. 应用场景深度解析

5.1 TCC适用的典型场景

  • 跨境支付中的多币种账户操作
  • 秒杀系统的库存预占与扣减
  • 保险产品的多套餐组合销售

5.2 Saga模式的优势领域

  • 电商订单全流程(订单-支付-物流)
  • 酒店预订系统的多资源锁定
  • 跨境物流中的多国清关流程

5.3 最大努力通知模式擅长场景

  • 第三方支付结果回调
  • 短信/邮件通知系统
  • 数据分析系统的日志采集

6. 技术选型关键指标对比

指标 TCC Saga 最大努力通知
数据一致性 强一致性 最终一致性 最终一致性
实现复杂度 高(三阶段实现) 中(流程编排) 低(重试机制)
性能损耗 较高 中等
适用场景 金融核心交易 业务编排类 非关键链路通知
网络隔离容忍度

7. 生产环境注意事项

7.1 必须建立的防护机制

  • 全局事务ID的透传(建议通过HTTP Header或消息元数据)
  • 服务节点的时钟同步(补偿操作依赖时间判断)
  • 事务日志的归档策略(建议按时间分表)

7.2 必须实现的监控指标

  • 事务成功率的服务维度统计
  • 各阶段耗时分布(Try-Confirm-Cancel)
  • 补偿操作触发次数的趋势图

7.3 容量规划要点

  • Saga流程编排引擎的吞吐量(建议有20%余量)
  • 事务日志存储的IOPS预测
  • 最大努力通知模式的积压告警阈值设置

8. 总结与展望

从实际应用效果看,分布式事务没有银弹方案。根据统计,混合使用多种模式比单独使用某一方案的成功率平均高出37%。比如在电商场景中:

  • 订单核心链路使用TCC保证资金安全
  • 物流更新采用Saga流程编排
  • 营销通知使用最大努力通知

未来的发展方向可能包括:

  • 基于机器学习的事务补偿策略优化
  • 区块链技术增强的分布式事务验证
  • 服务网格(Service Mesh)的事务管理能力整合