기본적인 조작 순서는 아래 순서와 같다.
POST _reindex
{
"source": {
"index": "A"
},
"dest": {
"index": "A-backup"
}
}
DELETE A
split 메서드를 사용해서 구분자 기준으로 나눈다. => 배열painless script로 나누어진 결과배열의 값을 새로운 필드에 담는다.Game 필드는 “2025 Summer”로 값이 들어있는데 이걸 year, season으로 나눈다.PUT _ingest/pipeline/split-field
{
"description": "split one field into two",
"processors": [
{
"split": {
"field": "Games",
"separator": " "
}
},
{
"script": {
"lang": "painless",
"source": """
ctx.year = ctx.Games[0];
ctx.season = ctx.Games[1];
"""
}
},
{
"remove": {
"field": "Games",
"ignore_missing": false
}
}
]
}
3-1. 파이프라인 시뮬레이션
리인덱싱으로 강을 건너기 전에, 연습데이터를 넣어서 파이프라인이 잘 만들어 졌는지 확인한다.
POST _ingest/pipeline/파이프라인이름/_simulate
{
"docs": [
{
"_source": {
"Games": "1998 Summer"
}
}
]
}
POST _reindex
{
"source": {
"index":"olympic-events"
},
"dest": {
"index": "splitted-olympic",
"pipeline": "split-field" // 파이프라인 이름
}
}
나머지 과정은 동일하므로 ingest pipeline 구현부만 살펴보자.
set 프로세서 : 두 필드를 합쳐 ==🟡새로운 필드를 추가==한다.remove : 기존의 필드는 이제 제거한다. PUT _ingest/pipeline/reunion-pipeline
{
"description": "reunion year and season into games",
"processors": [
{
"set": {
"field": "games",
"value": "{{year}}&{{season}}"
}
},
{
"remove": {
"field": ["year", "season"],
"ignore_missing": true
}
}
]
}
📌 policy는 B(2번 인덱스)의 입장에서 “어떤 데이터를 나누어 줄지”를 결정하고, pipeline은 A(1번 인덱스)입장에서 에게 새정보를 어떤 형태로 붙일지를 결정한다.
📌 policy는 반드시 Excute 해야한다!
1번 Index
| A | B | C |
|---|
2번 index
| A | D |
|---|
| A | B | C | D |
|---|---|---|---|
1-1.policy 정의하기
PUT _enrich/policy/정책
{
"match": {
"indices": "인덱스B",
"match_field": "교집합 필드", // 새로운 문서에서 읽을 값
"enrich_fields":["채울 필드"]
}
}
1-2.policy 실행
POST _enrich/policy/정책/_execute
2-1. 파이프라인 정의
PUT /_ingest/pipeline/파이프라인_이름
{
"processors": [
{
"enrich": {
"policy_name": "정책",
"field": "교집합 필드", // 기존 문서에서 읽을 값
"target_field": "새로운 필드" // 다리 필드와 채울 필드가 이 필드의 하위에 생성됩니다.
}
}
]
}
3.최종 인덱스 생성
POST _reindex
{
"source": {
"index": "A인덱스"
},
"dest": {
"index": "C 인덱스",
"pipeline": "enrich-pipeline"
}
}
# 먄약 기존의 인덱스(A) 자체를 확장해야 한다면
POST 인덱스A/_update_by_query?pipeline=enrich-pipeline&wait_for_completion=false
enrich 프로세서는 기본적으로 ==중첩 구조==로 합친다. 그래서 추가되는 필드는 루트에 생성되는 것이 아니라 교집합 필드의 하위로 생성되게 된다. 다른 말로 하면, enrich 프로세서는 target_field로 지정된 필드 ==안에== 조회된 데이터를 집어넣는다.
따라서 문제에 따라 ==🟡모든 필드를 최상위 루트로 끌어올리는 평탄화 작업==이 필요하다.
평탄화를 위해서는script프로세서를 추가하여 하위 필드를 꺼내고 불필요해진 target_field를 삭제해야 한다.
1번 인덱스(A, B, C)에 2번 인덱스(A, D, E)의 데이터를 합치는 상황을 예를 들어 보자.
3번째 pipeline을 만드는 과정이 다음과 같이 변경된다.
1. enrich 메서드를 사용하되, target_field를 temp로 지정한다.
temp
ㄴ D
ㄴ E
PUT /_ingest/pipeline/flatten-enrich-pipeline
{
"processors": [
{
"enrich": {
"policy_name": "your_policy_name",
"field": "A",
"target_field": "tmp_enrich"
}
},
{
"script": {
"description": "하위 필드를 최상위로 복사",
"lang": "painless",
"source": """
if (ctx.tmp_enrich != null) {
ctx.D = ctx.tmp_enrich.D;
ctx.E = ctx.tmp_enrich.E;
}
"""
}
},
{
"remove": {
"description": "임시 타겟 필드 삭제",
"field": "tmp_enrich",
"ignore_missing": true
}
}
]
}