在分布式系统中,消息队列是一种常用的异步通信机制,它可以帮助我们解耦系统组件、提高系统的可伸缩性和可靠性。然而,在消息队列的使用过程中,消费端可能会遇到消息重复消费的问题,这就需要我们对消息进行幂等性处理。接下来,我们就来详细聊聊基于消息 ID 去重与业务唯一键校验的方案。
一、应用场景
1.1 电商系统中的订单处理
想象一下,在电商系统里,当用户下单后,系统会发送一条消息到消息队列,消费端接收到消息后会进行订单创建的操作。但由于网络波动、消费端故障等原因,这条消息可能会被重复发送到消费端,导致订单被重复创建。这显然是我们不希望看到的,因为这会造成数据的混乱和业务的错误处理。此时,我们就需要对消费端进行幂等性处理,确保同一个订单消息只被处理一次。
1.2 金融系统中的交易处理
在金融系统中,交易的准确性和一致性至关重要。当用户发起一笔转账交易时,系统会将交易信息发送到消息队列,消费端负责处理这笔交易。如果消息重复消费,就可能导致用户账户资金被重复扣除或增加,这会引发严重的财务问题。因此,在这种场景下,幂等性处理是必不可少的。
二、基于消息 ID 去重方案
2.1 方案原理
消息 ID 是消息队列中每条消息的唯一标识符。我们可以在消费端维护一个已处理消息 ID 的集合,当接收到一条新消息时,首先检查该消息的 ID 是否已经在集合中。如果存在,说明该消息已经被处理过,直接忽略;如果不存在,则处理该消息,并将该消息的 ID 添加到集合中。
2.2 示例代码(使用 Java 和 Redis)
import redis.clients.jedis.Jedis;
public class MessageConsumer {
private Jedis jedis;
private static final String PROCESSED_MESSAGE_SET = "processed_messages";
public MessageConsumer() {
// 初始化 Redis 连接
this.jedis = new Jedis("localhost", 6379);
}
public void consumeMessage(String messageId, String message) {
// 检查消息 ID 是否已经处理过
if (jedis.sismember(PROCESSED_MESSAGE_SET, messageId)) {
System.out.println("消息 " + messageId + " 已经处理过,忽略。");
return;
}
try {
// 处理消息
System.out.println("处理消息:" + message);
// 模拟业务处理
Thread.sleep(100);
// 将消息 ID 添加到已处理集合中
jedis.sadd(PROCESSED_MESSAGE_SET, messageId);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
MessageConsumer consumer = new MessageConsumer();
// 模拟接收到的消息
String messageId = "12345";
String message = "这是一条测试消息";
consumer.consumeMessage(messageId, message);
// 模拟重复消息
consumer.consumeMessage(messageId, message);
}
}
代码解释
Jedis是 Java 操作 Redis 的客户端,通过Jedis我们可以与 Redis 进行交互。PROCESSED_MESSAGE_SET是一个 Redis 的集合,用于存储已经处理过的消息 ID。consumeMessage方法首先检查消息 ID 是否在集合中,如果存在则忽略该消息,否则处理消息并将消息 ID 添加到集合中。
2.3 优缺点分析
优点
- 实现简单,只需要维护一个消息 ID 集合,通过 Redis 的集合操作可以快速判断消息是否已经处理过。
- 性能较高,Redis 的操作是基于内存的,速度非常快。
缺点
- 依赖 Redis 等外部存储系统,如果 Redis 出现故障,可能会影响幂等性处理的结果。
- 消息 ID 必须在消息队列中是唯一的,如果消息 ID 不唯一,幂等性处理就会失效。
三、基于业务唯一键校验方案
3.1 方案原理
业务唯一键是指业务系统中具有唯一性的标识,例如订单号、交易 ID 等。在消费端处理消息时,我们可以根据业务唯一键来判断该消息是否已经处理过。具体做法是,在数据库中创建一个记录已处理业务唯一键的表,当接收到消息时,根据业务唯一键查询该表,如果存在记录,说明该消息已经处理过,直接忽略;如果不存在,则处理该消息,并在表中插入一条记录。
3.2 示例代码(使用 Java 和 MySQL)
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
public class BusinessMessageConsumer {
private static final String DB_URL = "jdbc:mysql://localhost:3306/test";
private static final String DB_USER = "root";
private static final String DB_PASSWORD = "password";
private static final String CHECK_SQL = "SELECT COUNT(*) FROM processed_business_keys WHERE business_key = ?";
private static final String INSERT_SQL = "INSERT INTO processed_business_keys (business_key) VALUES (?)";
public void consumeMessage(String businessKey, String message) {
try (Connection conn = DriverManager.getConnection(DB_URL, DB_USER, DB_PASSWORD);
PreparedStatement checkStmt = conn.prepareStatement(CHECK_SQL);
PreparedStatement insertStmt = conn.prepareStatement(INSERT_SQL)) {
// 检查业务唯一键是否已经处理过
checkStmt.setString(1, businessKey);
ResultSet rs = checkStmt.executeQuery();
if (rs.next() && rs.getInt(1) > 0) {
System.out.println("业务唯一键 " + businessKey + " 已经处理过,忽略。");
return;
}
// 处理消息
System.out.println("处理消息:" + message);
// 模拟业务处理
Thread.sleep(100);
// 插入业务唯一键记录
insertStmt.setString(1, businessKey);
insertStmt.executeUpdate();
} catch (SQLException | InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
BusinessMessageConsumer consumer = new BusinessMessageConsumer();
// 模拟接收到的消息
String businessKey = "order_123";
String message = "这是一条业务消息";
consumer.consumeMessage(businessKey, message);
// 模拟重复消息
consumer.consumeMessage(businessKey, message);
}
}
代码解释
- 首先,我们通过 JDBC 连接到 MySQL 数据库。
CHECK_SQL用于检查业务唯一键是否已经存在于processed_business_keys表中。INSERT_SQL用于在处理完消息后,将业务唯一键插入到表中。
3.3 优缺点分析
优点
- 与业务紧密结合,使用业务唯一键进行校验更符合业务逻辑。
- 数据持久化,即使系统重启,也能保证幂等性处理的结果。
缺点
- 数据库操作会引入一定的性能开销,尤其是在高并发场景下。
- 需要维护一个额外的数据库表,增加了系统的复杂度。
四、两种方案的结合使用
在实际应用中,我们可以将基于消息 ID 去重和基于业务唯一键校验的方案结合使用,以提高幂等性处理的可靠性和性能。具体做法是,首先通过消息 ID 进行快速去重,如果消息 ID 不存在,则再根据业务唯一键进行校验。
示例代码
import redis.clients.jedis.Jedis;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
public class CombinedMessageConsumer {
private Jedis jedis;
private static final String PROCESSED_MESSAGE_SET = "processed_messages";
private static final String DB_URL = "jdbc:mysql://localhost:3306/test";
private static final String DB_USER = "root";
private static final String DB_PASSWORD = "password";
private static final String CHECK_SQL = "SELECT COUNT(*) FROM processed_business_keys WHERE business_key = ?";
private static final String INSERT_SQL = "INSERT INTO processed_business_keys (business_key) VALUES (?)";
public CombinedMessageConsumer() {
this.jedis = new Jedis("localhost", 6379);
}
public void consumeMessage(String messageId, String businessKey, String message) {
// 首先通过消息 ID 进行去重
if (jedis.sismember(PROCESSED_MESSAGE_SET, messageId)) {
System.out.println("消息 " + messageId + " 已经处理过,忽略。");
return;
}
try (Connection conn = DriverManager.getConnection(DB_URL, DB_USER, DB_PASSWORD);
PreparedStatement checkStmt = conn.prepareStatement(CHECK_SQL);
PreparedStatement insertStmt = conn.prepareStatement(INSERT_SQL)) {
// 再根据业务唯一键进行校验
checkStmt.setString(1, businessKey);
ResultSet rs = checkStmt.executeQuery();
if (rs.next() && rs.getInt(1) > 0) {
System.out.println("业务唯一键 " + businessKey + " 已经处理过,忽略。");
// 将消息 ID 添加到已处理集合中
jedis.sadd(PROCESSED_MESSAGE_SET, messageId);
return;
}
// 处理消息
System.out.println("处理消息:" + message);
// 模拟业务处理
Thread.sleep(100);
// 插入业务唯一键记录
insertStmt.setString(1, businessKey);
insertStmt.executeUpdate();
// 将消息 ID 添加到已处理集合中
jedis.sadd(PROCESSED_MESSAGE_SET, messageId);
} catch (SQLException | InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
CombinedMessageConsumer consumer = new CombinedMessageConsumer();
// 模拟接收到的消息
String messageId = "12345";
String businessKey = "order_123";
String message = "这是一条测试消息";
consumer.consumeMessage(messageId, businessKey, message);
// 模拟重复消息
consumer.consumeMessage(messageId, businessKey, message);
}
}
代码解释
- 首先通过 Redis 的集合检查消息 ID 是否已经处理过,如果处理过则直接忽略。
- 若消息 ID 未处理过,再通过 MySQL 数据库检查业务唯一键是否已经处理过。
- 处理完消息后,同时将消息 ID 添加到 Redis 集合和将业务唯一键插入到数据库表中。
五、注意事项
5.1 消息 ID 和业务唯一键的生成
消息 ID 必须在消息队列中是唯一的,通常可以由消息队列系统自动生成。业务唯一键需要根据业务逻辑来生成,确保在业务系统中具有唯一性。
5.2 并发处理
在高并发场景下,可能会出现多个消费端同时处理同一条消息的情况。因此,在进行幂等性处理时,需要考虑并发控制。例如,在使用 Redis 时,可以使用 Redis 的原子操作来确保并发安全;在使用数据库时,可以使用数据库的事务和锁机制。
5.3 数据清理
随着时间的推移,已处理消息 ID 和业务唯一键的记录会越来越多,这会占用大量的存储空间。因此,需要定期对这些记录进行清理。例如,对于 Redis 集合,可以设置过期时间;对于数据库表,可以定期删除过期的记录。
六、文章总结
消息队列消费端的幂等性处理是分布式系统中一个重要的问题,基于消息 ID 去重和业务唯一键校验的方案可以有效地解决消息重复消费的问题。基于消息 ID 去重方案实现简单、性能较高,但依赖外部存储系统;基于业务唯一键校验方案与业务紧密结合、数据持久化,但会引入数据库操作的性能开销。在实际应用中,我们可以将两种方案结合使用,以提高幂等性处理的可靠性和性能。同时,在实施过程中,需要注意消息 ID 和业务唯一键的生成、并发处理和数据清理等问题。
评论