一、消息队列的世界观

在分布式系统架构中,消息队列如同高效的快递网络,RabbitMQ、Kafka与RocketMQ这三种主流方案各具特色。它们在电商秒杀、日志收集、金融交易等场景中扮演着不同的角色,就像不同的运输车辆——有的适合短途精准配送,有的擅长长途大宗运输,还有的则是全能型选手。

二、RabbitMQ:AMQP协议的信使

2.1 安装起手式(Ubuntu环境)

# 添加Erlang存储库(RabbitMQ依赖)
sudo apt-get install -y erlang

# 导入软件源密钥
curl -fsSL https://github.com/rabbitmq/signing-keys/releases/download/3.0/rabbitmq-release-signing-key.asc | sudo gpg --dearmor -o /usr/share/keyrings/rabbitmq-archive-keyring.gpg

# 安装最新稳定版
sudo apt-get update && sudo apt-get install -y rabbitmq-server

# 启动服务并设置开机启动
sudo systemctl enable rabbitmq-server
sudo systemctl start rabbitmq-server

# 创建管理员账号(示例账户/密码)
sudo rabbitmqctl add_user admin MySecureP@ss
sudo rabbitmqctl set_user_tags admin administrator
sudo rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"

2.2 Python生产消费示例

# 生产者:message_producer.py
import pika

credentials = pika.PlainCredentials('admin', 'MySecureP@ss')
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost', credentials=credentials))
channel = connection.channel()

# 声明持久化队列
channel.queue_declare(queue='order_queue', durable=True)

# 发送持久化消息
for i in range(1, 6):
    message = f"订单号#00{i}已创建"
    channel.basic_publish(
        exchange='',
        routing_key='order_queue',
        body=message,
        properties=pika.BasicProperties(delivery_mode=2))
    print(f" [x] 已发送: {message}")
connection.close()
# 消费者:message_consumer.py
import pika

def callback(ch, method, properties, body):
    print(f" [*] 收到消息: {body.decode()}")
    ch.basic_ack(delivery_tag=method.delivery_tag)  # 手动确认

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost', credentials=credentials))
channel = connection.channel()

channel.basic_qos(prefetch_count=1)  # 公平分发
channel.basic_consume(queue='order_queue', on_message_callback=callback)
print(' [*] 等待消息...')
channel.start_consuming()

三、Kafka:日志流的统治者

3.1 单节点快速部署(基于Docker)

# docker-compose.yml(集成Zookeeper)
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    image: confluentinc/cp-kafka:7.3.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"

3.2 Java客户端实战

// 生产者:KafkaProducerDemo.java
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");  // 高可靠性配置
props.put("retries", 3);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<>(props);

// 发送带回调的消息
for (int i = 0; i < 10; i++) {
    ProducerRecord<String, String> record = 
        new ProducerRecord<>("user_behavior", "key-" + i, "点击事件" + System.currentTimeMillis());
    producer.send(record, (metadata, exception) -> {
        if (exception == null) {
            System.out.printf("消息发送到分区 %s, 偏移量 %d%n", 
                metadata.partition(), metadata.offset());
        }
    });
}
producer.close();

四、RocketMQ:阿里系的重剑

4.1 集群部署架构

namesrv集群(建议3节点)
  ↓
broker主从集群(2主2从)
  ↓
控制台(可选)

4.2 SpringBoot整合示例

// 生产者配置类
@Configuration
public class RocketMQConfig {
    @Value("${rocketmq.name-server}")
    private String nameServer;

    @Bean
    public DefaultMQProducer producer() throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("payment_group");
        producer.setNamesrvAddr(nameServer);
        producer.setVipChannelEnabled(false);
        producer.start();
        return producer;
    }
}

// 消息发送服务
@Service
public class PaymentService {
    @Autowired
    private DefaultMQProducer producer;

    public void sendPaymentSuccess(String orderId) throws Exception {
        Message msg = new Message("PAYMENT_TOPIC", 
            "SUCCESS_TAG", 
            orderId.getBytes(StandardCharsets.UTF_8));
        SendResult result = producer.send(msg);
        System.out.println("发送状态:" + result.getSendStatus());
    }
}

五、三剑客的武林大会

5.1 性能指标对比

维度 RabbitMQ Kafka RocketMQ
吞吐量 5-10w/s 10-100w/s 10-100w/s
时延 微秒级 毫秒级 毫秒级
功能特性 丰富协议支持 分区/消费组 事务/顺序消息

5.2 企业级部署建议

  1. RabbitMQ集群:镜像队列+负载均衡器
  2. Kafka集群:至少3个Broker+独立Zookeeper集群
  3. RocketMQ集群:Dledger模式实现自动选举

六、选型决策树

当面临选择时,请按以下路径思考:

  1. 需要严格顺序保障吗? → RocketMQ
  2. 处理日志类数据流? → Kafka
  3. 需要灵活路由规则? → RabbitMQ
  4. 国企/金融行业? → 优选国产化方案

七、技术演进趋势

新一代系统开始融合三者优点,例如Pulsar结合了Kafka和RabbitMQ的特性,但在生产环境中的成熟度仍需观察。同时云原生版本(如RabbitMQ Streams)正在突破传统定位。