在分布式系统中,消息队列是一种常用的异步通信机制,它可以帮助我们解耦系统组件、提高系统的可伸缩性和可靠性。然而,在消息队列的使用过程中,消费端可能会遇到消息重复消费的问题,这就需要我们对消息进行幂等性处理。接下来,我们就来详细聊聊基于消息 ID 去重与业务唯一键校验的方案。

一、应用场景

1.1 电商系统中的订单处理

想象一下,在电商系统里,当用户下单后,系统会发送一条消息到消息队列,消费端接收到消息后会进行订单创建的操作。但由于网络波动、消费端故障等原因,这条消息可能会被重复发送到消费端,导致订单被重复创建。这显然是我们不希望看到的,因为这会造成数据的混乱和业务的错误处理。此时,我们就需要对消费端进行幂等性处理,确保同一个订单消息只被处理一次。

1.2 金融系统中的交易处理

在金融系统中,交易的准确性和一致性至关重要。当用户发起一笔转账交易时,系统会将交易信息发送到消息队列,消费端负责处理这笔交易。如果消息重复消费,就可能导致用户账户资金被重复扣除或增加,这会引发严重的财务问题。因此,在这种场景下,幂等性处理是必不可少的。

二、基于消息 ID 去重方案

2.1 方案原理

消息 ID 是消息队列中每条消息的唯一标识符。我们可以在消费端维护一个已处理消息 ID 的集合,当接收到一条新消息时,首先检查该消息的 ID 是否已经在集合中。如果存在,说明该消息已经被处理过,直接忽略;如果不存在,则处理该消息,并将该消息的 ID 添加到集合中。

2.2 示例代码(使用 Java 和 Redis)

import redis.clients.jedis.Jedis;

public class MessageConsumer {
    private Jedis jedis;
    private static final String PROCESSED_MESSAGE_SET = "processed_messages";

    public MessageConsumer() {
        // 初始化 Redis 连接
        this.jedis = new Jedis("localhost", 6379);
    }

    public void consumeMessage(String messageId, String message) {
        // 检查消息 ID 是否已经处理过
        if (jedis.sismember(PROCESSED_MESSAGE_SET, messageId)) {
            System.out.println("消息 " + messageId + " 已经处理过,忽略。");
            return;
        }

        try {
            // 处理消息
            System.out.println("处理消息:" + message);

            // 模拟业务处理
            Thread.sleep(100);

            // 将消息 ID 添加到已处理集合中
            jedis.sadd(PROCESSED_MESSAGE_SET, messageId);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        MessageConsumer consumer = new MessageConsumer();
        // 模拟接收到的消息
        String messageId = "12345";
        String message = "这是一条测试消息";
        consumer.consumeMessage(messageId, message);
        // 模拟重复消息
        consumer.consumeMessage(messageId, message);
    }
}

代码解释

  • Jedis 是 Java 操作 Redis 的客户端,通过 Jedis 我们可以与 Redis 进行交互。
  • PROCESSED_MESSAGE_SET 是一个 Redis 的集合,用于存储已经处理过的消息 ID。
  • consumeMessage 方法首先检查消息 ID 是否在集合中,如果存在则忽略该消息,否则处理消息并将消息 ID 添加到集合中。

2.3 优缺点分析

优点

  • 实现简单,只需要维护一个消息 ID 集合,通过 Redis 的集合操作可以快速判断消息是否已经处理过。
  • 性能较高,Redis 的操作是基于内存的,速度非常快。

缺点

  • 依赖 Redis 等外部存储系统,如果 Redis 出现故障,可能会影响幂等性处理的结果。
  • 消息 ID 必须在消息队列中是唯一的,如果消息 ID 不唯一,幂等性处理就会失效。

三、基于业务唯一键校验方案

3.1 方案原理

业务唯一键是指业务系统中具有唯一性的标识,例如订单号、交易 ID 等。在消费端处理消息时,我们可以根据业务唯一键来判断该消息是否已经处理过。具体做法是,在数据库中创建一个记录已处理业务唯一键的表,当接收到消息时,根据业务唯一键查询该表,如果存在记录,说明该消息已经处理过,直接忽略;如果不存在,则处理该消息,并在表中插入一条记录。

3.2 示例代码(使用 Java 和 MySQL)

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

public class BusinessMessageConsumer {
    private static final String DB_URL = "jdbc:mysql://localhost:3306/test";
    private static final String DB_USER = "root";
    private static final String DB_PASSWORD = "password";
    private static final String CHECK_SQL = "SELECT COUNT(*) FROM processed_business_keys WHERE business_key = ?";
    private static final String INSERT_SQL = "INSERT INTO processed_business_keys (business_key) VALUES (?)";

    public void consumeMessage(String businessKey, String message) {
        try (Connection conn = DriverManager.getConnection(DB_URL, DB_USER, DB_PASSWORD);
             PreparedStatement checkStmt = conn.prepareStatement(CHECK_SQL);
             PreparedStatement insertStmt = conn.prepareStatement(INSERT_SQL)) {

            // 检查业务唯一键是否已经处理过
            checkStmt.setString(1, businessKey);
            ResultSet rs = checkStmt.executeQuery();
            if (rs.next() && rs.getInt(1) > 0) {
                System.out.println("业务唯一键 " + businessKey + " 已经处理过,忽略。");
                return;
            }

            // 处理消息
            System.out.println("处理消息:" + message);

            // 模拟业务处理
            Thread.sleep(100);

            // 插入业务唯一键记录
            insertStmt.setString(1, businessKey);
            insertStmt.executeUpdate();
        } catch (SQLException | InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        BusinessMessageConsumer consumer = new BusinessMessageConsumer();
        // 模拟接收到的消息
        String businessKey = "order_123";
        String message = "这是一条业务消息";
        consumer.consumeMessage(businessKey, message);
        // 模拟重复消息
        consumer.consumeMessage(businessKey, message);
    }
}

代码解释

  • 首先,我们通过 JDBC 连接到 MySQL 数据库。
  • CHECK_SQL 用于检查业务唯一键是否已经存在于 processed_business_keys 表中。
  • INSERT_SQL 用于在处理完消息后,将业务唯一键插入到表中。

3.3 优缺点分析

优点

  • 与业务紧密结合,使用业务唯一键进行校验更符合业务逻辑。
  • 数据持久化,即使系统重启,也能保证幂等性处理的结果。

缺点

  • 数据库操作会引入一定的性能开销,尤其是在高并发场景下。
  • 需要维护一个额外的数据库表,增加了系统的复杂度。

四、两种方案的结合使用

在实际应用中,我们可以将基于消息 ID 去重和基于业务唯一键校验的方案结合使用,以提高幂等性处理的可靠性和性能。具体做法是,首先通过消息 ID 进行快速去重,如果消息 ID 不存在,则再根据业务唯一键进行校验。

示例代码

import redis.clients.jedis.Jedis;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

public class CombinedMessageConsumer {
    private Jedis jedis;
    private static final String PROCESSED_MESSAGE_SET = "processed_messages";
    private static final String DB_URL = "jdbc:mysql://localhost:3306/test";
    private static final String DB_USER = "root";
    private static final String DB_PASSWORD = "password";
    private static final String CHECK_SQL = "SELECT COUNT(*) FROM processed_business_keys WHERE business_key = ?";
    private static final String INSERT_SQL = "INSERT INTO processed_business_keys (business_key) VALUES (?)";

    public CombinedMessageConsumer() {
        this.jedis = new Jedis("localhost", 6379);
    }

    public void consumeMessage(String messageId, String businessKey, String message) {
        // 首先通过消息 ID 进行去重
        if (jedis.sismember(PROCESSED_MESSAGE_SET, messageId)) {
            System.out.println("消息 " + messageId + " 已经处理过,忽略。");
            return;
        }

        try (Connection conn = DriverManager.getConnection(DB_URL, DB_USER, DB_PASSWORD);
             PreparedStatement checkStmt = conn.prepareStatement(CHECK_SQL);
             PreparedStatement insertStmt = conn.prepareStatement(INSERT_SQL)) {

            // 再根据业务唯一键进行校验
            checkStmt.setString(1, businessKey);
            ResultSet rs = checkStmt.executeQuery();
            if (rs.next() && rs.getInt(1) > 0) {
                System.out.println("业务唯一键 " + businessKey + " 已经处理过,忽略。");
                // 将消息 ID 添加到已处理集合中
                jedis.sadd(PROCESSED_MESSAGE_SET, messageId);
                return;
            }

            // 处理消息
            System.out.println("处理消息:" + message);

            // 模拟业务处理
            Thread.sleep(100);

            // 插入业务唯一键记录
            insertStmt.setString(1, businessKey);
            insertStmt.executeUpdate();

            // 将消息 ID 添加到已处理集合中
            jedis.sadd(PROCESSED_MESSAGE_SET, messageId);
        } catch (SQLException | InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        CombinedMessageConsumer consumer = new CombinedMessageConsumer();
        // 模拟接收到的消息
        String messageId = "12345";
        String businessKey = "order_123";
        String message = "这是一条测试消息";
        consumer.consumeMessage(messageId, businessKey, message);
        // 模拟重复消息
        consumer.consumeMessage(messageId, businessKey, message);
    }
}

代码解释

  • 首先通过 Redis 的集合检查消息 ID 是否已经处理过,如果处理过则直接忽略。
  • 若消息 ID 未处理过,再通过 MySQL 数据库检查业务唯一键是否已经处理过。
  • 处理完消息后,同时将消息 ID 添加到 Redis 集合和将业务唯一键插入到数据库表中。

五、注意事项

5.1 消息 ID 和业务唯一键的生成

消息 ID 必须在消息队列中是唯一的,通常可以由消息队列系统自动生成。业务唯一键需要根据业务逻辑来生成,确保在业务系统中具有唯一性。

5.2 并发处理

在高并发场景下,可能会出现多个消费端同时处理同一条消息的情况。因此,在进行幂等性处理时,需要考虑并发控制。例如,在使用 Redis 时,可以使用 Redis 的原子操作来确保并发安全;在使用数据库时,可以使用数据库的事务和锁机制。

5.3 数据清理

随着时间的推移,已处理消息 ID 和业务唯一键的记录会越来越多,这会占用大量的存储空间。因此,需要定期对这些记录进行清理。例如,对于 Redis 集合,可以设置过期时间;对于数据库表,可以定期删除过期的记录。

六、文章总结

消息队列消费端的幂等性处理是分布式系统中一个重要的问题,基于消息 ID 去重和业务唯一键校验的方案可以有效地解决消息重复消费的问题。基于消息 ID 去重方案实现简单、性能较高,但依赖外部存储系统;基于业务唯一键校验方案与业务紧密结合、数据持久化,但会引入数据库操作的性能开销。在实际应用中,我们可以将两种方案结合使用,以提高幂等性处理的可靠性和性能。同时,在实施过程中,需要注意消息 ID 和业务唯一键的生成、并发处理和数据清理等问题。