一、Kafka消息丢失的典型场景

消息队列用得好是神器,用不好就是灾难现场。咱们先来看看哪些情况下Kafka会把你的消息给"吞"了。想象一下你刚写完的代码,明明发送了订单支付消息,结果下游服务说根本没收到——这种场景简直能让程序员当场崩溃。

生产者端最常见的问题就是发送后不管。比如下面这个Java示例(技术栈:Java+Kafka):

// 错误示范:发送即遗忘模式
ProducerRecord<String, String> record = new ProducerRecord<>("payment", "order123", "paid");
producer.send(record); // 没有回调也没有get()等待
producer.close();

注释说明:

  1. 这种发送方式就像把信扔进邮筒就不管了
  2. 网络波动或Broker故障时消息可能根本没送达
  3. 生产环境绝对要避免这种写法

消费者端的问题往往更隐蔽。比如自动提交偏移量这个坑:

// 危险配置:自动提交偏移量
props.put("enable.auto.commit", "true"); // 默认就是true
props.put("auto.commit.interval.ms", "5000"); // 5秒提交一次

注释说明:

  1. 如果消费者崩溃,最后5秒处理的消息会重复消费
  2. 更可怕的是如果处理逻辑有异常,消息可能根本没被正确处理
  3. 自动提交就像不锁车门就离开,容易被"偷"消息

二、生产者端的可靠性保障

要保证生产者不丢消息,得搞明白Kafka的应答机制。就像寄快递,你要选择"必须签收"的服务才靠谱。来看正确姿势:

// 正确配置:高可靠生产者
props.put("acks", "all"); // 必须所有副本都确认
props.put("retries", 3); // 自动重试3次
props.put("max.in.flight.requests.per.connection", 1); // 防止乱序

// 发送时必须检查结果
producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        logger.error("消息发送失败", exception);
        // 这里应该加入重试或告警逻辑
    }
});

注释说明:

  1. acks=all是最高可靠性配置
  2. 回调函数能捕获所有发送异常
  3. 重试机制要配合幂等生产者使用

对于关键业务,还需要增加本地消息表作为双保险:

// 业务表和消息表在同一个事务
transactionTemplate.execute(status -> {
    orderDao.insert(order); // 写业务表
    messageLogDao.insert(log); // 写消息日志
    return null;
});

// 异步发送消息并更新状态
kafkaTemplate.send("payment", message).addCallback(
    success -> messageLogDao.updateStatus(messageId, "SENT"),
    failure -> alertService.notifyAdmin()
);

三、消费者端的正确姿势

消费者处理消息就像吃自助餐,不能贪多嚼不烂。先来看手动提交的正确方式:

// 安全消费模式
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        try {
            process(record); // 业务处理
            consumer.commitSync(); // 每条消息单独提交
        } catch (Exception e) {
            logger.error("处理失败", e);
            // 将错误消息写入死信队列
            deadLetterProducer.send(...);
        }
    }
}

注释说明:

  1. 同步提交确保偏移量准确更新
  2. 单条提交避免批量提交的"漏网之鱼"
  3. 死信队列收集处理失败的消息

对于批量处理场景,可以采用批处理+事务的方式:

// 批量处理模板
List<ConsumerRecord> batch = new ArrayList();
while (records.hasNext()) {
    batch.add(records.next());
    if (batch.size() >= 100) {
        transactionTemplate.execute(status -> {
            batch.forEach(this::process);
            consumer.commitSync();
            return null;
        });
        batch.clear();
    }
}

四、Broker端的配置优化

消息在Broker上也可能丢失,就像快递在中转站丢件。关键配置要记牢:

  1. 副本因子至少设置为3:
bin/kafka-topics.sh --create --topic important \
--partitions 3 --replication-factor 3 \
--config min.insync.replicas=2
  1. 必须配置min.insync.replicas:
# server.properties
default.replication.factor=3
min.insync.replicas=2
unclean.leader.election.enable=false

注释说明:

  1. min.insync.replicas=2表示至少2个副本写入成功
  2. 禁止unclean选举避免数据不一致
  3. 副本分散在不同机架更安全

五、监控与灾备方案

没有监控的系统就像蒙眼开车。这几个指标必须监控:

  1. 生产者丢失率:
metrics.addMetric("producer-loss-rate", 
    () -> (sentCount - ackedCount) / (double)sentCount);
  1. 消费者滞后量:
long lag = consumer.endOffsets(partitions) - consumer.position(partition);
alertIf(lag > 10000); // 滞后超过1万条就告警

灾备方案要像备胎一样随时可用:

  1. 跨机房镜像:使用MirrorMaker2同步数据
  2. 定期备份:用kafka-dump-log工具导出数据
  3. 消息追溯:保留至少7天日志

六、不同场景下的选择策略

  1. 金融交易场景:
  • 必须使用事务型生产者
  • 配合数据库事务表
  • 双机房部署
  1. 日志收集场景:
  • 可以适当降低可靠性
  • 允许少量重复
  • 采用压缩传输
  1. IoT设备数据:
  • 提高生产者缓存
  • 增加本地暂存
  • 批量发送优化

七、常见误区与避坑指南

  1. 误区:"配置了acks=all就万无一失" 事实:还要配合min.insync.replicas

  2. 误区:"消费者自动提交很方便" 事实:自动提交是丢消息的罪魁祸首

  3. 大坑:Kafka版本兼容问题

    • 0.11+ 才支持事务
    • 2.5+ 改进的幂等生产者

八、终极解决方案架构

给出一个完整的高可靠架构示例:

[生产者] -> [本地消息表] -> [Kafka集群(3副本)]
           ^              |
           |              v
[事务管理器]          [消费者] -> [业务处理] -> [人工稽核]

关键组件说明:

  1. 本地消息表:确保可追溯
  2. 事务管理器:保证原子性
  3. 人工稽核:最后防线

总结

Kafka消息可靠性就像接力赛,每个环节都不能掉棒。生产者要确认,Broker要多副本,消费者要谨慎提交。记住这个口诀:"发送要回调,消费要同步,副本要充足,监控要到位"。具体到实施时,要根据业务场景选择合适的一致性级别,金融级场景不怕麻烦,日志类场景可以适当放宽。最后提醒,所有配置都要经过压测验证,纸上谈兵终觉浅,绝知此事要躬行。