一、消息队列的世界观
在分布式系统架构中,消息队列如同高效的快递网络,RabbitMQ、Kafka与RocketMQ这三种主流方案各具特色。它们在电商秒杀、日志收集、金融交易等场景中扮演着不同的角色,就像不同的运输车辆——有的适合短途精准配送,有的擅长长途大宗运输,还有的则是全能型选手。
二、RabbitMQ:AMQP协议的信使
2.1 安装起手式(Ubuntu环境)
# 添加Erlang存储库(RabbitMQ依赖)
sudo apt-get install -y erlang
# 导入软件源密钥
curl -fsSL https://github.com/rabbitmq/signing-keys/releases/download/3.0/rabbitmq-release-signing-key.asc | sudo gpg --dearmor -o /usr/share/keyrings/rabbitmq-archive-keyring.gpg
# 安装最新稳定版
sudo apt-get update && sudo apt-get install -y rabbitmq-server
# 启动服务并设置开机启动
sudo systemctl enable rabbitmq-server
sudo systemctl start rabbitmq-server
# 创建管理员账号(示例账户/密码)
sudo rabbitmqctl add_user admin MySecureP@ss
sudo rabbitmqctl set_user_tags admin administrator
sudo rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
2.2 Python生产消费示例
# 生产者:message_producer.py
import pika
credentials = pika.PlainCredentials('admin', 'MySecureP@ss')
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost', credentials=credentials))
channel = connection.channel()
# 声明持久化队列
channel.queue_declare(queue='order_queue', durable=True)
# 发送持久化消息
for i in range(1, 6):
message = f"订单号#00{i}已创建"
channel.basic_publish(
exchange='',
routing_key='order_queue',
body=message,
properties=pika.BasicProperties(delivery_mode=2))
print(f" [x] 已发送: {message}")
connection.close()
# 消费者:message_consumer.py
import pika
def callback(ch, method, properties, body):
print(f" [*] 收到消息: {body.decode()}")
ch.basic_ack(delivery_tag=method.delivery_tag) # 手动确认
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost', credentials=credentials))
channel = connection.channel()
channel.basic_qos(prefetch_count=1) # 公平分发
channel.basic_consume(queue='order_queue', on_message_callback=callback)
print(' [*] 等待消息...')
channel.start_consuming()
三、Kafka:日志流的统治者
3.1 单节点快速部署(基于Docker)
# docker-compose.yml(集成Zookeeper)
version: '3'
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.3.0
environment:
ZOOKEEPER_CLIENT_PORT: 2181
kafka:
image: confluentinc/cp-kafka:7.3.0
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
3.2 Java客户端实战
// 生产者:KafkaProducerDemo.java
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all"); // 高可靠性配置
props.put("retries", 3);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
// 发送带回调的消息
for (int i = 0; i < 10; i++) {
ProducerRecord<String, String> record =
new ProducerRecord<>("user_behavior", "key-" + i, "点击事件" + System.currentTimeMillis());
producer.send(record, (metadata, exception) -> {
if (exception == null) {
System.out.printf("消息发送到分区 %s, 偏移量 %d%n",
metadata.partition(), metadata.offset());
}
});
}
producer.close();
四、RocketMQ:阿里系的重剑
4.1 集群部署架构
namesrv集群(建议3节点)
↓
broker主从集群(2主2从)
↓
控制台(可选)
4.2 SpringBoot整合示例
// 生产者配置类
@Configuration
public class RocketMQConfig {
@Value("${rocketmq.name-server}")
private String nameServer;
@Bean
public DefaultMQProducer producer() throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("payment_group");
producer.setNamesrvAddr(nameServer);
producer.setVipChannelEnabled(false);
producer.start();
return producer;
}
}
// 消息发送服务
@Service
public class PaymentService {
@Autowired
private DefaultMQProducer producer;
public void sendPaymentSuccess(String orderId) throws Exception {
Message msg = new Message("PAYMENT_TOPIC",
"SUCCESS_TAG",
orderId.getBytes(StandardCharsets.UTF_8));
SendResult result = producer.send(msg);
System.out.println("发送状态:" + result.getSendStatus());
}
}
五、三剑客的武林大会
5.1 性能指标对比
维度 | RabbitMQ | Kafka | RocketMQ |
---|---|---|---|
吞吐量 | 5-10w/s | 10-100w/s | 10-100w/s |
时延 | 微秒级 | 毫秒级 | 毫秒级 |
功能特性 | 丰富协议支持 | 分区/消费组 | 事务/顺序消息 |
5.2 企业级部署建议
- RabbitMQ集群:镜像队列+负载均衡器
- Kafka集群:至少3个Broker+独立Zookeeper集群
- RocketMQ集群:Dledger模式实现自动选举
六、选型决策树
当面临选择时,请按以下路径思考:
- 需要严格顺序保障吗? → RocketMQ
- 处理日志类数据流? → Kafka
- 需要灵活路由规则? → RabbitMQ
- 国企/金融行业? → 优选国产化方案
七、技术演进趋势
新一代系统开始融合三者优点,例如Pulsar结合了Kafka和RabbitMQ的特性,但在生产环境中的成熟度仍需观察。同时云原生版本(如RabbitMQ Streams)正在突破传统定位。