一、为什么需要重试机制

在分布式系统中,网络是不可靠的,消息可能会因为各种原因丢失或失败。比如,Kafka Broker 可能临时不可用,或者客户端与 Broker 之间的网络出现抖动。这时候,如果没有重试机制,消息就可能彻底丢失,导致业务逻辑中断。

举个例子,假设你正在开发一个订单系统,订单创建后需要发送消息到 Kafka,以便库存服务进行扣减。如果消息发送失败,而你又没有重试机制,那库存就不会被扣减,最终可能导致超卖。

所以,设计合理的重试机制,是保障消息可靠性的关键。

二、Kafka 客户端的重试机制

Kafka 生产者在发送消息时,默认会进行重试(retries 参数默认是 Integer.MAX_VALUE)。但这个默认行为并不一定适合所有场景,我们需要根据业务需求调整。

2.1 基本重试配置

在 Java 客户端中,可以通过 ProducerConfig 设置重试参数:

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 设置最大重试次数
props.put(ProducerConfig.RETRIES_CONFIG, 3);
// 设置重试间隔(默认100ms)
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 1000);

KafkaProducer<String, String> producer = new KafkaProducer<>(props);

代码说明:

  • RETRIES_CONFIG:控制最大重试次数,默认是 Integer.MAX_VALUE(几乎无限重试)。
  • RETRY_BACKOFF_MS_CONFIG:每次重试的间隔时间,默认 100ms。

2.2 重试的触发条件

Kafka 客户端会在以下情况下触发重试:

  1. 网络错误:比如连接超时、Broker 不可用。
  2. Leader 切换:分区 Leader 发生切换,客户端需要重新发现新的 Leader。
  3. 消息过大:如果消息超过 max.request.size,Broker 会拒绝,客户端需要调整消息大小后重试。

2.3 幂等性与事务

如果业务要求消息不能重复(例如支付场景),可以启用 幂等生产者

props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");

或者使用 事务 保证 Exactly-Once 语义:

props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tx-1");
producer.initTransactions();
try {
    producer.beginTransaction();
    producer.send(new ProducerRecord<>("orders", "order-123", "PAID"));
    producer.commitTransaction();
} catch (Exception e) {
    producer.abortTransaction();
}

三、消息可靠性保障策略

单纯依赖重试是不够的,我们还需要结合其他策略来确保消息不丢失。

3.1 ACKS 配置

acks 参数决定了 Kafka Broker 如何确认消息:

  • acks=0:不等待确认,消息可能丢失。
  • acks=1:Leader 写入成功即返回,如果 Leader 宕机且 Follower 未同步,消息会丢失。
  • acks=all:所有 ISR(In-Sync Replicas)都确认后才返回,最可靠。
props.put(ProducerConfig.ACKS_CONFIG, "all");

3.2 消息持久化与副本

确保 Topic 的 replication.factor ≥ 2,并且 min.insync.replicas ≥ 1(通常设置为 2)。

# 创建 Topic 时指定副本数
kafka-topics --create --topic orders --partitions 3 --replication-factor 2 --bootstrap-server localhost:9092

3.3 消费者端的可靠性

消费者也需要正确处理消息:

  • 手动提交 Offset(避免自动提交导致消息丢失)。
  • 处理异常后重试消费。
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        try {
            processOrder(record.value());  // 业务处理
            consumer.commitSync();  // 手动提交 Offset
        } catch (Exception e) {
            log.error("处理消息失败,稍后重试", e);
        }
    }
}

四、应用场景与注意事项

4.1 典型应用场景

  1. 金融交易:要求消息绝对不能丢失,必须使用 acks=all + 事务。
  2. 日志收集:允许少量丢失,可以用 acks=1 提高吞吐量。
  3. 实时计算:结合 Flink 或 Spark Streaming 做 Exactly-Once 处理。

4.2 技术优缺点

方案 优点 缺点
默认重试 简单易用 可能重复消息
幂等生产者 避免重复 仅单分区有效
事务 强一致性 性能较低

4.3 注意事项

  1. 监控重试次数:如果重试过多,可能说明集群有问题。
  2. 避免无限重试:设置合理的 retries 和超时时间。
  3. 消费者幂等:即使生产者不重复,消费者也要做好幂等处理。

五、总结

Kafka 客户端的重试机制是消息可靠性的重要保障,但需要结合 acks、副本、消费者策略等才能做到万无一失。不同的业务场景需要不同的配置,金融级系统可以用事务 + acks=all,而日志类系统可以适当放宽要求。

最后,记得监控你的 Kafka 集群,确保消息既不会丢失,也不会重复!