一、当消息突然消失:一个真实的线上事故
某天凌晨,我们的订单系统突然报警——有用户重复收到了10条相同的支付成功短信。排查发现,Kafka生产者发送订单消息时,因为网络抖动导致超时,自动重试机制在服务端实际已写入的情况下又发了9次。这就是典型的生产者配置不当引发的数据重复问题。
二、为什么超时会导致重复?
Kafka生产者有两个关键参数在"打架":
request.timeout.ms(请求超时时间)retries(重试次数)
当网络波动时,如果服务端已经成功写入但确认消息延迟返回,生产者会因为超时误判发送失败而重试。就像你给朋友发微信,明明对方收到了,但你这边显示红色感叹号又连发好几条。
技术栈:Java + Spring Boot
// 错误配置示例:超时短+无限重试
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
config.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE); // 无限重试
config.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 1000); // 1秒超时
return new DefaultKafkaProducerFactory<>(config);
}
三、正确配置的五个黄金法则
1. 设置合理的超时时间
建议request.timeout.ms至少是max.block.ms(阻塞时间)的2倍。例如:
config.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 3000); // 阻塞最多3秒
config.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 6000); // 超时6秒
2. 启用幂等性(idempotence)
这是Kafka 0.11版本后的杀手锏功能:
config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
// 开启后会自动设置acks=all, retries=Integer.MAX_VALUE
3. 精确控制重试场景
对于非瞬时错误(如磁盘满)不应重试:
config.put(ProducerConfig.RETRIES_CONFIG, 3); // 最多3次
config.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 100); // 每次间隔100ms
4. 关键业务添加事务ID
config.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "order-service-1");
// 使用时需要初始化事务
kafkaProducer.initTransactions();
5. 消费者端去重设计
即使生产者配置完美,仍需消费者做最后防线:
// 使用Redis记录已处理消息ID
Boolean isDuplicate = redisTemplate.opsForValue()
.setIfAbsent("msg:"+messageId, "1", 7, TimeUnit.DAYS);
if(Boolean.FALSE.equals(isDuplicate)) {
return; // 重复消息直接丢弃
}
四、不同场景下的配置模板
场景1:金融交易(零数据丢失)
config.put(ProducerConfig.ACKS_CONFIG, "all"); // 所有副本确认
config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // 必须开启
config.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1); // 禁止乱序
场景2:日志收集(允许少量丢失)
config.put(ProducerConfig.ACKS_CONFIG, "1"); // 主节点确认即可
config.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "zstd"); // 启用压缩
config.put(ProducerConfig.LINGER_MS_CONFIG, 100); // 适当批量发送
五、避坑指南
- 不要盲目增大重试次数:遇到持续错误时,应该记录错误日志而不是无限重试
- 监控关键指标:重点关注
record-retries-total和record-error-total指标 - 测试时模拟网络故障:使用Linux的TC命令制造延迟和丢包:
tc qdisc add dev eth0 root netem delay 200ms loss 10%
六、从原理看问题本质
Kafka的消息传递语义有三种:
- 至少一次(at least once):可能重复
- 至多一次(at most once):可能丢失
- 精确一次(exactly once):需要生产者幂等+消费者事务
通过enable.idempotence=true+acks=all,我们实际上实现了轻量级的精确一次投递。其原理是服务端会缓存最近5个批次的消息ID,对重复请求直接返回成功。
七、终极解决方案:事务消息
对于跨系统数据一致性要求极高的场景(如订单创建+库存扣减),可以使用Kafka事务:
try {
kafkaProducer.beginTransaction();
// 发送主业务消息
kafkaProducer.send(new ProducerRecord<>("orders", orderJson));
// 执行本地数据库操作
orderRepository.save(order);
// 提交事务
kafkaProducer.commitTransaction();
} catch (Exception e) {
kafkaProducer.abortTransaction();
throw new RuntimeException("Transaction failed", e);
}
八、总结与最佳实践
- 超时时间:建议设置在5-30秒范围,根据业务容忍度调整
- 重试策略:结合业务决定是否重试,例如支付订单应该重试,用户行为日志可以不重试
- 监控报警:对连续重试超过3次的消息需要立即报警
- 消费者设计:始终假设消息可能重复,做好幂等处理
记住:没有完美的配置,只有适合业务的配置。建议在预发布环境用Chaos Engineering工具(如ChaosBlade)进行故障演练,观察系统行为是否符合预期。
评论