1. 开发环境准备
推荐使用Java 8+和Elasticsearch 7.x版本组合。选择官方的High Level REST Client客户端:
<!-- Maven依赖配置 -->
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>7.16.3</version>
</dependency>
2. 基础CRUD操作
2.1 创建文档(Create)
// 创建客户端实例
RestHighLevelClient client = new RestHighLevelClient(
RestClient.builder(new HttpHost("localhost", 9200, "http")));
// 创建索引请求
IndexRequest request = new IndexRequest("users");
request.id("1001"); // 指定文档ID
Map<String, Object> user = new HashMap<>();
user.put("name", "王伟");
user.put("age", 28);
user.put("tags", Arrays.asList("开发", "篮球"));
request.source(user); // 设置文档内容
// 同步执行请求
IndexResponse response = client.index(request, RequestOptions.DEFAULT);
System.out.println("文档版本:" + response.getVersion());
2.2 读取文档(Read)
GetRequest request = new GetRequest("users", "1001");
GetResponse response = client.get(request, RequestOptions.DEFAULT);
if (response.isExists()) {
String source = response.getSourceAsString();
System.out.println("获取文档:" + source);
} else {
System.out.println("文档不存在");
}
2.3 更新文档(Update)
UpdateRequest request = new UpdateRequest("users", "1001");
Map<String, Object> updates = new HashMap<>();
updates.put("age", 29);
request.doc(updates); // 部分字段更新
UpdateResponse response = client.update(request, RequestOptions.DEFAULT);
System.out.println("新版本号:" + response.getVersion());
2.4 删除文档(Delete)
DeleteRequest request = new DeleteRequest("users", "1001");
DeleteResponse response = client.delete(request, RequestOptions.DEFAULT);
System.out.println("删除结果:" + response.getResult());
3. 批量操作进阶
3.1 批量写入处理
BulkRequest bulkRequest = new BulkRequest();
for (int i = 1; i <= 1000; i++) {
IndexRequest request = new IndexRequest("logs")
.id(String.valueOf(i))
.source("level", "INFO",
"message", "系统启动完成["+i+"]",
"timestamp", new Date());
bulkRequest.add(request);
}
BulkResponse responses = client.bulk(bulkRequest, RequestOptions.DEFAULT);
if (responses.hasFailures()) {
System.out.println("部分操作失败:" + responses.buildFailureMessage());
}
3.2 批量更新策略
BulkProcessor processor = BulkProcessor.builder(
(request, bulkListener) ->
client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
new BulkProcessor.Listener() {
@Override
public void beforeBulk(long executionId, BulkRequest request) {
System.out.println("即将执行"+request.numberOfActions()+"项操作");
}
})
.setBulkActions(500) // 每500条执行一次
.setConcurrentRequests(2) // 允许并发请求
.build();
4. 关联技术深入
4.1 版本控制机制
使用乐观锁确保数据一致性:
UpdateRequest request = new UpdateRequest("inventory", "001")
.version(3) // 当前已知版本号
.doc(Collections.singletonMap("stock", 50));
try {
client.update(request, RequestOptions.DEFAULT);
} catch (ElasticsearchException e) {
if (e.status() == RestStatus.CONFLICT) {
System.out.println("文档已被其他进程修改");
}
}
5. 应用场景分析
- 日志采集系统:使用Bulk API处理海量日志
- 商品库存管理:结合版本控制实现并发扣减
- 用户画像更新:部分更新提升性能
6. 技术优缺点对比
优势特性
- 分布式扩展性:轻松处理PB级数据
- 近实时检索:1秒级延迟即可检索最新数据
- 自动分片:简化集群管理
局限考量
- 事务支持有限:不适合强一致性场景
- 深度分页性能:避免过大的from+size查询
- 字段类型固定:字段类型定义后不可修改
7. 核心注意事项
- 索引设计规范:预定义字段类型和分词器
- 批量请求调优:控制在5-15MB/次
- 线程资源管理:使用try-with-resources
- 错误重试机制:处理429(限流)错误码
8. 实战总结
本文系统讲解了Java操作Elasticsearch的CRUD实现与批量处理技术。建议重点关注Bulk Processor的吞吐量优化,结合实际业务场景选择同步/异步操作方式。通过实践案例验证,合理配置批量参数可以使写入性能提升3-5倍。