一、当网络打个喷嚏,Kafka消息就感冒了
想象一下这样的场景:你正在用微信给朋友发重要文件,突然电梯里没信号了,等回到地面发现"消息发送失败"。Kafka客户端遇到网络闪断时,就跟这个情况一模一样。不过别担心,就像我们会给重要消息点"重发"按钮一样,Kafka也有一整套"急救方案"。
先看个Java客户端的典型翻车现场:
// 危险示范:裸奔的生产者配置
Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9092"); // 只配一个服务器地址
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("important_data", "关键业务数据")); // 网络闪断时直接丢失
这种情况就像把鸡蛋放在一个篮子里:没有重试机制、没有确认机制、连服务器列表都不完整。当网络抖动时,消息说没就没了。
二、给消息穿上防弹衣的核心配置
2.1 必须开启的防护罩
这三个参数就像消息的三重保险:
props.put("acks", "all"); // 等所有ISR副本都确认
props.put("retries", Integer.MAX_VALUE); // 无限重试
props.put("max.block.ms", 60000); // 生产者阻塞超时时间
这相当于给消息上了三道锁:
acks=all确保消息真正落地到磁盘retries=MAX_VALUE让客户端不断尝试max.block.ms给足重试时间
2.2 重试策略的精细调节
光有重试还不够,还得讲究策略:
props.put("retry.backoff.ms", 1000); // 每次重试间隔1秒
props.put("delivery.timeout.ms", 120000); // 总尝试时间2分钟
props.put("enable.idempotence", true); // 启用幂等性
特别注意enable.idempotence,它就像消息的身份证号,能避免网络抖动导致的重复消息。不过启用它需要同时满足:
acks=allretries>0max.in.flight.requests.per.connection=1
三、客户端缓存的双刃剑
Kafka生产者有个内存缓冲区,默认32MB。就像快递公司的中转仓库,好处是能应对突发流量,坏处是如果客户端崩溃,仓库里的"货物"就全丢了。
3.1 缓冲区调优示例
props.put("buffer.memory", 67108864); // 缓冲区扩容到64MB
props.put("batch.size", 32768); // 批次大小调整为32KB
props.put("linger.ms", 100); // 最多等待100ms凑批
这三个参数需要根据业务特点调整:
- 高吞吐场景:增大
batch.size和linger.ms - 低延迟场景:减小
linger.ms - 内存敏感场景:控制
buffer.memory
3.2 崩溃前的最后挣扎
建议添加JVM关闭钩子保存未发送消息:
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
producer.flush(); // 强制发送缓冲区数据
producer.close(Duration.ofSeconds(30)); // 优雅关闭
}));
四、消费者端的容错设计
消费者遇到网络问题同样会丢消息,关键是要管理好offset这个"书签"。
4.1 手动提交的正确姿势
props.put("enable.auto.commit", "false"); // 关闭自动提交
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
processRecord(record); // 处理消息
consumer.commitSync(Collections.singletonMap(
new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1))); // 处理完立即提交
}
}
} catch (Exception e) {
storeOffsetsToDB(); // 异常时保存offset到数据库
}
4.2 消费者重平衡监听器
consumer.subscribe(Collections.singleton("important_data"), new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
saveRecoveryPoints(partitions); // 失去分区前保存状态
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
loadRecoveryPoints(partitions); // 获得分区后恢复状态
}
});
五、终极防御:本地持久化队列
对于金融级重要数据,可以引入本地存储作为二级缓存:
// 发送前先写本地
localQueue.saveToDisk("messageId", message);
try {
producer.send(record, (metadata, exception) -> {
if (exception == null) {
localQueue.markAsSent("messageId"); // 成功发送后标记
}
});
} catch (Exception e) {
logger.error("发送失败,消息已持久化到本地", e);
}
// 后台线程处理失败消息
new Thread(() -> {
while (true) {
List<FailedMessage> failed = localQueue.getFailedMessages();
failed.forEach(msg -> retrySend(msg));
Thread.sleep(60000); // 每分钟检查一次
}
}).start();
六、监控与运维要点
关键监控指标:
- 生产者:record-error-rate, record-retry-rate
- 消费者:consumer-lag, commit-rate
运维建议:
# 检查消费者滞后情况 kafka-consumer-groups --bootstrap-server kafka:9092 \ --group important_group --describe网络优化:
- 保持长连接:调整
socket.connection.setup.timeout.ms - TCP优化:调整
send.buffer.bytes和receive.buffer.bytes
- 保持长连接:调整
七、方案选型指南
- 普通业务:开启重试+幂等性即可
- 重要业务:增加本地持久化队列
- 金融业务:需要实现端到端事务
最后记住,没有银弹。我曾经遇到过一个案例:客户端配置了完善的重试机制,但因为DNS缓存问题导致重试全部指向错误的IP。所以网络问题需要全方位防御。