在开发中,我们经常会用到消息队列来处理异步任务,RabbitMQ 就是其中一款很受欢迎的消息队列。但有时候,消息可能会重复发送,这就会导致业务出现问题。今天咱们就来聊聊怎么利用业务 ID 实现幂等性消费,解决 RabbitMQ 消息去重的问题。
一、RabbitMQ 消息去重的背景
在很多实际的业务场景里,消息重复是很常见的。比如说,网络抖动可能会让消息发送方重复发送消息;消息处理方在处理消息时,因为某些原因失败了,重试机制也可能导致消息重复。要是不处理这些重复消息,可能会造成数据重复写入、业务逻辑混乱等问题。
举个例子,电商系统里,用户下单后会发送一条消息到 RabbitMQ,消息处理方收到消息后会扣减库存。如果消息重复,就会导致库存多扣,这显然是不行的。所以,消息去重就变得非常重要。
二、幂等性消费的概念
幂等性消费,简单来说,就是对同一个业务操作,不管执行多少次,产生的结果都是一样的。在 RabbitMQ 里,就是要保证同一条消息,不管消费多少次,业务逻辑只执行一次。
比如说,用户支付订单这个操作,不管支付消息重复消费多少次,订单的支付状态只能从未支付变成已支付,不能多次改变状态。
三、基于业务 ID 实现幂等性消费的方案
3.1 基本思路
我们可以给每个业务操作分配一个唯一的业务 ID,在消费消息时,先检查这个业务 ID 是否已经处理过。如果处理过,就直接忽略这条消息;如果没处理过,就处理消息,并记录这个业务 ID 已经处理。
3.2 示例代码(Java 技术栈)
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
// 模拟一个简单的消息消费者
public class MessageConsumer {
private static final String QUEUE_NAME = "test_queue";
// 用一个 Map 来记录已经处理过的业务 ID
private static final Map<String, Boolean> processedIds = new HashMap<>();
public static void main(String[] args) throws IOException, TimeoutException {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
// 创建连接
Connection connection = factory.newConnection();
// 创建通道
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
// 定义一个消费者
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
// 假设消息格式是 "业务ID:业务内容"
String[] parts = message.split(":");
String businessId = parts[0];
String businessContent = parts[1];
// 检查业务 ID 是否已经处理过
if (processedIds.containsKey(businessId)) {
System.out.println("业务 ID " + businessId + " 已经处理过,忽略该消息");
return;
}
// 处理业务逻辑
System.out.println("收到消息,业务 ID: " + businessId + ", 业务内容: " + businessContent);
// 模拟业务处理
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 记录业务 ID 已经处理
processedIds.put(businessId, true);
}
};
// 开始消费消息
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
3.3 代码解释
processedIds:这是一个Map,用来记录已经处理过的业务 ID。- 在
handleDelivery方法里,我们先从消息中提取业务 ID,然后检查这个业务 ID 是否已经在processedIds里。如果在,就忽略这条消息;如果不在,就处理业务逻辑,并把业务 ID 记录到processedIds里。
四、应用场景
4.1 电商系统
在电商系统里,订单的创建、支付、库存扣减等操作都可以使用这种消息去重方案。比如,用户支付订单时,支付消息可能会因为网络问题重复发送。通过业务 ID 去重,就能保证订单的支付状态只改变一次。
4.2 金融系统
金融系统对数据的准确性要求非常高,消息重复可能会导致资金的错误转移。使用业务 ID 实现幂等性消费,可以确保每一笔交易只处理一次,避免资金的重复操作。
4.3 日志系统
在日志系统里,可能会因为网络问题或系统故障导致日志消息重复发送。通过消息去重,可以避免重复记录日志,减少存储成本。
五、技术优缺点
5.1 优点
- 简单易懂:基于业务 ID 的去重方案很容易理解和实现,只需要在消费消息时检查业务 ID 是否已经处理过。
- 性能较高:使用
Map或缓存来记录业务 ID,查询速度很快,不会对系统性能造成太大影响。 - 通用性强:这种方案适用于各种业务场景,只要业务有唯一的业务 ID 就行。
5.2 缺点
- 状态管理复杂:如果业务 ID 记录在内存里,系统重启后数据会丢失;如果记录在数据库里,会增加数据库的读写压力。
- 并发问题:在高并发场景下,可能会出现多个线程同时处理同一个业务 ID 的情况,需要考虑并发控制。
六、注意事项
6.1 业务 ID 的唯一性
业务 ID 必须是唯一的,不能有重复。可以使用 UUID 或数据库的自增 ID 作为业务 ID。
6.2 状态记录的持久化
如果使用内存来记录业务 ID,系统重启后数据会丢失。可以考虑使用数据库或缓存(如 Redis)来持久化记录业务 ID。
6.3 并发控制
在高并发场景下,需要考虑并发控制。可以使用锁机制或分布式锁来保证同一时间只有一个线程处理同一个业务 ID。
七、文章总结
通过基于业务 ID 实现幂等性消费,我们可以有效地解决 RabbitMQ 消息去重的问题。这种方案简单易懂,性能较高,适用于各种业务场景。但在实际应用中,需要注意业务 ID 的唯一性、状态记录的持久化和并发控制等问题。
评论