1. 从生活中的快递站说起
生活中的快递站就像消息队列——收发包裹的中间站。当我们想要实现系统间的异步通信时,RabbitMQ就是最好的"快递员"。比如电商订单完成后需要通知库存系统扣减库存,这时候同步调用可能导致超时,而异步消息可以完美解决这个问题。
// Spring Boot项目pom.xml依赖(技术栈:Spring Boot 2.7 + RabbitMQ)
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2. 手把手配置RabbitMQ
步骤1:安装RabbitMQ 使用Docker快速搭建环境:
docker run -d --name my-rabbit -p 5672:5672 -p 15672:15672 rabbitmq:3-management
步骤2:配置文件注入灵魂
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtual-host: /
3. 消息生产者的诞生记
@Component
public class OrderSender {
@Autowired
private RabbitTemplate rabbitTemplate;
// 发送订单创建消息(直连交换机示例)
public void sendOrderCreated(Order order) {
rabbitTemplate.convertAndSend(
"order.direct", // 交换机名称
"order.create", // 路由键
order, // 消息体
message -> {
// 设置消息属性(类似快递包裹上的标签)
message.getMessageProperties().setPriority(5);
return message;
}
);
System.out.println("订单创建消息已发送:" + order.getId());
}
}
4. 消息消费者的养成计划
@Component
public class InventoryConsumer {
// 消息处理方法的完整参数列表展示(技术栈:Spring AMQP)
@RabbitListener(
queues = "inventory.queue",
concurrency = "3" // 开启3个消费者线程
)
public void handleInventoryUpdate(Order order, Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
try {
// 扣减库存业务逻辑
System.out.println("处理订单库存:" + order.getId());
// 手动确认消息(快递签收)
channel.basicAck(tag, false);
} catch (Exception e) {
// 消息重试机制(快递退回)
channel.basicNack(tag, false, true);
}
}
}
5. 交换机与队列的花式玩法
Topic交换机实战示例:
@Configuration
public class TopicConfig {
// 声明主题交换机(类似快递分拣中心)
@Bean
public TopicExchange topicExchange() {
return new TopicExchange("system.topic");
}
// 日志队列绑定(路由键格式)
@Bean
public Queue logQueue() { return new Queue("log.all"); }
@Bean
public Binding logBinding() {
return BindingBuilder.bind(logQueue())
.to(topicExchange()).with("log.#");
}
}
6. 不能忽视的五个注意事项
① 消息持久化双保险:Message持久化 + 队列声明持久化
② 防消息丢失三件套:生产者确认、手动ACK、死信队列
③ 并发控制参数调节:prefetchCount控制消费速度
④ 幂等性设计:通过唯一ID防止重复扣款
⑤ 连接工厂隐藏玩法:自定义心跳检测时间
7. 异常处理实战案例
@Configuration
public class RetryConfig {
// 异常重试策略(类似快递多次投递)
@Bean
public MessageRecoverer messageRecoverer(ConnectionFactory connectionFactory) {
return new RepublishMessageRecoverer(
new RabbitTemplate(connectionFactory),
"error.direct", // 异常交换机
"error.routing" // 死信路由键
);
}
}
8. 性能优化的秘密武器
测试环境下可以通过Management插件观察队列情况:
@Bean
public Queue benchmarkQueue() {
return QueueBuilder.durable("stress.test")
.maxLength(10000) // 最大消息数
.overflow(Overflow.reject) // 拒绝新消息
.build();
}
9. 典型应用场景剧场
① 电商系统:订单处理与库存更新的解耦
② 物联网:百万级设备上报数据削峰填谷
③ 秒杀系统:请求缓冲队列避免数据库被打垮
④ 分布式事务:最终一致性场景的消息补偿
⑤ 日志收集:多服务日志统一处理管道
10. 技术选型对照表
优点速览:
- 协议健壮性:基于AMQP 0-9-1标准
- 可靠性保证:持久化+ACK机制
- 灵活路由:四种交换机类型
- 可视化管理:自带Web控制台
缺点提醒:
- 集群模式下镜像队列性能损耗
- 消息积压时内存管理要求高
- 不支持严格顺序消费
- 较重的客户端依赖