一、当数据玩起了捉迷藏
不知道你有没有遇到过这样的情况:明明看着数据从Kafka生产者那边欢快地蹦出去了,结果到Elasticsearch这边一查,咦?怎么少了几个?这种数据丢失的情况就像是你网购时快递丢件,让人又急又气。今天我们就来当一回"数据侦探",好好排查下这个问题。
先来看个真实案例。某电商平台的用户行为日志采集系统,用Kafka做消息队列,Elasticsearch做存储和分析。某天运营同学突然发现:"昨天的加购数据怎么比平时少了15%?"技术团队顿时炸开了锅。
二、排查路线图:顺藤摸瓜
2.1 第一站:Kafka生产者是否靠谱
首先得确认数据是不是真的发出去了。我们来看个Java写的Kafka生产者示例:
// Kafka生产者配置示例
Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9092,kafka2:9092");
props.put("acks", "all"); // 确保所有副本都收到消息
props.put("retries", 3); // 失败重试3次
props.put("max.in.flight.requests.per.connection", 1); // 防止消息乱序
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
try {
// 发送消息并获取返回的Future对象
Future<RecordMetadata> future = producer.send(new ProducerRecord<>(
"user_behavior",
"user123",
"{\"action\":\"add_to_cart\",\"item_id\":\"sku1001\"}"
));
// 同步等待发送结果
RecordMetadata metadata = future.get();
System.out.println("消息发送成功,offset: " + metadata.offset());
} catch (Exception e) {
System.out.println("消息发送失败: " + e.getMessage());
} finally {
producer.close();
}
关键点说明:
acks=all确保消息被所有ISR副本确认retries=3给了足够的重试机会- 同步等待发送结果可以立即发现发送失败
2.2 第二站:Kafka消费者是否偷懒
确认生产者没问题后,我们来看看消费者这边。常见的坑是消费者自动提交offset但实际处理失败。来看个Java消费者的正确姿势:
// Kafka消费者配置示例
Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9092,kafka2:9092");
props.put("group.id", "es_loader");
props.put("enable.auto.commit", "false"); // 关闭自动提交
props.put("auto.offset.reset", "earliest");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("user_behavior"));
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
try {
// 1. 处理消息(写入ES)
indexToElasticsearch(record.value());
// 2. 处理成功后再提交offset
consumer.commitSync();
} catch (Exception e) {
// 记录失败日志,但不提交offset
System.err.println("处理消息失败: " + record.value());
}
}
}
} finally {
consumer.close();
}
这里的关键是:
- 关闭自动提交(
enable.auto.commit=false) - 确保ES写入成功后再手动提交offset
- 失败时不提交offset,让消息可以被重新消费
三、Elasticsearch的接收陷阱
3.1 批量写入的优化与风险
为了提高性能,我们通常会使用批量写入。但这里有个平衡点需要把握:
// ES批量写入示例
BulkRequest bulkRequest = new BulkRequest();
int batchSize = 0;
while (有数据) {
String message = 获取下一条Kafka消息();
IndexRequest request = new IndexRequest("user_behavior")
.source(message, XContentType.JSON);
bulkRequest.add(request);
if (++batchSize >= 500) { // 每500条批量提交一次
BulkResponse bulkResponse = client.bulk(bulkRequest, RequestOptions.DEFAULT);
if (bulkResponse.hasFailures()) {
// 处理失败的文档
for (BulkItemResponse item : bulkResponse.getItems()) {
if (item.isFailed()) {
System.err.println("文档写入失败: " + item.getFailureMessage());
}
}
}
bulkRequest = new BulkRequest();
batchSize = 0;
}
}
// 提交剩余文档
if (batchSize > 0) {
client.bulk(bulkRequest, RequestOptions.DEFAULT);
}
注意事项:
- 批量大小要适中,太大可能导致超时
- 必须检查批量响应中的失败项
- 程序退出前要提交剩余文档
3.2 重试机制的智慧
网络波动是常有的事,我们需要一个聪明的重试策略:
// 带重试的ES写入方法
public void indexWithRetry(String index, String document, int maxRetries) {
int retryCount = 0;
while (retryCount <= maxRetries) {
try {
IndexRequest request = new IndexRequest(index)
.source(document, XContentType.JSON);
client.index(request, RequestOptions.DEFAULT);
return; // 成功则退出
} catch (ElasticsearchStatusException e) {
if (e.status() == RestStatus.TOO_MANY_REQUESTS) {
// 429错误需要等待
Thread.sleep(1000 * (long) Math.pow(2, retryCount));
retryCount++;
} else {
throw e; // 其他错误直接抛出
}
} catch (Exception e) {
retryCount++;
if (retryCount > maxRetries) {
throw new RuntimeException("写入ES失败,已达最大重试次数", e);
}
Thread.sleep(1000 * retryCount); // 指数退避
}
}
}
这个重试策略的特点是:
- 对429错误特别处理
- 采用指数退避算法
- 限制最大重试次数
四、全链路监控与数据核对
4.1 关键指标监控
我们需要在以下几个关键点埋点:
- Kafka生产者发送成功/失败计数
- Kafka消费者拉取消息计数
- ES写入成功/失败计数
- 端到端延迟监控
4.2 定期数据核对
可以写个简单的核对脚本,比如:
// 数据核对脚本示例
public void verifyData() {
// 1. 获取Kafka最新offset
long kafkaLatestOffset = getKafkaLatestOffset("user_behavior");
// 2. 获取ES中最后处理的消息offset(需要存储)
long esLatestOffset = getEsLatestOffset();
// 3. 比较两者差异
if (kafkaLatestOffset > esLatestOffset) {
long missing = kafkaLatestOffset - esLatestOffset;
System.out.println("发现数据缺失,缺少" + missing + "条消息");
// 4. 触发补偿机制
compensateMissingData(esLatestOffset + 1, kafkaLatestOffset);
}
}
五、经验总结与最佳实践
经过多次实战,我总结了以下几点经验:
生产者配置黄金法则:
- 一定要设置
acks=all - 合理设置
retries和retry.backoff.ms - 重要数据建议同步等待发送结果
- 一定要设置
消费者注意事项:
- 永远不要使用自动提交offset
- 处理逻辑和offset提交要在同一个事务中
- 考虑使用Kafka的exactly-once语义
ES写入优化:
- 批量大小控制在500-1000条
- 使用带重试的写入策略
- 监控bulk队列和拒绝情况
全链路保障:
- 实施端到端监控
- 定期数据核对
- 建立补偿机制
六、不同场景下的技术选型思考
如果你的业务是:
- 金融级数据:考虑使用Kafka事务+ES的refresh=wait_for
- 海量日志:可以适当降低可靠性要求换取吞吐量
- 实时分析:可以结合Kafka Streams进行预处理
记住,没有银弹,只有最适合的方案。
评论