一、背景引入

在日常的软件开发工作中,消息队列是一个非常重要的组件,它可以帮助我们实现系统之间的异步通信、解耦和流量削峰。RabbitMQ 作为一款功能强大且广泛应用的消息队列中间件,常常被用于各种场景中。然而,在实际使用过程中,我们可能会遇到消息重复的问题。想象一下,当一个电商系统在处理订单时,如果因为网络问题、服务重启等原因导致同一条订单消息被多次发送到 RabbitMQ 中,那么后续的业务处理就可能会出现重复操作,比如重复创建订单、重复扣款等,这显然是我们不希望看到的。所以,如何对 RabbitMQ 中的消息进行去重就成了一个亟待解决的问题。本文将介绍一种基于业务 ID 实现重复消息过滤的方案,帮助大家解决这个难题。

二、应用场景

1. 电商系统

在电商系统中,用户下单后会生成一个订单消息发送到 RabbitMQ 中,然后由不同的服务来处理这个订单,比如库存扣减、支付等。如果订单消息重复,就可能导致库存重复扣减或者用户被重复扣款。通过基于业务 ID(订单 ID)的消息去重,可以确保每个订单只被处理一次。

2. 日志系统

在日志系统中,应用程序会将日志消息发送到 RabbitMQ 中,然后由日志处理服务进行收集、存储和分析。如果日志消息重复,会增加存储成本和处理负担。使用业务 ID(比如日志的唯一编号)进行去重,可以避免这种情况。

3. 金融系统

在金融系统中,交易消息的准确性至关重要。如果交易消息重复,可能会导致资金错误转移等严重问题。通过业务 ID(交易 ID)对消息进行去重,可以保障金融交易的安全性和准确性。

三、技术方案

1. 基本思路

基于业务 ID 实现重复消息过滤的基本思路是:在消息生产者发送消息时,为每条消息添加一个唯一的业务 ID。当消息消费者接收到消息时,首先检查该业务 ID 是否已经处理过。如果已经处理过,则忽略该消息;如果没有处理过,则进行业务处理,并记录该业务 ID 已经被处理。为了实现这个功能,我们可以使用 Redis 来存储已经处理过的业务 ID。

2. 示例代码(Java + RabbitMQ + Redis)

生产者代码

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import redis.clients.jedis.Jedis;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.UUID;

public class MessageProducer {
    private static final String QUEUE_NAME = "order_queue";
    private static final String REDIS_HOST = "localhost";
    private static final int REDIS_PORT = 6379;

    public static void main(String[] args) throws IOException {
        // 创建 Redis 连接
        Jedis jedis = new Jedis(REDIS_HOST, REDIS_PORT); 

        // 创建 RabbitMQ 连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null); 

            // 生成业务 ID
            String businessId = UUID.randomUUID().toString(); 
            // 消息内容
            String message = "Order data for ID: " + businessId; 

            // 检查 Redis 中是否已经存在该业务 ID
            if (!jedis.exists(businessId)) {
                // 若不存在,则将业务 ID 存入 Redis
                jedis.set(businessId, "processed"); 
                // 发送消息到 RabbitMQ
                channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8)); 
                System.out.println(" [x] Sent '" + message + "'");
            } else {
                System.out.println(" [x] Skipped duplicate message for ID: " + businessId);
            }
        }
    }
}

消费者代码

import com.rabbitmq.client.*;
import redis.clients.jedis.Jedis;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class MessageConsumer {
    private static final String QUEUE_NAME = "order_queue";
    private static final String REDIS_HOST = "localhost";
    private static final int REDIS_PORT = 6379;

    public static void main(String[] args) throws Exception {
        // 创建 Redis 连接
        Jedis jedis = new Jedis(REDIS_HOST, REDIS_PORT); 

        // 创建 RabbitMQ 连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null); 
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            try {
                // 获取消息内容
                String message = new String(delivery.getBody(), StandardCharsets.UTF_8); 
                // 从消息中提取业务 ID
                String businessId = message.split(": ")[1]; 

                // 检查 Redis 中是否已经存在该业务 ID
                if (!jedis.exists(businessId)) {
                    // 若不存在,则将业务 ID 存入 Redis
                    jedis.set(businessId, "processed");
                    System.out.println(" [x] Received '" + message + "' and processed.");
                } else {
                    System.out.println(" [x] Skipped duplicate message for ID: " + businessId);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        };

        // 消费消息
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { }); 
    }
}

四、技术优缺点

1. 优点

简单易实现

基于业务 ID 和 Redis 实现消息去重的方案非常简单,只需要在消息生产者和消费者中添加少量的代码即可。不需要对 RabbitMQ 本身进行复杂的配置和修改。

性能高

Redis 是一个高性能的内存数据库,读写速度非常快。使用 Redis 来存储已经处理过的业务 ID,可以快速地进行重复检查,不会对消息处理的性能造成太大的影响。

可扩展性强

该方案可以很容易地扩展到不同的业务场景中。只需要根据不同的业务需求,修改业务 ID 的生成方式和消息处理逻辑即可。

2. 缺点

依赖 Redis

该方案依赖于 Redis,如果 Redis 出现故障或者网络问题,可能会导致消息去重功能失效。需要对 Redis 进行高可用配置,以确保系统的稳定性。

业务 ID 管理

需要确保业务 ID 的唯一性。如果业务 ID 不唯一,可能会导致消息去重出现错误。需要在业务系统中实现合理的业务 ID 生成机制。

五、注意事项

1. Redis 高可用

为了避免 Redis 故障导致消息去重功能失效,需要对 Redis 进行高可用配置。可以使用 Redis 集群或者 Redis Sentinel 来实现 Redis 的高可用性。

2. 业务 ID 生成

业务 ID 必须是唯一的。可以使用 UUID 或者数据库自增主键等方式来生成业务 ID。同时,要确保业务 ID 在消息生产者和消费者之间的一致性。

3. Redis 内存管理

由于 Redis 是基于内存的数据库,存储大量的业务 ID 可能会导致内存占用过高。可以设置 Redis 的过期时间,定期清理过期的业务 ID,以节省内存。

六、文章总结

在使用 RabbitMQ 进行消息传递时,消息重复是一个常见的问题,可能会对业务系统造成严重的影响。基于业务 ID 实现重复消息过滤的方案是一种简单、高效且可扩展的解决方案。通过为每条消息添加唯一的业务 ID,并使用 Redis 来存储已经处理过的业务 ID,可以有效地避免消息的重复处理。

在实际应用中,我们需要注意 Redis 的高可用性、业务 ID 的生成和管理以及 Redis 的内存管理等问题。同时,要根据具体的业务场景和需求,对方案进行适当的调整和优化。通过本文的介绍和示例代码,相信大家对基于业务 ID 实现 RabbitMQ 消息去重有了更深入的理解和掌握。