在分布式系统里,消息队列是个特别重要的组件,它能帮我们处理高并发,还能让系统之间的耦合度降低。RabbitMQ 就是消息队列界的明星,好多开发者都喜欢用它。不过呢,在实际使用的时候,消息丢失可是个让人头疼的问题。今天咱就来好好分析分析这个问题,再找找解决方案,保证消息能可靠投递。

一、RabbitMQ 基础介绍

RabbitMQ 其实就是个实现了高级消息队列协议(AMQP)的开源消息代理软件。它就像是个“快递站”,发送方把消息“包裹”送到这个“快递站”,接收方再从“快递站”把“包裹”取走。这么一来,发送方和接收方就不用直接打交道了,系统的耦合度也就降低了。

举个例子,有个电商系统,用户下单后,系统要做很多事儿,比如扣库存、发通知啥的。要是这些事儿都让下单服务一个一个挨着做,那下单的响应速度肯定慢。这时候就可以用 RabbitMQ 了。下单服务把消息发给 RabbitMQ,然后就可以马上给用户返回下单成功的消息。其他服务再从 RabbitMQ 里拿消息,去做扣库存、发通知这些事儿。这样一来,下单服务就不用等其他事儿都做完才给用户反馈,响应速度就快多了。

二、消息丢失的场景分析

1. 生产者发送消息时丢失

生产者在把消息发给 RabbitMQ 的过程中,可能会因为网络问题或者 RabbitMQ 服务挂掉,导致消息发不出去。比如说,生产者想把用户下单的消息发给 RabbitMQ,结果网络突然断了,这消息就发不过去了。

2. RabbitMQ 存储消息时丢失

RabbitMQ 在存储消息的时候,如果它所在的服务器突然死机、磁盘坏了,那消息就有可能丢失。就好比“快递站”突然着火了,里面的“包裹”都被烧没了。

3. 消费者接收消息时丢失

消费者从 RabbitMQ 拿到消息后,还没处理完就因为各种原因挂掉了,而且没给 RabbitMQ 确认消息已经处理好。这时候 RabbitMQ 就以为消息已经被成功处理了,不会再发一次,消息就这么丢了。比如说,消费者拿到用户下单的消息后,准备去扣库存,结果程序突然崩溃了,库存没扣成,消息也丢了。

三、解决方案

1. 生产者端的解决方案

(1)开启确认模式(Confirm 模式)

在 Java 里用 RabbitMQ 时,可以开启 Confirm 模式。开启之后,生产者发消息给 RabbitMQ,RabbitMQ 收到消息就会给生产者一个确认信号。生产者根据这个信号就能知道消息有没有发成功。

// Java 技术栈示例
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class ProducerExample {
    private static final String QUEUE_NAME = "test_queue";

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        // 创建连接
        Connection connection = factory.newConnection();
        // 创建信道
        Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 开启 Confirm 模式
        channel.confirmSelect();

        String message = "Hello, RabbitMQ!";
        // 发送消息
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());

        // 等待确认
        if (channel.waitForConfirms()) {
            System.out.println("消息发送成功");
        } else {
            System.out.println("消息发送失败");
        }

        // 关闭信道和连接
        channel.close();
        connection.close();
    }
}

在这个示例里,我们先创建了连接工厂、连接和信道,然后声明了队列。接着开启了 Confirm 模式,发消息之后用 waitForConfirms() 方法等待确认。如果返回 true,就说明消息发成功了;如果返回 false,就说明消息发失败了。

(2)事务模式

事务模式也能保证消息可靠发送。不过它的性能没 Confirm 模式好,因为它是同步的,会阻塞。

// Java 技术栈示例
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class TransactionProducerExample {
    private static final String QUEUE_NAME = "test_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        // 创建连接
        Connection connection = factory.newConnection();
        // 创建信道
        Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        try {
            // 开启事务
            channel.txSelect();
            String message = "Hello, RabbitMQ!";
            // 发送消息
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            // 提交事务
            channel.txCommit();
            System.out.println("消息发送成功");
        } catch (IOException e) {
            // 回滚事务
            channel.txRollback();
            System.out.println("消息发送失败");
        }

        // 关闭信道和连接
        channel.close();
        connection.close();
    }
}

在这个示例里,我们先开启事务,发消息之后提交事务。如果发送过程中出问题了,就回滚事务。

2. RabbitMQ 端的解决方案

(1)持久化队列和消息

把队列和消息都设置成持久化的,这样就算 RabbitMQ 所在的服务器重启了,消息也不会丢。

// Java 技术栈示例
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class PersistentProducerExample {
    private static final String QUEUE_NAME = "test_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        // 创建连接
        Connection connection = factory.newConnection();
        // 创建信道
        Channel channel = connection.createChannel();
        // 声明持久化队列
        boolean durable = true;
        channel.queueDeclare(QUEUE_NAME, durable, false, false, null);

        String message = "Hello, RabbitMQ!";
        // 设置消息为持久化
        com.rabbitmq.client.AMQP.BasicProperties properties = new com.rabbitmq.client.AMQP.BasicProperties.Builder()
               .deliveryMode(2) // 2 表示持久化
               .build();
        // 发送持久化消息
        channel.basicPublish("", QUEUE_NAME, properties, message.getBytes());

        // 关闭信道和连接
        channel.close();
        connection.close();
    }
}

在这个示例里,我们声明队列的时候把 durable 参数设为 true,表示队列是持久化的。发消息的时候,把消息的 deliveryMode 设为 2,也表示消息是持久化的。

(2)镜像队列

镜像队列可以让消息在多个节点上都有副本。这样就算一个节点挂了,其他节点上还有消息的副本,消息也不会丢。不过镜像队列会占用更多的资源。

3. 消费者端的解决方案

(1)手动确认消息

消费者从 RabbitMQ 拿到消息后,处理完了再手动给 RabbitMQ 发个确认消息。这样就算消费者处理消息的时候挂了,RabbitMQ 没收到确认消息,就会把消息再发给其他消费者处理。

// Java 技术栈示例
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class ConsumerExample {
    private static final String QUEUE_NAME = "test_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        // 创建连接
        Connection connection = factory.newConnection();
        // 创建信道
        Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        // 创建消费者
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
            try {
                // 模拟处理消息
                doWork(message);
            } finally {
                // 手动确认消息
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        };
        CancelCallback cancelCallback = consumerTag -> {
            System.out.println(" [x] Canceled");
        };

        // 开启手动确认
        boolean autoAck = false;
        channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, cancelCallback);
    }

    private static void doWork(String task) {
        for (char ch: task.toCharArray()) {
            if (ch == '.') {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException _ignored) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}

在这个示例里,我们把 autoAck 参数设为 false,表示开启手动确认。消费者处理完消息后,用 basicAck() 方法给 RabbitMQ 发确认消息。

四、应用场景

1. 电商系统

在电商系统里,用户下单后,订单服务把消息发给 RabbitMQ,库存服务、物流服务、通知服务等从 RabbitMQ 拿消息去处理。这样可以保证订单处理的高并发和系统的解耦。

2. 日志收集系统

应用程序把日志消息发给 RabbitMQ,日志收集器从 RabbitMQ 拿消息,再把日志存储到日志系统里。这样可以避免应用程序因为日志存储的问题而影响性能。

五、技术优缺点

1. 优点

(1)可靠性高

通过上面说的这些解决方案,可以很大程度上保证消息的可靠投递,减少消息丢失的情况。

(2)解耦性强

发送方和接收方不用直接打交道,系统的耦合度降低了,开发和维护都更方便。

(3)支持多种协议

RabbitMQ 支持 AMQP、STOMP、MQTT 等多种消息协议,能满足不同场景的需求。

2. 缺点

(1)性能开销

开启 Confirm 模式、事务模式、持久化等功能会有一定的性能开销,影响系统的吞吐量。

(2)复杂度增加

为了保证消息可靠投递,需要在生产者、RabbitMQ 和消费者端都做一些处理,系统的复杂度增加了。

六、注意事项

1. 性能和可靠性的平衡

在实际使用中,要根据具体场景在性能和可靠性之间找到一个平衡点。比如在对性能要求很高的场景下,就可以适当降低一些可靠性要求;在对可靠性要求很高的场景下,就可以牺牲一些性能来保证消息可靠投递。

2. 异常处理

在生产者、RabbitMQ 和消费者端都要做好异常处理。比如说,生产者发送消息失败了,要能重试或者记录日志;消费者处理消息失败了,要能回滚或者记录错误信息。

七、文章总结

RabbitMQ 是个很强大的消息队列,但是消息丢失问题确实是个需要解决的难题。通过在生产者端开启 Confirm 模式或事务模式,在 RabbitMQ 端设置持久化队列和消息、使用镜像队列,在消费者端开启手动确认消息等方法,可以有效地避免消息丢失,保证消息的可靠投递。不过在实际使用中,要根据具体的应用场景,在性能和可靠性之间找到平衡,同时做好异常处理,这样才能让系统更加稳定可靠。