一、引言

在当今的软件开发领域,微服务架构已经成为了一种主流的架构模式。它将一个大型的单体应用拆分成多个小型的、自治的服务,每个服务都可以独立开发、部署和扩展。这种架构模式带来了很多好处,比如提高了开发效率、增强了系统的可维护性和可扩展性等。然而,微服务架构也带来了一些挑战,其中分布式事务处理就是一个比较棘手的问题。在分布式系统中,不同的服务可能运行在不同的节点上,使用不同的数据库,如何保证这些服务之间的数据一致性就成了一个难题。接下来,我们就一起来探讨一下在 Java 微服务架构下分布式事务处理的可靠方案。

二、应用场景

2.1 电商系统

在电商系统中,一个订单的创建可能会涉及到多个服务,比如订单服务、库存服务、支付服务等。当用户下单时,订单服务需要创建订单记录,库存服务需要扣减相应商品的库存,支付服务需要处理用户的支付。这一系列操作必须保证要么全部成功,要么全部失败。如果在扣减库存成功后,支付服务出现故障,导致支付失败,那么就需要回滚前面的操作,恢复库存,否则就会出现数据不一致的情况,比如商品被超卖。

2.2 金融系统

金融系统对数据的一致性要求非常高。例如,在进行资金转账时,会涉及到转出账户服务和转入账户服务。转出账户服务需要减少转出账户的余额,转入账户服务需要增加转入账户的余额。这两个操作必须同时成功或者同时失败,否则就会导致资金的错误流动,给用户带来损失。

三、常见的分布式事务处理方案

3.1 两阶段提交协议(2PC)

3.1.1 原理

两阶段提交协议是一种经典的分布式事务处理协议,它分为两个阶段:准备阶段和提交阶段。在准备阶段,协调者向所有参与者发送准备请求,参与者收到请求后,执行事务操作,但不提交,然后向协调者反馈是否可以提交事务。在提交阶段,如果所有参与者都反馈可以提交,那么协调者向所有参与者发送提交请求,参与者执行提交操作;如果有任何一个参与者反馈不能提交,那么协调者向所有参与者发送回滚请求,参与者执行回滚操作。

3.1.2 示例(使用 Java 和 JTA)
import javax.transaction.*;
import javax.transaction.xa.*;
import com.mysql.cj.jdbc.MysqlXADataSource;
import javax.sql.DataSource;

// 模拟两阶段提交协议
public class TwoPhaseCommitExample {
    public static void main(String[] args) throws Exception {
        // 创建数据源
        MysqlXADataSource dataSource1 = new MysqlXADataSource();
        dataSource1.setURL("jdbc:mysql://localhost:3306/db1");
        dataSource1.setUser("root");
        dataSource1.setPassword("password");

        MysqlXADataSource dataSource2 = new MysqlXADataSource();
        dataSource2.setURL("jdbc:mysql://localhost:3306/db2");
        dataSource2.setUser("root");
        dataSource2.setPassword("password");

        // 获取 XA 连接
        XAConnection xaConnection1 = dataSource1.getXAConnection();
        XAResource xaResource1 = xaConnection1.getXAResource();
        XAConnection xaConnection2 = dataSource2.getXAConnection();
        XAResource xaResource2 = xaConnection2.getXAResource();

        // 创建事务管理器
        TransactionManager tm = com.atomikos.icatch.jta.UserTransactionManager();

        // 开始事务
        tm.begin();

        // 准备事务分支
        Xid xid1 = new XidImpl(1);
        Xid xid2 = new XidImpl(2);
        xaResource1.start(xid1, XAResource.TMNOFLAGS);
        // 执行数据库操作,例如插入数据
        // ...
        xaResource1.end(xid1, XAResource.TMSUCCESS);
        xaResource2.start(xid2, XAResource.TMNOFLAGS);
        // 执行数据库操作,例如更新数据
        // ...
        xaResource2.end(xid2, XAResource.TMSUCCESS);

        // 准备阶段
        int prepare1 = xaResource1.prepare(xid1);
        int prepare2 = xaResource2.prepare(xid2);

        if (prepare1 == XAResource.XA_OK && prepare2 == XAResource.XA_OK) {
            // 提交阶段
            xaResource1.commit(xid1, false);
            xaResource2.commit(xid2, false);
            tm.commit();
        } else {
            // 回滚阶段
            xaResource1.rollback(xid1);
            xaResource2.rollback(xid2);
            tm.rollback();
        }

        // 关闭连接
        xaConnection1.close();
        xaConnection2.close();
    }
}

// 自定义 Xid 实现
class XidImpl implements Xid {
    private int formatId;
    private byte[] globalTransactionId;
    private byte[] branchQualifier;

    public XidImpl(int formatId) {
        this.formatId = formatId;
        this.globalTransactionId = new byte[64];
        this.branchQualifier = new byte[64];
    }

    @Override
    public int getFormatId() {
        return formatId;
    }

    @Override
    public byte[] getGlobalTransactionId() {
        return globalTransactionId;
    }

    @Override
    public byte[] getBranchQualifier() {
        return branchQualifier;
    }
}

注释

  • MysqlXADataSource:用于创建支持 XA 协议的 MySQL 数据源。
  • XAConnectionXAResource:用于与数据库进行两阶段提交交互。
  • TransactionManager:负责管理整个事务的生命周期。
  • xaResource.startxaResource.end:用于开始和结束事务分支。
  • xaResource.prepare:进行准备阶段的操作。
  • xaResource.commitxaResource.rollback:分别进行提交和回滚操作。
3.1.3 优缺点
  • 优点:保证了分布式事务的强一致性,所有参与者要么全部提交,要么全部回滚。
  • 缺点:存在单点故障问题,如果协调者出现故障,整个事务可能会陷入阻塞状态;性能较低,因为需要进行多次网络通信和等待参与者的响应。
3.1.4 注意事项
  • 协调者需要具备高可用性,可以采用集群或者备份的方式来保证。
  • 参与者的响应时间可能会影响整个事务的性能,需要对系统进行性能优化。

3.2 补偿事务(TCC)

3.2.1 原理

补偿事务(TCC)将一个业务操作拆分成三个阶段:Try、Confirm 和 Cancel。Try 阶段主要是对业务资源进行预留,比如预留库存、冻结资金等;Confirm 阶段是在 Try 阶段成功的基础上,对预留的资源进行实际的操作,比如扣减库存、划账等;Cancel 阶段是在 Try 阶段失败或者出现异常时,对已经预留的资源进行释放,比如恢复库存、解冻资金等。

3.2.2 示例(使用 Java)
// 库存服务接口
interface InventoryService {
    // Try 阶段,预留库存
    boolean tryReserveInventory(int productId, int quantity);
    // Confirm 阶段,扣减库存
    boolean confirmReserveInventory(int productId, int quantity);
    // Cancel 阶段,释放库存
    boolean cancelReserveInventory(int productId, int quantity);
}

// 库存服务实现
class InventoryServiceImpl implements InventoryService {
    @Override
    public boolean tryReserveInventory(int productId, int quantity) {
        // 模拟预留库存
        System.out.println("Try to reserve inventory for product " + productId + " with quantity " + quantity);
        return true;
    }

    @Override
    public boolean confirmReserveInventory(int productId, int quantity) {
        // 模拟扣减库存
        System.out.println("Confirm to deduct inventory for product " + productId + " with quantity " + quantity);
        return true;
    }

    @Override
    public boolean cancelReserveInventory(int productId, int quantity) {
        // 模拟释放库存
        System.out.println("Cancel to release inventory for product " + productId + " with quantity " + quantity);
        return true;
    }
}

// 订单服务
class OrderService {
    private InventoryService inventoryService;

    public OrderService(InventoryService inventoryService) {
        this.inventoryService = inventoryService;
    }

    public boolean createOrder(int productId, int quantity) {
        // Try 阶段
        if (!inventoryService.tryReserveInventory(productId, quantity)) {
            return false;
        }

        try {
            // 模拟创建订单操作
            System.out.println("Create order for product " + productId + " with quantity " + quantity);
            // Confirm 阶段
            return inventoryService.confirmReserveInventory(productId, quantity);
        } catch (Exception e) {
            // Cancel 阶段
            inventoryService.cancelReserveInventory(productId, quantity);
            return false;
        }
    }
}

// 测试类
public class TCCExample {
    public static void main(String[] args) {
        InventoryService inventoryService = new InventoryServiceImpl();
        OrderService orderService = new OrderService(inventoryService);
        boolean result = orderService.createOrder(1, 10);
        System.out.println("Order creation result: " + result);
    }
}

注释

  • InventoryService 接口定义了库存服务的三个阶段的方法。
  • InventoryServiceImpl 实现了库存服务的具体逻辑。
  • OrderService 调用库存服务的方法来完成订单的创建,在不同阶段调用相应的库存服务方法。
3.2.3 优缺点
  • 优点:性能较高,因为不需要像 2PC 那样进行多次阻塞等待;可以实现最终一致性,在一定时间内保证数据的一致性。
  • 缺点:开发成本较高,需要开发者手动实现 Try、Confirm 和 Cancel 三个阶段的逻辑;补偿操作可能会比较复杂,需要考虑各种异常情况。
3.2.4 注意事项
  • Try 阶段的预留操作需要保证幂等性,即多次调用的结果是一样的。
  • Confirm 和 Cancel 阶段也需要保证幂等性,避免重复操作导致数据不一致。

四、总结

在 Java 微服务架构下,分布式事务处理是一个必须要解决的问题。不同的分布式事务处理方案有不同的优缺点和适用场景。两阶段提交协议(2PC)可以保证强一致性,但性能较低,存在单点故障问题;补偿事务(TCC)性能较高,可以实现最终一致性,但开发成本较高。在实际应用中,我们需要根据具体的业务场景和性能要求来选择合适的方案。同时,在使用这些方案时,也需要注意一些事项,比如保证操作的幂等性、提高系统的可用性等。通过合理选择和使用分布式事务处理方案,可以有效地保证微服务架构下系统的数据一致性,提高系统的可靠性和稳定性。