데이터 마이그레이션 작업을 하고 있다.
opesearch에서 노드당 허용 가능한 샤드 갯수가 정해져 있는데 일별 인덱스를 생성하는 구조로 인해 이 설정 값을 초과한 샤드 생성으로 opensearch 사용에 문제가 발생했다.
이 설정값은 명령어를 사용하거나 opensearch.yaml의 cluster.opensearchmax_shards_per_node 항목을 수정한 뒤 재시작하여 변경할 수 있다.
설정값 변경으로 문제는 해결하였으나 이 값을 무한정 늘릴 수는 없다.
현재 상태에서 개선 포인트를 잡아보자면,
일별 인덱스에 데이터가 약 1천개 내외로 적게 쌓이고 있다. 한 달치 데이터를 병합한 월별 인덱스 형태로 변경하면 1년 동안 생성되는 인덱스 갯수를 약 350개 줄일 수 있다.
그래서 일별 인덱스를 월별 인덱스로 옮기는 마이그레이션 작업을 진행하고 있다. 그 방법으로 reindex를 사용했다.
POST _reindex
{
"source": {
"index": "log_20250801" // 데이터의 출처 인덱스명
},
"dest": {
"index": "log_202508" // 데이터가 복사되어 저장되는 인덱스명
}
}
_reindex는 내부적으로 아래의 단계별 작업을 수행한다. 단일 스레드로 진행되기 때문에 병렬 작업에는 제한이 있다.(slices 설정 미사용 시)
Scroll로 Source Index 읽기
_source, _id, _routing 등의 기본 메타정보 포함참고
- opensearch 의
_search API는 기본적으로페이징된 데이터를 반환- 반환 데이터 크기가 너무 클 경우 시스템 상의 문제가 발생할 수 있기 때문
default = 10이며 size 옵션 조정을 통해 크기를 조절 할 수 있지만 그마저도 한계가 있음- 데이터 전체를 조회하려면 scroll 등을 사용해야 함
Reindex Task 생성 및 진행
_tasks API로 상태 추적 가능Bulk Insert 수행
dest.index로 bulk 요청을 통해 삽입_id를 유지하며 중복 시 덮어쓰기 됨(op_type: index 기본값)에러 및 충돌 처리
"conflicts": "proceed" 옵션 추가 시 실패 무시 가능reindex 내부 동작인 scroll과 bulk를 직접 구현할 수도 있다.
두 방식을 비교해보자면,
Reindex vs Scroll + Bulk
| Reindex 사용 | Scroll + Bulk 직접 구현 | |
|---|---|---|
| 개발 난이도 | 낮음 (API만 호출하면 됨) | 높음 (Scroll, Bulk, Error Handling 등) |
| 구현 속도 | 빠름 | 느림 |
| 커스터마이징 | 한계 있음 | 자유도 높음 |
| 운영 안정성 | 상대적으로 높음 | 직접 안정성 고려 필요 |
| 성능 최적화 여지 | scroll, batch size 설정 가능 | 전체 제어 가능 |
둘 다 간단하게 구현해서 테스트 해봤는데 reindex 방식이 더 빨랐다.
물론 병렬 처리 관련 설정을 더 정교하게 조절했다면 결과가 달라질 수 있겠지만 일정과 테스트 결과 등을 고려했을 때 reindex로도 충분하다고 판단했다.
source 인덱스에 어느 시점부터 keyword 타입의 매핑이 추가됐다.
source와 dest의 인덱스 매핑 정보가 다를 경우 실패할 수도 있다고해서 코드 작성 전에 테스트해봤는데 잘 동작했다.
✅ reindex가 실패하지 않은 이유?
충돌이 발생해서 실패하는 경우 ⇒ 필드명이 같은데 타입이 다를 때
ex1) userName(type=keyword) 인덱스를 userName(type=text) 인덱스에 reindex 할 때
ex2) dest 인덱스에 없는 필드가 source index에 있고, destination이 동적 매핑을 허용하지 않는 경우
Reindex 자체는 단일 스레드로 동작하기 때문에 병렬 처리가 불가능하다.
⇒ 많게는 1000개의 인덱스를 reindex 해야 해서 인덱스 단위로 병렬화 처리를 했다.
List<String> indexNameList = getIndexNameAll(); // 마이그레이션 대상 일별 인덱스명 전체 조회
Map<String, List<String>> indexByMonthMap = groupingDailyIndex(indexNameList); // 월별 그루핑
List<CompletableFuture<Void>> jobs = new ArrayList<>();
ForkJoinPool customThreadPool = new ForkJoinPool(5);
indexByMonthMap.forEach((month, dailyIndexList) ->
jobs.add(CompletableFuture.runAsync(() -> startMigration(month, dailyIndexList), customThreadPool)
));
CompletableFuture.allOf(jobs.toArray(new CompletableFuture[0])).join();
// 마이그레이션 완료 후 필요한 작업 진행
설정 변경을 통해 단위 데이터를 쪼개거나 동기 작업 완료 시점까지 기다리지 않을 수 있다. 그 외에도 데이터 처리에 필요한 자원의 유효 시간 설정을 조절해야 한다.
진행 중인 Reindex 작업을 조회하고 중단할 수도 있다. 리소스 사용량 과다 등의 이유로 중단이 필요하다면 사용할 수도 있지 않을까..
GET _tasks?actions=*reindex
DELETE _tasks/<task_id>
_reindex 결과를 분석하여 재처리가 필요하다.