Ingest pipeline 용례

sooknow·2026년 2월 20일

ingest pipeline 용례

1. 하나의 인덱스에서 필드 조작

기본적인 조작 순서는 아래 순서와 같다.

  • 만약 새로운 이름의 인덱스를 만드는 것이라면 2과정을 생략한다.
  • 하나의 필드를 쪼개기, 여러개의 필드 붙이기는 3번 과정만 다르고 나머지과정은 동일하다.
  1. 원본 인덱스(A)로부터 backup 인덱스를 만든다.
  2. A 인덱스 삭제
  3. ingest 파이프라인을 생성한다.
    3-1. 파이프라인이 잘 동작하는지 시뮬레이션
  4. backup인덱스 -> A 인덱스로 리인덱싱(dest에 pipeline 지정)한다.

하나의 필드 -> 여러개의 필드로 쪼개기

  1. 원본인덱스로부터 backup 인덱스를 만든다.
POST _reindex
{
    "source": {
        "index": "A"
    },
    "dest": {
        "index": "A-backup"
    }
}
  1. A 삭제
DELETE A
  1. ==🔴ingest pipeline을 생성한다.==
  • split 메서드를 사용해서 구분자 기준으로 나눈다. => 배열
  • painless script로 나누어진 결과배열의 값을 새로운 필드에 담는다.
  • 원본 필드를 삭제한다.
    아래 예시는 Game 필드는 “2025 Summer”로 값이 들어있는데 이걸 year, season으로 나눈다.
PUT _ingest/pipeline/split-field
{
    "description": "split one field into two",
    "processors": [
      {
        "split": {
            "field": "Games",
            "separator": " "
        }
      },
      {
        "script": {
            "lang": "painless",
            "source": """
                ctx.year = ctx.Games[0];
                ctx.season = ctx.Games[1];
                """
        }
      },
      {
        "remove": {
            "field": "Games",
            "ignore_missing": false
        }
      }
    ]
}

3-1. 파이프라인 시뮬레이션
리인덱싱으로 강을 건너기 전에, 연습데이터를 넣어서 파이프라인이 잘 만들어 졌는지 확인한다.

POST _ingest/pipeline/파이프라인이름/_simulate
{
    "docs": [
        {
            "_source": {
                "Games": "1998 Summer"
            }
        }
    ]
}
  1. 리인덱싱
    dest에 pipeline을 지정한다.
POST _reindex
{
    "source": {
        "index":"olympic-events"
    },
    "dest": {
        "index": "splitted-olympic",
        "pipeline": "split-field" // 파이프라인 이름
    }
}

여러개의 필드를 하나로 합치기

나머지 과정은 동일하므로 ingest pipeline 구현부만 살펴보자.

  • set 프로세서 : 두 필드를 합쳐 ==🟡새로운 필드를 추가==한다.
  • remove : 기존의 필드는 이제 제거한다.
PUT _ingest/pipeline/reunion-pipeline
{
    "description": "reunion year and season into games",
    "processors": [
      {
        "set": {
            "field": "games",
            "value": "{{year}}&{{season}}"
        }
      },
      {
        "remove": {
            "field": ["year", "season"],
            "ignore_missing": true
        }
      }
    ]
}

2. 두개의 인덱스를 합쳐 필드 늘리기

📌 policy는 B(2번 인덱스)의 입장에서 “어떤 데이터를 나누어 줄지”를 결정하고, pipeline은 A(1번 인덱스)입장에서 에게 새정보를 어떤 형태로 붙일지를 결정한다.
📌 policy는 반드시 Excute 해야한다!

  1. policy 정의하기
  2. policy 실행하기
  3. pipeline 정의하기
  4. 인덱스 생성하

1번 Index

ABC

2번 index

AD
  • 여기서 교집합 필드는 A
ABCD

1-1.policy 정의하기

PUT _enrich/policy/정책
{
  "match": {
    "indices": "인덱스B",
    "match_field": "교집합 필드", // 새로운 문서에서 읽을 값
    "enrich_fields":["채울 필드"]
  }
}

1-2.policy 실행

POST _enrich/policy/정책/_execute

2-1. 파이프라인 정의

PUT /_ingest/pipeline/파이프라인_이름
{
  "processors": [
    {
      "enrich": {
        "policy_name": "정책",
        "field": "교집합 필드", // 기존 문서에서 읽을 값
        "target_field": "새로운 필드" // 다리 필드와 채울 필드가 이 필드의 하위에 생성됩니다.
      }
    }
  ]
}

3.최종 인덱스 생성

POST _reindex
{
  "source": {
    "index": "A인덱스"
  },
  "dest": {
    "index": "C 인덱스",
    "pipeline": "enrich-pipeline"
  }
}

# 먄약 기존의 인덱스(A) 자체를 확장해야 한다면 
POST 인덱스A/_update_by_query?pipeline=enrich-pipeline&wait_for_completion=false

flatten field

enrich 프로세서는 기본적으로 ==중첩 구조==로 합친다. 그래서 추가되는 필드는 루트에 생성되는 것이 아니라 교집합 필드의 하위로 생성되게 된다. 다른 말로 하면, enrich 프로세서는 target_field로 지정된 필드 ==안에== 조회된 데이터를 집어넣는다.

따라서 문제에 따라 ==🟡모든 필드를 최상위 루트로 끌어올리는 평탄화 작업==이 필요하다.
평탄화를 위해서는script프로세서를 추가하여 하위 필드를 꺼내고 불필요해진 target_field를 삭제해야 한다.

1번 인덱스(A, B, C)에 2번 인덱스(A, D, E)의 데이터를 합치는 상황을 예를 들어 보자.

3번째 pipeline을 만드는 과정이 다음과 같이 변경된다.
1. enrich 메서드를 사용하되, target_field를 temp로 지정한다.

temp
ㄴ D
ㄴ E
  1. painless script를 사용해서 하위필드를 루트 필드로 만든다.
  2. 임시 temp 필드를 삭제한다.
PUT /_ingest/pipeline/flatten-enrich-pipeline
{
  "processors": [
    {
      "enrich": {
        "policy_name": "your_policy_name",
        "field": "A",
        "target_field": "tmp_enrich" 
      }
    },
    {
      "script": {
        "description": "하위 필드를 최상위로 복사",
        "lang": "painless",
        "source": """
          if (ctx.tmp_enrich != null) {
            ctx.D = ctx.tmp_enrich.D;
            ctx.E = ctx.tmp_enrich.E;
          }
        """
      }
    },
    {
      "remove": {
        "description": "임시 타겟 필드 삭제",
        "field": "tmp_enrich",
        "ignore_missing": true
      }
    }
  ]
}
profile
keep on pushing

0개의 댓글