1. 为什么我们需要分布式事务?
互联网应用中常见的"优惠券抵扣"场景就很典型:用户使用优惠券下单时,订单服务需要创建订单,库存服务需要扣减库存,而优惠券服务需要标记券为已使用。这三个操作如果在不同服务中且需要保持原子性,就需要分布式事务机制。
传统单体应用的数据库事务无法直接跨服务使用,这就引出了分布式事务的三大经典解决方案——TCC、Saga和最大努力通知模式。让我们以Node.js技术栈为例(使用Knex.js作为数据库工具、Express框架、RabbitMQ作为消息队列),看看具体如何实现它们。
2. TCC模式实现:三阶段提交的代码级剖析
2.1 代码骨架搭建
// 公共事务日志表
const transactions = knex.schema.createTableIfNotExists('transactions', table => {
table.increments('id').primary()
table.string('xid', 64).notNullable() // 全局事务ID
table.string('service', 50).notNullable() // 服务名称
table.string('action', 20).notNullable() // try/confirm/cancel
table.json('params') // 业务参数快照
table.timestamp('created_at').defaultTo(knex.fn.now())
})
// 订单服务Try阶段
router.post('/order/try', async (req, res) => {
const tx = await knex.transaction()
try {
// 创建预占订单(状态为TRYING)
await tx('orders').insert({
userId: req.body.userId,
status: 'TRYING',
amount: req.body.amount
})
// 记录事务日志
await tx('transactions').insert({
xid: req.body.xid,
service: 'order',
action: 'try',
params: req.body
})
await tx.commit()
res.json({ success: true })
} catch (error) {
await tx.rollback()
res.status(500).json({ error: error.message })
}
})
2.2 补偿机制实现
当库存服务扣减失败时,需要触发已成功服务的Cancel操作:
// 全局事务协调器
const compensate = async (xid) => {
const logs = await knex('transactions')
.where({ xid })
.orderBy('created_at', 'desc')
for (const log of logs) {
switch(log.action) {
case 'try':
await request.post(`/${log.service}/cancel`, {
json: {
xid,
...log.params
}
})
break
}
}
}
// 库存服务Cancel实现
router.post('/inventory/cancel', async (req, res) => {
// 反向操作:返还预扣库存
await knex('inventory')
.where({ itemId: req.body.itemId })
.increment('locked', -req.body.quantity)
res.json({ success: true })
})
3. Saga模式实践:基于消息队列的补偿机制
3.1 流程编排示例
// 订单创建消费者
channel.consume('ORDER_CREATED', async (msg) => {
try {
// 扣减库存
await inventoryService.lock(msg.itemId, msg.quantity)
channel.sendToQueue('INVENTORY_LOCKED', msg.content)
} catch (error) {
// 触发补偿流程
channel.sendToQueue('ORDER_FAILED', {
...msg,
error: error.message
})
}
})
// 补偿处理器
channel.consume('COMPENSATE_ORDER', async (msg) => {
const { xid } = msg.content
const steps = await getSagaSteps(xid)
// 逆向执行完成的操作
for (const step of steps.reverse()) {
if (step.status === 'completed') {
await request.post(step.compensateUrl, {
json: step.payload
})
}
}
})
3.2 超时自动补偿机制
// 30分钟未完成的订单自动补偿
const agenda = new Agenda({ db: { address: process.env.MONGO_URI }})
agenda.define('check-saga-timeout', async () => {
const timeoutOrders = await knex('orders')
.where('status', 'PENDING')
.where('created_at', '<', knex.raw('NOW() - INTERVAL 30 MINUTE'))
timeoutOrders.forEach(order => {
channel.sendToQueue('COMPENSATE_ORDER', {
xid: order.xid
})
})
})
agenda.every('5 minutes', 'check-saga-timeout')
4. 最大努力通知模式:确保最终一致性的务实方案
4.1 异步通知实现
const pendingNotifications = []
// 订单完成后触发通知
router.post('/order/complete', async (req, res) => {
// ...业务逻辑
pendingNotifications.push({
type: 'PAYMENT_SUCCESS',
payload: { orderId },
attempts: 0,
nextRetry: Date.now()
})
res.json({ success: true })
})
// 定时重试任务
setInterval(async () => {
const now = Date.now()
const retries = pendingNotifications.filter(n => n.nextRetry <= now)
for (const note of retries) {
try {
await notifyThirdParty(note)
pendingNotifications.splice(
pendingNotifications.indexOf(note), 1
)
} catch (error) {
note.attempts++
note.nextRetry = now + (Math.min(2 ** note.attempts, 60) * 1000)
}
}
}, 5000)
4.2 幂等性处理示例
// 第三方通知接口
router.post('/notify', async (req, res) => {
const { orderId, signature } = req.body
// 通过Redis维护幂等性
const key = `notify:${orderId}`
const stored = await redis.get(key)
if (stored) {
return res.json(JSON.parse(stored))
}
const result = await processNotification(req.body)
await redis.setex(key, 3600, JSON.stringify(result))
res.json(result)
})
5. 应用场景深度解析
5.1 TCC适用的典型场景
- 跨境支付中的多币种账户操作
- 秒杀系统的库存预占与扣减
- 保险产品的多套餐组合销售
5.2 Saga模式的优势领域
- 电商订单全流程(订单-支付-物流)
- 酒店预订系统的多资源锁定
- 跨境物流中的多国清关流程
5.3 最大努力通知模式擅长场景
- 第三方支付结果回调
- 短信/邮件通知系统
- 数据分析系统的日志采集
6. 技术选型关键指标对比
指标 | TCC | Saga | 最大努力通知 |
---|---|---|---|
数据一致性 | 强一致性 | 最终一致性 | 最终一致性 |
实现复杂度 | 高(三阶段实现) | 中(流程编排) | 低(重试机制) |
性能损耗 | 较高 | 中等 | 低 |
适用场景 | 金融核心交易 | 业务编排类 | 非关键链路通知 |
网络隔离容忍度 | 低 | 中 | 高 |
7. 生产环境注意事项
7.1 必须建立的防护机制
- 全局事务ID的透传(建议通过HTTP Header或消息元数据)
- 服务节点的时钟同步(补偿操作依赖时间判断)
- 事务日志的归档策略(建议按时间分表)
7.2 必须实现的监控指标
- 事务成功率的服务维度统计
- 各阶段耗时分布(Try-Confirm-Cancel)
- 补偿操作触发次数的趋势图
7.3 容量规划要点
- Saga流程编排引擎的吞吐量(建议有20%余量)
- 事务日志存储的IOPS预测
- 最大努力通知模式的积压告警阈值设置
8. 总结与展望
从实际应用效果看,分布式事务没有银弹方案。根据统计,混合使用多种模式比单独使用某一方案的成功率平均高出37%。比如在电商场景中:
- 订单核心链路使用TCC保证资金安全
- 物流更新采用Saga流程编排
- 营销通知使用最大努力通知
未来的发展方向可能包括:
- 基于机器学习的事务补偿策略优化
- 区块链技术增强的分布式事务验证
- 服务网格(Service Mesh)的事务管理能力整合