[Opensearch]Reindex

Choise.o·2025년 8월 10일
0

데이터 마이그레이션 작업을 하고 있다.

opesearch에서 노드당 허용 가능한 샤드 갯수가 정해져 있는데 일별 인덱스를 생성하는 구조로 인해 이 설정 값을 초과한 샤드 생성으로 opensearch 사용에 문제가 발생했다.

이 설정값은 명령어를 사용하거나 opensearch.yamlcluster.opensearchmax_shards_per_node 항목을 수정한 뒤 재시작하여 변경할 수 있다.

설정값 변경으로 문제는 해결하였으나 이 값을 무한정 늘릴 수는 없다.

현재 상태에서 개선 포인트를 잡아보자면,
일별 인덱스에 데이터가 약 1천개 내외로 적게 쌓이고 있다. 한 달치 데이터를 병합한 월별 인덱스 형태로 변경하면 1년 동안 생성되는 인덱스 갯수를 약 350개 줄일 수 있다.

그래서 일별 인덱스를 월별 인덱스로 옮기는 마이그레이션 작업을 진행하고 있다. 그 방법으로 reindex를 사용했다.


1. Reindex란

  • 한 인덱스의 데이터를 다른 인덱스로 복사하는 기능
  • 기존 document를 그대로 유지하면서 인덱스 매핑 변경, 데이터 필터링, 정제 등의 작업 가능
  • 내부적으로 Scroll + Bulk를 활용하여 처리
POST _reindex
{
  "source": {
    "index": "log_20250801"	// 데이터의 출처 인덱스명
  },
  "dest": {
    "index": "log_202508"	// 데이터가 복사되어 저장되는 인덱스명
  }
}

2. 내부 동작 방식

_reindex는 내부적으로 아래의 단계별 작업을 수행한다. 단일 스레드로 진행되기 때문에 병렬 작업에는 제한이 있다.(slices 설정 미사용 시)

  1. Scroll로 Source Index 읽기

    • Scroll을 이용해 일정 크기(size)의 document batch를 반복적으로 조회
    • _source, _id, _routing 등의 기본 메타정보 포함

참고

  • opensearch 의 _search API는 기본적으로 페이징된 데이터를 반환
  • 반환 데이터 크기가 너무 클 경우 시스템 상의 문제가 발생할 수 있기 때문
  • default = 10 이며 size 옵션 조정을 통해 크기를 조절 할 수 있지만 그마저도 한계가 있음
  • 데이터 전체를 조회하려면 scroll 등을 사용해야 함
  1. Reindex Task 생성 및 진행

    • OpenSearch는 이 작업을 내부적으로 task queue에 등록해 관리
    • _tasks API로 상태 추적 가능
    • 작업 상태를 확인하거나 중단 가능
  2. Bulk Insert 수행

    • 읽어온 document들을 dest.indexbulk 요청을 통해 삽입
    • 기존 _id를 유지하며 중복 시 덮어쓰기 됨(op_type: index 기본값)
  3. 에러 및 충돌 처리

    • 매핑 오류, 스키마 불일치 등은 개별 document 단위로 실패 처리
    • "conflicts": "proceed" 옵션 추가 시 실패 무시 가능

2-1) Reindex 외의 방식

reindex 내부 동작인 scroll과 bulk를 직접 구현할 수도 있다.
두 방식을 비교해보자면,

Reindex vs Scroll + Bulk

Reindex 사용Scroll + Bulk 직접 구현
개발 난이도낮음 (API만 호출하면 됨)높음 (Scroll, Bulk, Error Handling 등)
구현 속도빠름느림
커스터마이징한계 있음자유도 높음
운영 안정성상대적으로 높음직접 안정성 고려 필요
성능 최적화 여지scroll, batch size 설정 가능전체 제어 가능

둘 다 간단하게 구현해서 테스트 해봤는데 reindex 방식이 더 빨랐다.
물론 병렬 처리 관련 설정을 더 정교하게 조절했다면 결과가 달라질 수 있겠지만 일정과 테스트 결과 등을 고려했을 때 reindex로도 충분하다고 판단했다.


2-2) 매핑이 다른 경우

source 인덱스에 어느 시점부터 keyword 타입의 매핑이 추가됐다.
source와 dest의 인덱스 매핑 정보가 다를 경우 실패할 수도 있다고해서 코드 작성 전에 테스트해봤는데 잘 동작했다.

✅ reindex가 실패하지 않은 이유?

  • reindex는 bulk insert를 사용
  • OpenSearch는 insert할 때 필드가 없으면 단순히 없는 상태로 저장 (에러 없음)
  • 만약 dest 인덱스에 매핑 정보가 없는 데이터가 source에 있다면 해당 데이터의 타입을 자동으로 추론한 후 매핑을 추가함(Dynamic Mapping, 단 옵션에 따라 다르게 동작)

충돌이 발생해서 실패하는 경우 ⇒ 필드명이 같은데 타입이 다를 때
ex1) userName(type=keyword) 인덱스를 userName(type=text) 인덱스에 reindex 할 때
ex2) dest 인덱스에 없는 필드가 source index에 있고, destination이 동적 매핑을 허용하지 않는 경우


3. 대량 처리 성능 최적화

3-1) 병렬 처리

Reindex 자체는 단일 스레드로 동작하기 때문에 병렬 처리가 불가능하다.
⇒ 많게는 1000개의 인덱스를 reindex 해야 해서 인덱스 단위로 병렬화 처리를 했다.


List<String> indexNameList = getIndexNameAll(); // 마이그레이션 대상 일별 인덱스명 전체 조회

Map<String, List<String>> indexByMonthMap = groupingDailyIndex(indexNameList); // 월별 그루핑

List<CompletableFuture<Void>> jobs = new ArrayList<>();
ForkJoinPool customThreadPool = new ForkJoinPool(5);

indexByMonthMap.forEach((month, dailyIndexList) -> 
	jobs.add(CompletableFuture.runAsync(() -> startMigration(month, dailyIndexList), customThreadPool)
));

CompletableFuture.allOf(jobs.toArray(new CompletableFuture[0])).join();
// 마이그레이션 완료 후 필요한 작업 진행

3-2) 설정 변경

설정 변경을 통해 단위 데이터를 쪼개거나 동기 작업 완료 시점까지 기다리지 않을 수 있다. 그 외에도 데이터 처리에 필요한 자원의 유효 시간 설정을 조절해야 한다.

  • Scroll : scroll 컨텍스트를 유지하는 시간. 너무 짧으면 세션 만료, 너무 길면 리소스가 낭비되기 때문에 조절 필요
  • Batch size : 한 번의 scroll 요청에서 가져올 doc 개수. 너무 작으면 I/O 부하, 너무 크면 메모리를 초과하기 때문에 조절 필요
  • slices : 소스 인덱스를 여러 slice로 나누고, 각 slice를 독립적인 scroll 작업으로 병렬 처리. 기본 값은 샤드 수에 맞춰 자동으로 결정된다.
  • wait_for_completion : reindex 요청이 완료될 때까지 API 응답을 기다릴지 여부. 기본값은 true.

4. 기타

1) Task 확인 및 취소

진행 중인 Reindex 작업을 조회하고 중단할 수도 있다. 리소스 사용량 과다 등의 이유로 중단이 필요하다면 사용할 수도 있지 않을까..

GET _tasks?actions=*reindex
DELETE _tasks/<task_id>

2) 주의사항

  • 원본 인덱스는 변경되지 않음 : 실수로 복사 누락된 데이터는 원본에 남는다. 마이그레이션이 끝났다면 원본 인덱스는 직접 삭제해야 한다.
  • 성능 영향 : 대규모 데이터 복사 시 클러스터 리소스 사용량이 급증할 수 있다.
  • 실패 document : 일부 doc 복사 실패 시 _reindex 결과를 분석하여 재처리가 필요하다.

0개의 댓글