一、当消息突然消失:一个真实的线上事故

某天凌晨,我们的订单系统突然报警——有用户重复收到了10条相同的支付成功短信。排查发现,Kafka生产者发送订单消息时,因为网络抖动导致超时,自动重试机制在服务端实际已写入的情况下又发了9次。这就是典型的生产者配置不当引发的数据重复问题。

二、为什么超时会导致重复?

Kafka生产者有两个关键参数在"打架":

  • request.timeout.ms(请求超时时间)
  • retries(重试次数)

当网络波动时,如果服务端已经成功写入但确认消息延迟返回,生产者会因为超时误判发送失败而重试。就像你给朋友发微信,明明对方收到了,但你这边显示红色感叹号又连发好几条。

技术栈:Java + Spring Boot

// 错误配置示例:超时短+无限重试
@Bean
public ProducerFactory<String, String> producerFactory() {
    Map<String, Object> config = new HashMap<>();
    config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
    config.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE); // 无限重试
    config.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 1000);   // 1秒超时
    return new DefaultKafkaProducerFactory<>(config);
}

三、正确配置的五个黄金法则

1. 设置合理的超时时间

建议request.timeout.ms至少是max.block.ms(阻塞时间)的2倍。例如:

config.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 3000);   // 阻塞最多3秒
config.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 6000); // 超时6秒

2. 启用幂等性(idempotence)

这是Kafka 0.11版本后的杀手锏功能:

config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); 
// 开启后会自动设置acks=all, retries=Integer.MAX_VALUE

3. 精确控制重试场景

对于非瞬时错误(如磁盘满)不应重试:

config.put(ProducerConfig.RETRIES_CONFIG, 3); // 最多3次
config.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 100); // 每次间隔100ms

4. 关键业务添加事务ID

config.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "order-service-1");
// 使用时需要初始化事务
kafkaProducer.initTransactions();

5. 消费者端去重设计

即使生产者配置完美,仍需消费者做最后防线:

// 使用Redis记录已处理消息ID
Boolean isDuplicate = redisTemplate.opsForValue()
    .setIfAbsent("msg:"+messageId, "1", 7, TimeUnit.DAYS);
if(Boolean.FALSE.equals(isDuplicate)) {
    return; // 重复消息直接丢弃
}

四、不同场景下的配置模板

场景1:金融交易(零数据丢失)

config.put(ProducerConfig.ACKS_CONFIG, "all");          // 所有副本确认
config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // 必须开启
config.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1); // 禁止乱序

场景2:日志收集(允许少量丢失)

config.put(ProducerConfig.ACKS_CONFIG, "1");           // 主节点确认即可
config.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "zstd"); // 启用压缩
config.put(ProducerConfig.LINGER_MS_CONFIG, 100);      // 适当批量发送

五、避坑指南

  1. 不要盲目增大重试次数:遇到持续错误时,应该记录错误日志而不是无限重试
  2. 监控关键指标:重点关注record-retries-totalrecord-error-total指标
  3. 测试时模拟网络故障:使用Linux的TC命令制造延迟和丢包:
    tc qdisc add dev eth0 root netem delay 200ms loss 10%
    

六、从原理看问题本质

Kafka的消息传递语义有三种:

  • 至少一次(at least once):可能重复
  • 至多一次(at most once):可能丢失
  • 精确一次(exactly once):需要生产者幂等+消费者事务

通过enable.idempotence=true+acks=all,我们实际上实现了轻量级的精确一次投递。其原理是服务端会缓存最近5个批次的消息ID,对重复请求直接返回成功。

七、终极解决方案:事务消息

对于跨系统数据一致性要求极高的场景(如订单创建+库存扣减),可以使用Kafka事务:

try {
    kafkaProducer.beginTransaction();
    // 发送主业务消息
    kafkaProducer.send(new ProducerRecord<>("orders", orderJson));
    // 执行本地数据库操作
    orderRepository.save(order);
    // 提交事务
    kafkaProducer.commitTransaction();
} catch (Exception e) {
    kafkaProducer.abortTransaction();
    throw new RuntimeException("Transaction failed", e);
}

八、总结与最佳实践

  1. 超时时间:建议设置在5-30秒范围,根据业务容忍度调整
  2. 重试策略:结合业务决定是否重试,例如支付订单应该重试,用户行为日志可以不重试
  3. 监控报警:对连续重试超过3次的消息需要立即报警
  4. 消费者设计:始终假设消息可能重复,做好幂等处理

记住:没有完美的配置,只有适合业务的配置。建议在预发布环境用Chaos Engineering工具(如ChaosBlade)进行故障演练,观察系统行为是否符合预期。