(elasticsearch) migrating data

duo2208·2024년 12월 23일

Data

목록 보기
2/2
post-thumbnail

1. 개요


네트워크 사용과 프로그램 반입이 자유롭지 않은 환경에서 데이터를 이관해야 할 일이 생겼다. 단기로 급하게 지원을 나온 상태라 내부 히스토리도 잘 모르는 상황이었다. 나중에 운영으로 데이터를 이관해야할 땐 Apache nifi 반입 신청을 하겠지만 지금 당장은 쓸 수없는 것 처럼 보였기에, ELK Stack을 쓸 방법을 생각해 봤다.

1. Elasticsaerch data 디렉토리를 통째로 복사
2. Logstash를 이용한 복사
3. Elasticsaerch Snapshot 일 이용한 복원 (restore)


1-1. 고려 사항

  • 전환 시에 시스템 다운 타임이 발생하는가?
  • 네트워크가 사용 가능한 환경인가?
  • 동일한 버전인가 ?
ElasticsearchKibanaLogstash
6.7.x6.7.x5.6.x-6.8.x
7.10.x7.10.x6.8.x-7.17.x

🔗 Elastic 공식 : 지원 매트릭스


1-2. 테스트 환경

깨끗한 elasticsearch cluster 1대에서 1개의 노드로 테스트 진행

$ curl -XDELETE localhost:${elasticsearchPort}/_all

2. Elasticsaerch data 디렉토리를 통째로 복사


🧩 사용 → input ES-Cluster / ouput ES-Cluster

1. 가장 간단한 방법:
Elasticsearch 데이터 디렉토리를 통째로 복사하여 새로운 클러스터로 데이터를 옮길 수 있음.
2. Index ID 값 보존:
데이터를 통째로 복사하기 때문에 Index ID 값이 기존과 동일하게 유지됨.
3. 시간 소요:
elasticsearch-x.x.x/data 디렉토리를 묶고 푸는 과정에서 처리 속도가 느려질 수 있음.
4. 버전 호환성 문제:
Elasticsearch 버전 간 차이로 인해 복사된 데이터가 제대로 작동하지 않을 가능성이 있음.
5. 다운타임 발생:
Elasticsearch 인스턴스를 중지한 상태에서 복사를 진행해야 하므로 클러스터에 다운타임이 발생함.
6. 분산 환경에서의 제약:
다수의 노드로 구성된 분산 환경에서는 데이터 일관성 및 복잡성 문제로 인해 이 방법을 사용하기 어려움.
7. 안전한 복원 보장 어려움:
복사한 데이터가 새로운 클러스터에서 안정적으로 작동한다는 보장이 없음.

elastic 공식에서 권장하지 않는 방법이다. 가볍게 사용할 테스트 서버를 만들 때, 그리고 이관 될 elastic과 버전이 같을만 써본 방법이다.


2-1. 방법


  1. 복사 대상의 elasticsearch-x.x.x/data 폴더를 tar.gz로 묶어서 가져오기.
    • remote 서버에서 가져올 시, 🧩filezilla 혹은 🧩scp 명령어 사용.
    • 네트워크 사용이 불가능한 환경이라면, 물리적 미디어에 복사하여 가져와야 함.
  2. 현재 서버의 elasticsearch-x.x.x/data 폴더에 부은 후 재시작.

2-2. 테스트


  • 동일 버전 (ELS 6.7.0 → ELS 6.7.0)
    • remote → local

      els 6.7.0실행 문제없음
      kibana 6.7.0실행 문제없음
  • 상위 버전 (ELS 6.7.0 → ELS 7.10.2)
    • remote → local

      els 7.10.2실행 문제 없음
      kibana 6.7.0버전 호환 ❌
      kibana 7.10.2매핑 이슈 🔥

▼ 매핑 이슈

최상위 수준 또는 일부 하위 필드에서 dynamic: strict 를 사용하면 mappings 정의에 맞지 않는 문서를 거부하며 사전 정의된 매핑 준수를 강제하는 이슈가 발생합니다. 해결 방법은 하기와 같습니다.

kibana_* 인덱스의 매핑을 변경하기 위해 하기의 단계를 수행합니다.

  1. kibana 실행 중지.
  2. $ curl -XDELETE localhost:${elasticsearchPort}/.kibana* 명령어 수행.
  3. kibana 재실행 후 대쉬보드 정상 동작 확인.

🔗 Discuss Kibana: mapping set to strict, ...

🔗 Elastic 공식 : 너무 많은 필드 수! Elasticsearch에서 매핑 폭발을 방지하는 3가지 방법


3. Logstash 를 이용한 복사


🧩 사용 → input ES-Cluster / ouput ES-Cluster / Logstash

a. 네트워크 환경 : input / output Plugin
b. 네트워크 차단된 환경 : Elasticsearch 데이터를 JSON 형식으로 덤프

1. CPU 성능 및 구성 설정 의존:
처리 속도가 CPU 성능과 Logstash 설정(configuration)에 크게 좌우됨.
2. Index ID 변경:
_bulk API를 사용하기 때문에 새로운 Index ID 값이 생성됨.
3. 버전 호환성 문제:
Elasticsearch 버전 간 호환성 문제 발생 가능.
4. 다운타임 최소화:
Elasticsearch를 가동한 상태에서 데이터 전송이 가능하므로 다운타임이 최소화됨.
5. 클러스터링 설정:
복사 대상 클러스터가 기존과 비슷한 수준의 클러스터링 설정을 갖추고 있어야 함.


3-1. 방법


3-1-1. 네트워크 환경 : input / output Plugin (feat. reIndexAPI)

  1. shard 및 replica 설정
    shard 및 replica 를 설정하지 않고 이관하면 ES6 기본 설정인 shard 5개 replica 1개로 색인합니다. 프라이머리 샤드 수는 인덱스를 처음 생성할 때 지정하며, 인덱스를 재색인 하지 않는 이상 바꿀 수 없습니다. 따라서 색인 전에 설정해주세요.
PUT _template/shard1replica0
{
    "order" : 0,
    "index_patterns" : [
       ".openquery*",
       "tr*"
    ],
    "settings" : {
      "index" : {
        "number_of_shards" : "1",
        "number_of_replicas" : "0"
      }
    },
    "mappings" : { },
    "aliases" : { }
}
  1. logstash-x.x.x/config/migration.conf 파일 생성
input {
  elasticsearch {
    hosts => ["${input_ES_path}"]
    index => "*,-.kibana_1,-.task"          # 가져올 인덱스 명. 시스템 인덱스를 제외한 모든 일반 인덱스.
    scroll => "10m"       # 데이터를 검색할 때 사용할 스크롤 컨텍스트의 유지 시간.
    size => 1000          # 한 번에 가져올 문서 수.
    codec => "json"       # Elasticsearch 데이터를 JSON 형식으로 처리.
    docinfo => true
  }
}

output {
  elasticsearch {
    hosts => ["${output_ES_path}"]
    index => "%{[@metadata][_index]}"  # 문서를 저장할 인덱스, 메타데이터를 참조하여 원본 인덱스를 유지.
  }
  stdout { codec => rubydebug { metadata => true } } # 디버깅 용도.
  • index => "*,-.kibana_1,-.task"
    : 시스템 인덱스를 제외한 모든 일반 인덱스. 설정.
  • 배치 처리
    size : 기본적으로 1000 으로 시작하고, 시스템 성능을 모니터링하며 사이즈를 늘려주세요.
  1. logstash 실행
$ logstash-x.x.x/bin>logstash -f ../config/migration.conf --config.reload.automatic
  • --config.reload.automatic 플래그
    : logstash를 재 시작할 필요 없이 수정된 conf파일을 reload 하여 적용해주는 옵션
  1. kibana 실행 후 확인

하기의 오류를 접할 시,

[warning][migrations] Another Kibana instance appears to be migrating the index. Waiting for that migration to complete. If no other Kibana instance is attempting migrations, you can get past this message by deleting index .kibana_1 and restarting Kibana.
  • kibana 실행 중지.
  • $ curl -XDELETE localhost:${port}/.kibana*` 명령어 수행.
  • kibana 재실행 후 대쉬보드 정상 동작 확인.

🔥 Error 및 이슈

▼ logsatash 버전 호환성 이슈

 Encountered a retryable error (will retry with exponential backoff)
 {:code=>400, url=>"http://[IP]:[port]/_template/logstash/_bulk"}

elasticsearch ouput plugin 사용 시, es 쿼리가 정상적이지 않아 요청이 실패 되면서 logstash가 지속적으로 재시도를 하게 되는 현상. logstash 와 es 버전이 안맞을 때 발생.


▼ bulk insert 시 컨텐츠 길이 이슈

 Encountered a retryable error (will retry with exponential backoff)
 {:code=>400, :url=>"http://[IP:port]/_bulk", :content_length=>37171}

/config/elasticsearch.yml에 http:.max_content_length 설정이 있는데 설정된 용량보다 초과된 메시지를 보낼 때 나타난다.

메시지 용량 사이즈를 늘려줘야한다. default 값은 100mb이며, 설정 가능한 최대 값은 2gb이다. 2gb 이상의 값을 설정하면 하기와 같은 로그가 찍히면서 변경 값이 적용되지 않는다.

[WARN][http.netty               ] [Magdalena] maxContentLength[2.9gb] set to high value, resetting it to [100mb]

실행 중 ES가 다운되는 이슈

[ERROR][logstash.outputs.elasticsearch][main][85b115c56ef75d7e7a4b5e780ec36b6cd437e86ab6333be9c55e5590f8dd33d4] Attempted to send a bulk request to elasticsearch' but Elasticsearch appears to be unreachable or down! {:error_message=>"Elasticsearch Unreachable: [http://localhost:7550/][Manticore::SocketException] Connection reset", :class=>"LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError", :will_retry_in_seconds=>2}

.kibana* 인덱스들을 migration 하는 과정에서 버전성이 호환되지 않아 꺼지는 것으로 파악됩니다. logstash-x.x.x/config/migration.conf 파일의 input Plugin 설정에서 .kibana* 관련 인덱스 exclude 를 빼먹지 않았는지 확인해 주세요.


실행 중 네트워크 연결이 다운되는 경우 : 🧩 Redis 활용

 Error: Connection refused - Failed to open TCP connection to 192.168.0.4:xxxx (Connection refused - connect(2) for "192.168.0.4" port xxxx)

네트워크 꺼짐으로 인해 logstash 혹은 elasticsearch에 문제가 생길 경우 데이터 loss 또는 데이터 duplicate 가 발생할 수 있습니다.

임시 저장소인 🧩Redis 를 활용하여 데이터 손실이 발생하지 않도록 전송할 데이터를 일시적으로 혹은 일정 기간 동안 보관했다가 logstash와 elasticsearch가 정상화 되었을 때 다시 전달할 수 있도록 만들 수 있습니다.

🔗 기술블로그 : logstash tcp로 데이터 전송 시 손실없는 데이터 전달 흐름 설계


3-1-2. 네트워크 차단된 환경 : Elasticsearch 데이터를 JSON 형식으로 덤프 (feat. bulkAPI)

  1. shard 및 replica 설정
  2. 메모리 최적화 설정
  3. 파이프라인 설정
  4. logstash-x.x.x/config/migration_out.conf 파일 생성
input {
  elasticsearch {
    hosts => ["${input_ES_path}"]
    # 시스템 인덱스를 제외한 모든 일반 인덱스
    index => "*,-.kibana_1,-.tasks"
    scroll => "10m"
    size => 1000
    codec => "json"
    docinfo => true
  }
}

filter {
  mutate {
    # 사용하지 않는 불필요한 필드 제거
    remove_field => ["@version", "@timestamp", "host", "path"]
    # 동적 인덱스명 사용을 위한 index 필드 추가
    add_field =>  { "index" => "%{[@metadata][_index]}" }
  }
}

output {
  file {
    codec => "json_lines"
    path => "${yout_path/elasticsearch_output.json}"      # json 파일 저장 경로
  }
  stdout { codec => rubydebug { metadata => true } }
}
  1. logstash 실행 : elasticsearch cluster data → json file
$ logstash-x.x.x/bin>logstash -f ../config/migration_out.conf --config.reload.automatic --log.level debug
  1. logstash-x.x.x/config/migration_in.conf 파일 생성
input {
  file {
    codec => "json"
    path => "${yout_path/elasticsearch_output.json}"
    start_position => "beginning"
  }
}

filter {
  # _source 안의 id 값을 metadata의 _id로 설정
  # _source 안의 index 값을 metadata의 _index로 추가
  # 불필요한 필드 제거
  mutate {
    copy => {"id" => "[@metadata][_id]"}
    copy => {"index" => "[@metadata][_index]"}
    remove_field => ["@version", "@timestamp", "host", "path", "index"]
  }
}

output {
  elasticsearch {
    hosts => ["${output_ES_path}"]
    index => "%{[@metadata][_index]}"      # metadata 를 통해 index명 동적 설정
	  document_id => "%{[@metadata][_id]}"   # metadata 의 _id 값을 document_id로 사용
  }
  stdout { codec => rubydebug { metadata => true } }
}
  • input

    • codec : 입력받은 데이터가 message 필드 안에 json 스트링으로 보여집니다. key-value 로 보기 위해서는 input 형식이 json 타입이라는 것을 명시해 주어야 합니다.
  • filter

    • ruby : bulkAPI 를 통해 이관되므로, 고유 id 값이 새로 생성됩니다. 고유 id 값을 원본과 동일하게 만들기위해 _source 내부의 id 값을 사용하여 고유 _id 값으로 변환하는 로직입니다.
  1. logstash 실행 : json file → elasticsearch
$ logstash-x.x.x/bin>logstash -f ../config/migration_in.conf --config.reload.automatic --log.level debug
  1. kibana 실행 후 데이터 확인

🔥이슈
메모리 부족으로 인한 데이터 누락
메모리가 부족하여 logstash 실행 중 elasticsearch가 다운 (7) 되며, 일부 데이터 개수가 맞지 않거나 원하는 결과가 추출되지 않는 상황이 발생합니다.

# elasticsearch
[INFO ][o.e.i.IndexingMemoryController] [yongin-node-1] now throttling indexing for shard [[.openquery-monitor-indices-20230626][0]]: segment writing can't keep up
[INFO ][o.e.i.IndexingMemoryController] [yongin-node-1] stop throttling indexing for shard [[.openquery-monitor-indices-20230629][0]]
[INFO ][o.e.m.j.JvmGcMonitorService] [yongin-node-1] [gc][3918] overhead, spent [346ms] collecting in the last [1s]
  • 쿼리 날리기
    elasticsearch에서 디스크 사용량이 90% 이상 차게 되면 키바나에서 데이터를 read-only 모드로 바꿀 수 있습니다. 모든 인덱스를 다시 쓰기 가능 모드로 변경하기 위해 하기의 명령어를 이용합니다.
PUT /_settings
{
  "index.blocks.read_only_allow_delete": null
}
  • configuration 조정하기
    시스템 성능에 맞춰 메모리 사용량을 높여주세요.
    ⚙️힙 사이즈는 사용가능한 RAM을 기반으로 해야합니다.
    ⚙️ Xms, Xmx 사이즈는 같아야합니다.
    ⚙️ 시스템 메모리의 50% 은 루씬 파일 캐시를 위해 남겨둬야 합니다. 만약 로컬 PC의 RAM이 16g 라면, Xms 와 Xmx 총 합을 절반인 8g로 맞춥니다.
# elasticsearch-x.x.x/config/jvm.options
-Xms4g
-Xmx4g

👻 테스트

  • 동일 버전 (remote ELS 6.7.0 → local ELS 6.7.0 / logstash 6.7.0)
    • 3개 인덱스 이관 확인. ( json file 1.6MB )
    • 데이터 이관 실패. ( json file 6.6GB )
      • 일부 인덱스 누락.kibana_ 인덱스 제외하고 원본 총 114개 / 복사본 총 100개
        • bulk API가 100개의 인덱스까지만 지원하진 않는지? 공식문서에 따르면, limit for bulk index 가 100개입니다. 100개 이상을 이관 할 때는 하기의 방법을 사용하면 되겠습니다.
          1. Logstash의 Splitting: 데이터를 청크로 분할하여 용량을 줄이고 각각의 청크를 별도의 Bulk API 요청으로 전송.

          2. 일시적인 인덱싱: 먼저 100개의 문서를 인덱싱하고, 그 다음에 나머지 15개의 문서를 따로 처리.

            🔗 [ Elastic 공식 | Limits ]

      • 건 수 많은 인덱스들 각각 14,000건 정도까지 이관 되다가 정지.
        • 로컬 자원 한계로 파악됨. (힙사이즈 8gb)
  • 동일 버전 (remote ELS 6.7.0 → remote ELS 6.7.0 / logstash 6.7.0)
    • 3개 인덱스 이관 확인. ( json file 1.6MB )
    • 데이터 이관 확인. ( json file 6.6GB )

4. Elasticsearch Snapshot을 이용한 복원 (restore)


🧩 사용 → input ES-Cluster / ouput ES-Cluster

1. 공식 권장 방법:
Elasticsearch에서 권장하는 안전한 데이터 복사 및 복원 방법.
2. Index ID 변경:
restore를 사용하는 복원 과정에서 새로운 Index ID 값이 생성됨.
3. 데이터 무결성 보장:
Snapshot은 데이터를 백업하고 복원하는 과정을 철저히 관리하여 데이터 무결성을 보장함.
4. 다운타임 최소화:
Elasticsearch를 가동한 상태에서 Snapshot 및 복원을 진행할 수 있어 다운타임을 최소화할 수 있음.
5. 버전 호환성 확인 필요:
Snapshot 및 복원 과정에서 Elasticsearch 버전 간 호환성을 사전에 확인해야 함.
6. 대규모 데이터 지원:
대규모 데이터 마이그레이션에 적합하며, 분산 환경에서도 안정적으로 작동.

4-1. 고려사항

  • snapshot 의 path를 외부 서버로 둘 수 있는가?
    : 기존 존재하는 인덱스에 복구 시에는 같은 수에 shards를 가지고 있어야 하며, 반드시 해당 인덱스를 closed 하고 난 다음 진행해야 함.
  • local에 snapshot을 하고, 해당 결과물 파일로 백업(외부서버) 및 복구가 가능한가?
    : 가능함. path.repo에 등록한 디렉토리 통으로 tar 압축.

🔗 기술블로그 : elasticsearch-backup-and-restore

🔗 기술블로그 : Elasticsearch Backup/Restore
🔗 기술블로그 : elastic 백업 (스냅샷)

4-2. 방법

a. 복사 대상 ES Cluster 작업 : Backup

  1. snapshot 저장 경로 지정
    path.repo 를 추가해주세요. 경로를 설정한 이후에 반드시 elasticsearch 를 재시작하세요.
# elasticsearch-x.x.x/config/elasticsearch.yml
path.repo: ["/home/yongin/appl/elastic/elasticsearch-6.7.0/data_backup/snapshot"]
  1. snapshot 리포지토리 생성 후 확인
PUT _snapshot/2024_05_31
{
  "type": "fs",
  "settings": {
    "location": "C:\\elasticsearch-6.7.0\\data_backup\\snapshot"
  }
}
GET_snapshot
  1. 백업 시작. 비동기 스냅샷 생성
PUT _snapshot/2024_05_31/snapshot_1
  1. 진행 상황 확인
GET /_snapshot/2024_05_31/snapshot_1/_status
GET _snapshot/2024_05_31/_all?pretty
  1. 레파지토리 생성된 경로에서 파일을 tar.gz 로 압축

b. 전달될 ES Cluster 작업 : Restore

  1. snapshot 저장 경로 지정
# elasticsearch-x.x.x/config/elasticsearch.yml
path.repo: C:\\elasticsearch-6.7.0\\data_backup\\snapshot
  1. snapshot 리포지토리 생성 후 확인
PUT _snapshot/2024_05_31
{
  "type": "fs",
  "settings": {
    "location": "C:\\elasticsearch-6.7.0\\data_backup\\snapshot"
  }
}
  1. 저장했던 tar.gz 파일을 snapshot 저장 경로에 옮기고 풀기

  2. 현재 노드를 close 하기

curl -X POST "localhost:xxxx/_all/_close"
  1. restore 하기
POST /_snapshot/2024_05_31/snapshot_1/_restore
{
	"indices": "-.kibana*",
  "index_settings":{
              "index.number_of_replicas":0
      },
      "ignore_index_settings":[
              "index.refresh_interval"
    ]
}
  1. ** 복원된 indices 조회
GET _cat/indices?pretty&s=i

4-3. 테스트

  • 동일 버전 (remote ELS 6.7.0 → local ELS 6.7.0 )
    : 대용량 데이터 이관 완료 및 확인.
  • 상위 버전 (remote ELS 6.7.0 → local ELS 7.10.2 )
    : 대용량 데이터 이관 완료 및 확인.
    • 다만, 도트(.)로 시작하는 인덱스 이름이 숨겨진 인덱스와 시스템 인덱스에만 예약되도록 변경 됨
    • 날짜 포맷이 변경 됨


📌 그 외 방법 찾아보기

🔗 Elastic 공식: Migrationg data
🔗 기술블로그 : Elastic Cloud의 클러스트럴 스냅샷과 리스토어 기능을 이용해서 마이그레이션 하는 방법
🔗 기술블로그 : Elasticsearch 3TB의 인덱스를 reindex 하는 방법
🔗 기술블로그 : 배달의민족 광고데이터 이관기
🔗 기술블로그 : Elasticsearch에서 데이터를 백업하고 복원하는 방안

0개의 댓글