1. 开发环境准备

推荐使用Java 8+和Elasticsearch 7.x版本组合。选择官方的High Level REST Client客户端:

<!-- Maven依赖配置 -->
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>7.16.3</version>
</dependency>

2. 基础CRUD操作

2.1 创建文档(Create)

// 创建客户端实例
RestHighLevelClient client = new RestHighLevelClient(
    RestClient.builder(new HttpHost("localhost", 9200, "http")));

// 创建索引请求
IndexRequest request = new IndexRequest("users");
request.id("1001");  // 指定文档ID
Map<String, Object> user = new HashMap<>();
user.put("name", "王伟");
user.put("age", 28);
user.put("tags", Arrays.asList("开发", "篮球"));
request.source(user);  // 设置文档内容

// 同步执行请求
IndexResponse response = client.index(request, RequestOptions.DEFAULT);
System.out.println("文档版本:" + response.getVersion());

2.2 读取文档(Read)

GetRequest request = new GetRequest("users", "1001");
GetResponse response = client.get(request, RequestOptions.DEFAULT);
if (response.isExists()) {
    String source = response.getSourceAsString();
    System.out.println("获取文档:" + source);
} else {
    System.out.println("文档不存在");
}

2.3 更新文档(Update)

UpdateRequest request = new UpdateRequest("users", "1001");
Map<String, Object> updates = new HashMap<>();
updates.put("age", 29);
request.doc(updates);  // 部分字段更新

UpdateResponse response = client.update(request, RequestOptions.DEFAULT);
System.out.println("新版本号:" + response.getVersion());

2.4 删除文档(Delete)

DeleteRequest request = new DeleteRequest("users", "1001");
DeleteResponse response = client.delete(request, RequestOptions.DEFAULT);
System.out.println("删除结果:" + response.getResult());

3. 批量操作进阶

3.1 批量写入处理

BulkRequest bulkRequest = new BulkRequest();
for (int i = 1; i <= 1000; i++) {
    IndexRequest request = new IndexRequest("logs")
        .id(String.valueOf(i))
        .source("level", "INFO",
                "message", "系统启动完成["+i+"]",
                "timestamp", new Date());
    bulkRequest.add(request);
}

BulkResponse responses = client.bulk(bulkRequest, RequestOptions.DEFAULT);
if (responses.hasFailures()) {
    System.out.println("部分操作失败:" + responses.buildFailureMessage());
}

3.2 批量更新策略

BulkProcessor processor = BulkProcessor.builder(
    (request, bulkListener) -> 
        client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
    new BulkProcessor.Listener() {
        @Override
        public void beforeBulk(long executionId, BulkRequest request) {
            System.out.println("即将执行"+request.numberOfActions()+"项操作");
        }
    })
    .setBulkActions(500)  // 每500条执行一次
    .setConcurrentRequests(2)  // 允许并发请求
    .build();

4. 关联技术深入

4.1 版本控制机制

使用乐观锁确保数据一致性:

UpdateRequest request = new UpdateRequest("inventory", "001")
    .version(3)  // 当前已知版本号
    .doc(Collections.singletonMap("stock", 50));

try {
    client.update(request, RequestOptions.DEFAULT);
} catch (ElasticsearchException e) {
    if (e.status() == RestStatus.CONFLICT) {
        System.out.println("文档已被其他进程修改");
    }
}

5. 应用场景分析

  • 日志采集系统:使用Bulk API处理海量日志
  • 商品库存管理:结合版本控制实现并发扣减
  • 用户画像更新:部分更新提升性能

6. 技术优缺点对比

优势特性

  • 分布式扩展性:轻松处理PB级数据
  • 近实时检索:1秒级延迟即可检索最新数据
  • 自动分片:简化集群管理

局限考量

  • 事务支持有限:不适合强一致性场景
  • 深度分页性能:避免过大的from+size查询
  • 字段类型固定:字段类型定义后不可修改

7. 核心注意事项

  1. 索引设计规范:预定义字段类型和分词器
  2. 批量请求调优:控制在5-15MB/次
  3. 线程资源管理:使用try-with-resources
  4. 错误重试机制:处理429(限流)错误码

8. 实战总结

本文系统讲解了Java操作Elasticsearch的CRUD实现与批量处理技术。建议重点关注Bulk Processor的吞吐量优化,结合实际业务场景选择同步/异步操作方式。通过实践案例验证,合理配置批量参数可以使写入性能提升3-5倍。