一、当网络打个喷嚏,Kafka消息就感冒了

想象一下这样的场景:你正在用微信给朋友发重要文件,突然电梯里没信号了,等回到地面发现"消息发送失败"。Kafka客户端遇到网络闪断时,就跟这个情况一模一样。不过别担心,就像我们会给重要消息点"重发"按钮一样,Kafka也有一整套"急救方案"。

先看个Java客户端的典型翻车现场:

// 危险示范:裸奔的生产者配置
Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9092"); // 只配一个服务器地址
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("important_data", "关键业务数据")); // 网络闪断时直接丢失

这种情况就像把鸡蛋放在一个篮子里:没有重试机制、没有确认机制、连服务器列表都不完整。当网络抖动时,消息说没就没了。

二、给消息穿上防弹衣的核心配置

2.1 必须开启的防护罩

这三个参数就像消息的三重保险:

props.put("acks", "all"); // 等所有ISR副本都确认
props.put("retries", Integer.MAX_VALUE); // 无限重试
props.put("max.block.ms", 60000); // 生产者阻塞超时时间

这相当于给消息上了三道锁:

  1. acks=all确保消息真正落地到磁盘
  2. retries=MAX_VALUE让客户端不断尝试
  3. max.block.ms给足重试时间

2.2 重试策略的精细调节

光有重试还不够,还得讲究策略:

props.put("retry.backoff.ms", 1000); // 每次重试间隔1秒
props.put("delivery.timeout.ms", 120000); // 总尝试时间2分钟
props.put("enable.idempotence", true); // 启用幂等性

特别注意enable.idempotence,它就像消息的身份证号,能避免网络抖动导致的重复消息。不过启用它需要同时满足:

  • acks=all
  • retries>0
  • max.in.flight.requests.per.connection=1

三、客户端缓存的双刃剑

Kafka生产者有个内存缓冲区,默认32MB。就像快递公司的中转仓库,好处是能应对突发流量,坏处是如果客户端崩溃,仓库里的"货物"就全丢了。

3.1 缓冲区调优示例

props.put("buffer.memory", 67108864); // 缓冲区扩容到64MB
props.put("batch.size", 32768); // 批次大小调整为32KB
props.put("linger.ms", 100); // 最多等待100ms凑批

这三个参数需要根据业务特点调整:

  • 高吞吐场景:增大batch.sizelinger.ms
  • 低延迟场景:减小linger.ms
  • 内存敏感场景:控制buffer.memory

3.2 崩溃前的最后挣扎

建议添加JVM关闭钩子保存未发送消息:

Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    producer.flush(); // 强制发送缓冲区数据
    producer.close(Duration.ofSeconds(30)); // 优雅关闭
}));

四、消费者端的容错设计

消费者遇到网络问题同样会丢消息,关键是要管理好offset这个"书签"。

4.1 手动提交的正确姿势

props.put("enable.auto.commit", "false"); // 关闭自动提交

try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            processRecord(record); // 处理消息
            consumer.commitSync(Collections.singletonMap(
                new TopicPartition(record.topic(), record.partition()),
                new OffsetAndMetadata(record.offset() + 1))); // 处理完立即提交
        }
    }
} catch (Exception e) {
    storeOffsetsToDB(); // 异常时保存offset到数据库
}

4.2 消费者重平衡监听器

consumer.subscribe(Collections.singleton("important_data"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        saveRecoveryPoints(partitions); // 失去分区前保存状态
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        loadRecoveryPoints(partitions); // 获得分区后恢复状态
    }
});

五、终极防御:本地持久化队列

对于金融级重要数据,可以引入本地存储作为二级缓存:

// 发送前先写本地
localQueue.saveToDisk("messageId", message); 

try {
    producer.send(record, (metadata, exception) -> {
        if (exception == null) {
            localQueue.markAsSent("messageId"); // 成功发送后标记
        }
    });
} catch (Exception e) {
    logger.error("发送失败,消息已持久化到本地", e);
}

// 后台线程处理失败消息
new Thread(() -> {
    while (true) {
        List<FailedMessage> failed = localQueue.getFailedMessages();
        failed.forEach(msg -> retrySend(msg));
        Thread.sleep(60000); // 每分钟检查一次
    }
}).start();

六、监控与运维要点

  1. 关键监控指标:

    • 生产者:record-error-rate, record-retry-rate
    • 消费者:consumer-lag, commit-rate
  2. 运维建议:

    # 检查消费者滞后情况
    kafka-consumer-groups --bootstrap-server kafka:9092 \
    --group important_group --describe
    
  3. 网络优化:

    • 保持长连接:调整socket.connection.setup.timeout.ms
    • TCP优化:调整send.buffer.bytesreceive.buffer.bytes

七、方案选型指南

  1. 普通业务:开启重试+幂等性即可
  2. 重要业务:增加本地持久化队列
  3. 金融业务:需要实现端到端事务

最后记住,没有银弹。我曾经遇到过一个案例:客户端配置了完善的重试机制,但因为DNS缓存问题导致重试全部指向错误的IP。所以网络问题需要全方位防御。