一、当数据玩起了捉迷藏

不知道你有没有遇到过这样的情况:明明看着数据从Kafka生产者那边欢快地蹦出去了,结果到Elasticsearch这边一查,咦?怎么少了几个?这种数据丢失的情况就像是你网购时快递丢件,让人又急又气。今天我们就来当一回"数据侦探",好好排查下这个问题。

先来看个真实案例。某电商平台的用户行为日志采集系统,用Kafka做消息队列,Elasticsearch做存储和分析。某天运营同学突然发现:"昨天的加购数据怎么比平时少了15%?"技术团队顿时炸开了锅。

二、排查路线图:顺藤摸瓜

2.1 第一站:Kafka生产者是否靠谱

首先得确认数据是不是真的发出去了。我们来看个Java写的Kafka生产者示例:

// Kafka生产者配置示例
Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9092,kafka2:9092"); 
props.put("acks", "all"); // 确保所有副本都收到消息
props.put("retries", 3); // 失败重试3次
props.put("max.in.flight.requests.per.connection", 1); // 防止消息乱序
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<>(props);

try {
    // 发送消息并获取返回的Future对象
    Future<RecordMetadata> future = producer.send(new ProducerRecord<>(
        "user_behavior", 
        "user123", 
        "{\"action\":\"add_to_cart\",\"item_id\":\"sku1001\"}"
    ));
    
    // 同步等待发送结果
    RecordMetadata metadata = future.get();
    System.out.println("消息发送成功,offset: " + metadata.offset());
} catch (Exception e) {
    System.out.println("消息发送失败: " + e.getMessage());
} finally {
    producer.close();
}

关键点说明:

  1. acks=all确保消息被所有ISR副本确认
  2. retries=3给了足够的重试机会
  3. 同步等待发送结果可以立即发现发送失败

2.2 第二站:Kafka消费者是否偷懒

确认生产者没问题后,我们来看看消费者这边。常见的坑是消费者自动提交offset但实际处理失败。来看个Java消费者的正确姿势:

// Kafka消费者配置示例
Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9092,kafka2:9092");
props.put("group.id", "es_loader");
props.put("enable.auto.commit", "false"); // 关闭自动提交
props.put("auto.offset.reset", "earliest");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("user_behavior"));

try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            try {
                // 1. 处理消息(写入ES)
                indexToElasticsearch(record.value());
                
                // 2. 处理成功后再提交offset
                consumer.commitSync();
            } catch (Exception e) {
                // 记录失败日志,但不提交offset
                System.err.println("处理消息失败: " + record.value());
            }
        }
    }
} finally {
    consumer.close();
}

这里的关键是:

  1. 关闭自动提交(enable.auto.commit=false)
  2. 确保ES写入成功后再手动提交offset
  3. 失败时不提交offset,让消息可以被重新消费

三、Elasticsearch的接收陷阱

3.1 批量写入的优化与风险

为了提高性能,我们通常会使用批量写入。但这里有个平衡点需要把握:

// ES批量写入示例
BulkRequest bulkRequest = new BulkRequest();
int batchSize = 0;

while (有数据) {
    String message = 获取下一条Kafka消息();
    IndexRequest request = new IndexRequest("user_behavior")
        .source(message, XContentType.JSON);
    bulkRequest.add(request);
    
    if (++batchSize >= 500) { // 每500条批量提交一次
        BulkResponse bulkResponse = client.bulk(bulkRequest, RequestOptions.DEFAULT);
        if (bulkResponse.hasFailures()) {
            // 处理失败的文档
            for (BulkItemResponse item : bulkResponse.getItems()) {
                if (item.isFailed()) {
                    System.err.println("文档写入失败: " + item.getFailureMessage());
                }
            }
        }
        bulkRequest = new BulkRequest();
        batchSize = 0;
    }
}

// 提交剩余文档
if (batchSize > 0) {
    client.bulk(bulkRequest, RequestOptions.DEFAULT);
}

注意事项:

  1. 批量大小要适中,太大可能导致超时
  2. 必须检查批量响应中的失败项
  3. 程序退出前要提交剩余文档

3.2 重试机制的智慧

网络波动是常有的事,我们需要一个聪明的重试策略:

// 带重试的ES写入方法
public void indexWithRetry(String index, String document, int maxRetries) {
    int retryCount = 0;
    while (retryCount <= maxRetries) {
        try {
            IndexRequest request = new IndexRequest(index)
                .source(document, XContentType.JSON);
            client.index(request, RequestOptions.DEFAULT);
            return; // 成功则退出
        } catch (ElasticsearchStatusException e) {
            if (e.status() == RestStatus.TOO_MANY_REQUESTS) {
                // 429错误需要等待
                Thread.sleep(1000 * (long) Math.pow(2, retryCount));
                retryCount++;
            } else {
                throw e; // 其他错误直接抛出
            }
        } catch (Exception e) {
            retryCount++;
            if (retryCount > maxRetries) {
                throw new RuntimeException("写入ES失败,已达最大重试次数", e);
            }
            Thread.sleep(1000 * retryCount); // 指数退避
        }
    }
}

这个重试策略的特点是:

  1. 对429错误特别处理
  2. 采用指数退避算法
  3. 限制最大重试次数

四、全链路监控与数据核对

4.1 关键指标监控

我们需要在以下几个关键点埋点:

  1. Kafka生产者发送成功/失败计数
  2. Kafka消费者拉取消息计数
  3. ES写入成功/失败计数
  4. 端到端延迟监控

4.2 定期数据核对

可以写个简单的核对脚本,比如:

// 数据核对脚本示例
public void verifyData() {
    // 1. 获取Kafka最新offset
    long kafkaLatestOffset = getKafkaLatestOffset("user_behavior");
    
    // 2. 获取ES中最后处理的消息offset(需要存储)
    long esLatestOffset = getEsLatestOffset();
    
    // 3. 比较两者差异
    if (kafkaLatestOffset > esLatestOffset) {
        long missing = kafkaLatestOffset - esLatestOffset;
        System.out.println("发现数据缺失,缺少" + missing + "条消息");
        
        // 4. 触发补偿机制
        compensateMissingData(esLatestOffset + 1, kafkaLatestOffset);
    }
}

五、经验总结与最佳实践

经过多次实战,我总结了以下几点经验:

  1. 生产者配置黄金法则

    • 一定要设置acks=all
    • 合理设置retriesretry.backoff.ms
    • 重要数据建议同步等待发送结果
  2. 消费者注意事项

    • 永远不要使用自动提交offset
    • 处理逻辑和offset提交要在同一个事务中
    • 考虑使用Kafka的exactly-once语义
  3. ES写入优化

    • 批量大小控制在500-1000条
    • 使用带重试的写入策略
    • 监控bulk队列和拒绝情况
  4. 全链路保障

    • 实施端到端监控
    • 定期数据核对
    • 建立补偿机制

六、不同场景下的技术选型思考

如果你的业务是:

  • 金融级数据:考虑使用Kafka事务+ES的refresh=wait_for
  • 海量日志:可以适当降低可靠性要求换取吞吐量
  • 实时分析:可以结合Kafka Streams进行预处理

记住,没有银弹,只有最适合的方案。