天天看点

ES 大规模数据迁移

背景

对于生产环境中产生的数据,可能会存在于不同的ES集群,同时随着业务迭代、数据规划改变等各种原因,可能会需要对现存ES中的数据进行迁移。

迁移方式分类

数据的迁移从操作方面可以分为以下一些主流的方式

分类 方式 ES版本 人工干预 使用门槛 备注
自动迁移 设置ILM配合nodeattr 7.x以上 可能会存在license问题
半自动迁移 不同节点分片/副本移动 不限 需要规划allocate顺序
手动迁移 elasticsearch-dump 不限 github地址
手动迁移 snapshot+restore 6.x以上 需要所有节点都注册共享存储
手动迁移 自研脚本 不限(注意版本兼容) 自由度高开发成本高

迁移注意事项

  1. 迁出集群和迁入集群的版本兼容问题
    1. 不同版本的ES可能会存在索引结构不兼容
      1. 如2.x中支持的多type,在6.x里面被废弃,到7里会建议只使用

        _doc

    2. 不同版本间的集群、索引、字段配置不兼容
      1. 如2.x中不索引的方式是

        "index":"not_analyze"

        ,到了6.x里变成了

        "index":false

    3. 数据字段种类不兼容的情况
      1. 如2.x中的

        string

        在6.x以上被分为了

        text

        keyword

    4. snapshot + restore方式只能从低版本的集群往高版本的集群里迁移
    5. 不同版本的ES插件会指明适用ES版本,如果版本之间差异过大会出现不兼容
    6. 对于不同版本的客户端
      1. 低版本的客户端中有些接口在高版本中被废弃
      2. 高版本客户端会默认在query里拼上一些低版本不支持的参数
  2. 迁出索引自身配置及配套配置
    1. 一般索引会包含

      settings

      mappings

      等,在迁移的时候要注意上面提到的版本兼容的问题
    2. 除了索引自身的配置之外,还要考虑在创建索引的时候是否预设了包括 pipeline (

      GET http://ip:port/_ingest/pipeline

      )和 template (

      GET http://ip:9200/_template

      )在内的数据预处理配置
    3. 普通的 pipeline 和 template 可能会包含通配符对包括数据字段、index名称等进行条件筛选,所以有可能会存在 pipeline 相互引用和 template 相互覆盖(继承)的问题
  3. 数据是否需要双写
    1. 数据挎集群双写甚至多写的时候需要考虑是否存在数据异构的问题
      1. 比如双写2.x和6.x的集群,2.x中的多type在6.x里是不支持的
      2. 有些数据配置、甚至之前使用的client可能不兼容,需要区别处理
    2. 数据双写中的数据一致性问题
    3. 数据双写带来的额外的网络、磁盘等开销
    4. 数据双写中 consumer/sinker 间的数据竞争
  4. 数据是否需要挎集群读取
    1. 存量数据是否需要应对检索
    2. 增量数据是否需要应对检索
    3. 当同一条数据同时出现在源/目标索引中,是否需要进行去重
  5. 大量数据迁移的时候会对集群产生负载,需要优先保证搜索的稳定性还是优先保证数据导出/导入的速度

迁移手顺

Snapshot方案

  1. 在机器上挂载nfs盘(其他共享存储请参考官网文档)
    1. 安装nfs相关软件
      1. apt update && apt install nfs-common

        => ubuntu
      2. yum install -y nfs-common

        => centos
    2. 创建挂载点文件夹
      mkdir /mnt/disk1
                 
    3. 修改挂载配置
      vi /etc/fstab
                 
    4. 挂载磁盘
      mount -a
                 
    5. (可选)在挂载磁盘中创建子目录
      mkdir /mnt/disk1/logs
                 
  2. 把对应的共享路径配置进ES里(如果要使用子路径,需要把子路径也注册在config文件里)
    echo 'path.repo: ["/mnt/disk1"]' >> /$ES_HOME/config/elasticsearch.yml
               
  3. 在ES集群中进行 rolling restart 使配置生效
    1. (可选)关闭部分不需要的索引
      curl -X POST http://ip:port/$index/_close
                 
    2. (可选)把索引分片挪出待重启节点,把副本关掉
      curl -X PUT -H 'content-type: application/json;charset=UTF-8' -d '{
              "settings":{
                  "index.routing.allocation.exclude._name": "$node_name",
                  "index.routing.allocation.exclude._ip_": "$node_ip",
                  "index.number_of_replicas": 0
              }
          }' http://ip:port/$index/_settings
                 
    3. 查看分片状态
      curl -X GET http://ip:port/_cat/shards
                 
      ⬆️正在迁移的分片显示为
      $old_index_name $shard_num $shard_type RELOCATING $doc_num $index_size $old_node_ip $old_node_name -> $new_node_ip $new_index_id $new_node_name
      
      ==> 
      
      index1 6 p RELOCATING 3929901 3gb 192.168.100.1 node-121 -> 192.168.100.2 fjfF3dYOQgGMNHL1g08Few node-103
                 
    4. 没有分片在待迁移节点上之后,开始重启
      1. 检查一下目标进程
        ps -ef | grep elasticsearch | grep -v grep
        dev      16043     1 19  2019 ?        68-12:37:06 /home/javen/jdk1.8.0_171/bin/java -Xms16g -Xmx16g -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=75 -XX:+UseCMSInitiatingOccupancyOnly -Des.networkaddress.cache.ttl=60 -Des.networkaddress.cache.negative.ttl=10 -XX:+AlwaysPreTouch -Xss1m -Djava.awt.headless=true -Dfile.encoding=UTF-8 -Djna.nosys=true -XX:-OmitStackTraceInFastThrow -Dio.netty.noUnsafe=true -Dio.netty.noKeySetOptimization=true -Dio.netty.recycler.maxCapacityPerThread=0 -Dlog4j.shutdownHookEnabled=false -Dlog4j2.disable.jmx=true -Djava.io.tmpdir=/tmp/elasticsearch-946212112948700879 -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=data -XX:ErrorFile=logs/hs_err_pid%p.log -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintTenuringDistribution -XX:+PrintGCApplicationStoppedTime -Xloggc:logs/gc.log -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=32 -XX:GCLogFileSize=64m -Des.path.home=/$ES_HOME -Des.path.conf=/$ES_HOME/config -Des.distribution.flavor=default -Des.distribution.type=tar -cp /$ES_HOME/lib/* org.elasticsearch.bootstrap.Elasticsearch -d
        dev      16069 16043  0  2019 ?        00:00:00 /$ES_HOME/modules/x-pack-ml/platform/linux-x86_64/bin/controller
                   
      2. 杀进程重启(opt1)
        ps -ef | grep elasticsearch | grep -v grep | awk '{ print $2 }' | xargs kill -15
                   
        ./$ES_HOME/bin/elasticsearch -d
                   
      3. 服务重启(opt2)
        sudo systemctl restart elasticsearch
                   
  4. 把共享存储路径注册为snapshot仓库
    1. 注册存储仓库(如果这里的location需要被注册在config文件里)
      curl -X PUT -H 'content-type: application/json;charset=UTF-8' -d '{
          "type": "fs",
          "settings": {
              "location": "/mnt/disk1"
          }
      }' http://ip:port/_snapshot/es_backup
                 
      ⚠️ location:刚才我们注册的共享存储的路径,需要集群中所有节点都能访问到的共享存储位置`
    2. 检查是否成功,正常的返回是
      {
          "acknowledged" : true
      }
                 
      通过命令查看注册状态,正常结果应该和我们设置(PUT)进去的一致
      curl -X GET /_snapshot/es_backup
                 
      结果为:
      {
          "project" : {
              "type" : "fs",
              "settings" : {
                  "location" : "/mnt/disk1"
              }
          }
      }
                 
  5. 创建存储
    1. 执行命令:
      curl -X PUT -H 'content-type: application/json;charset=UTF-8' -d 
      '{
          "indices": "index_1,index_2,index_tag_*",
          "ignore_unavailable": true,
          "include_global_state": false,
          "metadata": {
              "taken_by": "steven",
              "taken_because": "backup before migration"
          }
      }' http://ip:port/_snapshot/es_backup/my_backup_20200820
                 
      • indices:需要迁移的索引名称,可以支持通配符
      • ignore_unavailable:忽略不可用索引
      • include_global_state:包含全局配置(包括索引模板之类的,但是这里需要考虑,如果新集群中索引模板需要修改建议这里设为false)
      • metadata:制作这个snapshot的一些说明,不会影响数据
    2. 校验一下数据制作结果
      curl -X GET http://ip:port/_snapshot/es_backup/my_backup_20200820
      {
          "snapshots" : [
              {
                  "snapshot" : "my_backup_20200820",
                  "uuid" : "m-uZwk7zTOiPzT0FO-hVLw",
                  "version_id" : 6080099,
                  "version" : "6.8.0",
                  "indices" : [
                      "index_1",
                      "index_2",
                      "index_tag_1",
                      "index_tag_2"
                  ],
                  "include_global_state" : false,
                  "state" : "IN_PROGRESS",
                  "start_time" : "2020-08-20T03:14:08.517Z",
                  "start_time_in_millis" : 1597893248517,
                  "end_time" : "1970-01-01T00:00:00.000Z",
                  "end_time_in_millis" : 0,
                  "duration_in_millis" : -1597893248517,
                  "failures" : [ ],
                  "shards" : {
                      "total" : 0,
                      "failed" : 0,
                      "successful" : 0
                  }
              }
          ]
      }
                 
      • indices:这个snapshot里包含的索引,需要检查里面是不是包含了你想要的所有索引(

        GET _cat/indices

        )
      • state:snapshot创建状态
        • IN_PROGRESS:正在进行
          • 可以通过添加

            wait_for_completion=true

            来等他进行完
          • PUT /_snapshot/es_backup/my_backup_20200820?wait_for_completion=true

        • PARTIAL:部分完成
      • failures:snapshot创建失败的列表
        {
            "index" : "index1",
            "index_uuid" : "index1",
            "shard_id" : 4,
            "reason" : "IndexShardSnapshotFailedException[Failed to snapshot]; nested: ElasticsearchException[failed to create blob container]; nested: AccessDeniedException[/mnt/disk1/logs/indices/0szV2p6oTOCzjNi-r1QnNg/4]; ",
            "node_id" : "fjfF3dYOQgGMNHL1g08Few",
            "status" : "INTERNAL_SERVER_ERROR"
        }
                   
        • index:索引名称
        • shard_id:分片编号
        • reason:失败原因
        • node_id:集群节点id
        • status:错误种类
      • start_time/end_time/duration_in_millis:创建的开始、结束、持续时间
      • shards:
        • total:总共处理的分片数
        • failed:失败的分片数
        • successful:成功的分片数
  6. 导出所需pipeline
    1. 找出需要迁移的pipeline
      curl -X GET http://ip:port/_ingest/pipeline
                 
      ⚠️ 主要找和索引有直接关系的pipeline,比如带

      date_index_name

      set

      user_agent

      之类的pipeline
  7. 导出所需索引模板
    1. 找出需要迁移的索引模板,主要关注

      index_patterns

      里面能匹配到目标索引的模板
      curl -X GET http://ip:port/_template
                 
      ⚠️ 索引模板会根据

      order

      从小到大的顺序执行,所以多个

      index_pattern

      同时匹配一个索引时会出现彼此覆盖(或者叫继承)的问题,需要将相关的 所有 索引模板都备份出来
  8. 复制(同版本目标集群)/创建(不同版本目标集群)当前集群中所用自定义插件
    1. 插件目录为

      $ES_HOME/plugins

    2. 里面的目录理论上都是自定义插件,都得做迁移
  9. 准备snapshot迁移目标集群环境,操作同导出集群设置
    1. 注册共享存储
    2. 加载snapshot配置
    3. 把刚才备份出来的自定义插件放置在新的节点的

      $ES_HOME/plugins

      目录里
    4. (重启)使配置生效
    5. 创建snapshot仓库

      ⚠️ 这里的仓库路径需要和导出的目录一致,确保这个集群的所有节点都能访问到之前导出的数据文件

      ⚠️ 自定义插件描述文件中的ES版本号需与当前ES版本一致

      less $ES_HOME/plugins/${plugin-name}/plugin-descriptor.properties
       ...
       elasticsearch.version=$ES_VERSION
       ...
                 
  10. 导入刚才备份出来的

    pipeline

    index template

    1. pipeline
      curl -X PUT -H 'content-type: application/json;charset=UTF-8' -d '{
          "description" : "pipeline description",
          "processors" : [
              ...
          ]
      }' http://ip:port/_ingest/pipeline/${pipeline_name}
                 
    2. index template
      curl -X PUT -H 'content-type: application/json;charset=UTF-8' -d '{
          "order": 0,
          "index_patterns":[*],
          "settings": {},
          "mappings": {},
          "aliaes": {}
      }' http://ip:port/_template${template_name}
                 
  11. 恢复(导入)刚才创建的snapshot
    1. 运行命令
      curl -X POST -H 'content-type: application/json;charset=UTF-8' -d '{
          "indices": "index_1",
          "ignore_unavailable": true,
          "include_global_state": false,              
          "rename_pattern": "index_(.+)",
          "rename_replacement": "restored_index_$1",
          "include_aliases": false,
          "index_settings": {
              "index.number_of_replicas": 0
          },
          "ignore_index_settings": [
              "index.refresh_interval"
          ]
      }' /_snapshot/es_backup/my_backup_20200820/_restore
                 
      • indices:指定需要恢复的索引名称,支持通配符
      • ignore_unavailable:忽略不可用索引,如当前集群已存在索引之类的
      • include_global_state:包含全局设置,作用见导出
      • rename_pattern:如果需要对导入索引进行重命名,则通过这里的正则进行元素获取
      • rename_replacement:配合

        rename_pattern

        使用,将其匹配出来的元素组合成新的索引名称
      • include_aliases:是否包含索引别名
      • ignore_index_settings:忽略原有索引中的配置
      • index_settings:将里面的设置覆盖掉原有的索引设置

        ⚠️ 这里不支持修改分片数,如果要修改需要通过

        shrink

        接口
    2. 检查索引是否恢复完成
      1. 运行命令

继续阅读