引子

作为分布式搜索领域的扛把子选手,Elasticsearch在企业级应用中经常需要面对跨集群迁移的场景。今天咱们就来唠唠这个话题,通过真实案例手把手演示三种主流迁移方案。


一、什么时候需要跨集群迁移?

  1. 机房搬迁:服务器从自建机房迁移到云平台
  2. 环境隔离:开发环境数据同步到测试环境
  3. 版本升级:老集群ES 6.x迁移到新集群ES 8.x
  4. 数据合并:多个业务集群合并到统一数据平台

去年我们团队就遇到过典型场景:某电商平台的日志集群需要从阿里云迁移到华为云,涉及20TB日志数据和实时业务的无缝切换。


二、三大迁移方案实战演示

(基于Elasticsearch 7.x)

方案1:Snapshot & Restore(推荐方案)
PUT _snapshot/my_backup
{
  "type": "hdfs",
  "settings": {
    "uri": "hdfs://namenode:8020/",
    "path": "/elasticsearch/backups",
    "conf.dfs.client.read.shortcircuit": "false"
  }
}

# 创建快照(带时间戳)
PUT _snapshot/my_backup/%3Csnapshot_%7Bnow%2Fd%7D%3E
{
  "indices": "nginx-*",
  "ignore_unavailable": true,
  "include_global_state": false
}

# 查看快照状态(直到STATE显示SUCCESS)
GET _snapshot/my_backup/_status

# 在新集群注册同一仓库后恢复
POST _snapshot/my_backup/snapshot_2023-08-01/_restore
{
  "indices": "nginx-*",
  "rename_pattern": "(.+)",
  "rename_replacement": "restored_$1"
}

方案特点:

  • 支持断点续传
  • 可保留原数据结构
  • 适合TB级数据迁移

方案2:Reindex API(跨集群操作)
# 在目标集群配置白名单
PUT _cluster/settings
{
  "persistent": {
    "reindex.remote.whitelist": "old_cluster:9200"
  }
}

# 执行跨集群重建索引
POST _reindex
{
  "source": {
    "remote": {
      "host": "http://old_cluster:9200",
      "username": "elastic",
      "password": "changeme"
    },
    "index": "order_logs",
    "size": 5000
  },
  "dest": {
    "index": "order_logs_v2",
    "op_type": "create"
  }
}

使用场景:

  • 需要修改索引mapping
  • 过滤部分数据(可添加query条件)
  • 中小规模数据迁移(建议单次操作不超过10亿文档)

方案3:Logstash管道运输
input {
  elasticsearch {
    hosts => ["http://old_cluster:9200"]
    index => "user_behavior*"
    query => '{ "query": { "range": { "@timestamp": { "gte": "now-30d/d" } } } }'
    docinfo => true
  }
}

filter {
  # 数据清洗示例:移除敏感字段
  mutate {
    remove_field => ["credit_card", "phone_number"]
  }
}

output {
  elasticsearch {
    hosts => ["http://new_cluster:9200"]
    index => "%{[@metadata][_index]}_migrated"
    document_type => "_doc"
  }
}

适用情况:

  • 需要实时增量迁移
  • 数据格式转换需求复杂
  • 需要添加ETL处理逻辑

三、方案选型对照表

指标 Snapshot Reindex Logstash
数据量级 不限 <1TB <500GB
网络带宽 低要求 高要求 中要求
迁移速度 最快 中等 较慢
业务影响 需停写 无影响 无影响
版本兼容性 严格 较宽松 最宽松

四、避坑指南

  1. 版本鸿沟问题
  • ES大版本间快照不兼容(如6.x快照无法恢复到8.x)
  • 解决方案:通过Reindex API进行跨版本重建
  1. 字段类型暗雷
# 错误示例:keyword字段被自动推断为text
PUT new_index/_doc/1
{
  "product_id": "SKU-123"  # 在动态mapping下可能变成text类型
}

# 正确做法:提前创建目标索引
PUT new_index
{
  "mappings": {
    "properties": {
      "product_id": {
        "type": "keyword",
        "ignore_above": 256
      }
    }
  }
}
  1. 网络性能优化
  • 使用专用传输线路(避免走公网)
  • 调整TCP参数(示例优化配置):
# elasticsearch.yml
network.tcp.no_delay: true
network.tcp.keep_alive: true
network.tcp.reuse_address: true
  1. 权限管控要点
  • 最小化授权原则(迁移账号只给必要权限)
  • 临时token使用(Kibana示例):
POST /_security/oauth2/token
{
  "grant_type": "password",
  "username": "migration_user",
  "password": "TempP@ssw0rd!"
}

五、总结与展望

经过多个项目的实战检验,我们总结出以下最佳实践:

  1. 超过1TB数据优先使用Snapshot方案
  2. 需要转换数据结构时采用Reindex+Painless脚本
  3. 实时增量迁移使用Logstash配合消息队列
  4. 务必进行数据校验:
# 数据量校验
GET _cat/count/old_index?v
GET _cat/count/new_index?v

# 字段抽样检查
POST new_index/_search
{
  "size": 0,
  "aggs": {
    "sample": {
      "sampler": {
        "shard_size": 200
      },
      "aggs": {
        "fields_check": {
          "terms": {
            "script": {
              "source": """
                def fields = [];
                for (entry in params['_source'].entrySet()) {
                  fields.add(entry.getKey() + ':' + entry.getValue().getClass().getSimpleName());
                }
                return fields;
              """
            },
            "size": 100
          }
        }
      }
    }
  }
}

未来随着ES官方对跨集群搜索(CCS)功能的增强,迁移工作可能会变得更加智能化。但无论技术如何发展,对数据安全的敬畏之心永远不能丢。