Airflow XCom으로 데이터를 주고받는 5가지 방법

NewNewDaddy·4일 전

AIRFLOW

목록 보기
5/5

0. INTRO

모든 워크플로우 자동화 도구가 그렇듯, Apache Airflow에서도 테스크(Task) 간에 데이터를 주고받아야 하는 상황이 빈번하게 발생합니다. Airflow에서는 이를 위해 XCom(Cross-Communication)이라는 메커니즘을 제공합니다.

XCom이란?

XCom은 Airflow의 메타데이터베이스에 저장되는 작은 데이터 조각으로, 태스크 간에 데이터를 공유할 수 있게 해줍니다. 예를 들어, 한 태스크에서 생성한 파일 경로, 처리된 데이터의 요약 정보, 또는 다음 태스크에 필요한 설정값 등을 전달할 수 있습니다.

XCom의 크기 제한

XCom을 사용할 때 주의해야 할 중요한 점은 데이터 크기 제한입니다. 표준 XCom 백엔드를 사용할 경우, XCom의 크기 제한은 사용 중인 메타데이터 데이터베이스에 따라 결정됩니다:

데이터베이스크기 제한
PostgreSQL1 GB
SQLite2 GB
MySQL64 KB

보시다시피, 특히 MySQL의 경우 64KB라는 매우 작은 제한이 있어 큰 데이터를 전달하기에는 부적합합니다. 만약 XCom을 통해 전달하려는 데이터가 메타데이터 데이터베이스의 크기 제한을 초과할 가능성이 있다면, Custom XCom 저장소를 구축하여 활용하는 것을 고려해야 합니다.


1. **context 파라미터 활용하기

전통적인 PythonOperator 방식에서 호출되는 함수에 **context를 인자로 받아 ti(Task Instance) 객체에 접근하는 방법입니다.

def push_to_xcom(**context):
    message = "사과"
    ti = context["ti"]
    
    ti.xcom_push(
        key='message',
        value=message
    )
    return message # 'return_value'라는 키로도 자동 저장됨 

def pull_from_xcom(**context):
    ti = context["ti"]
    xcom_value = ti.xcom_pull(
        task_ids='py1',
        key='message'
    )
    print("py1에서 전달받은 결과 : ", xcom_value)
  • 장점: Airflow의 모든 컨텍스트 정보에 명시적으로 접근할 수 있습니다.
  • 특징: ti.xcom_push를 사용할 때 고유한 key를 지정할 수 있습니다.

💡 잠깐! **context에는 무엇이 들어있을까요?

많은 분들이 **context를 프린트했을 때 쏟아지는 방대한 양의 메타데이터에 당황하곤 합니다. 이 데이터들은 현재 실행 중인 DAG과 태스크의 '상태 정보'입니다.

주요 항목들을 표로 정리하면 다음과 같습니다:

키(Key)설명예시
ds태스크가 실행되는 논리적 날짜 (Date String)'2024-01-14'
ds_nodash대시(-)가 제거된 날짜 문자열'20240114'
ti / task_instance현재 실행 중인 태스크 인스턴스 객체<TaskInstance: ...>
dag현재 태스크가 속한 DAG 객체<DAG: ...>
logical_date태스크의 논리적 실행 시점 (Pendulum 객체)2024-01-14T00:00:00+00:00
run_id현재 DAG Run의 고유 식별자'scheduled__2024-01-14T00:00:00+00:00'

print(context) 실행 시 실제 출력 예시

{
    'ds': '2024-01-14',
    'ds_nodash': '20240114',
    'logical_date': DateTime(2024, 1, 14, 0, 0, 0, tzinfo=Timezone('UTC')),
    'dag': <DAG: xcom01_dag>,
    'ti': <TaskInstance: xcom01_dag.py1 [running]>,
    'run_id': 'scheduled__2024-01-14T00:00:00+00:00',
    'params': {},
    ... (중략) ...
}

따라서 **context를 사용한다는 것은, "Airflow가 태스크를 실행하면서 들고 있는 모든 보따리(메타데이터)를 다 넘겨줘!"라고 요청하는 것과 같습니다.


2. get_current_context() 사용하기

함수의 인자로 **context를 넘기지 않더라도, get_current_context()를 통해 현재 실행 중인 태스크의 컨텍스트를 동적으로 가져올 수 있습니다.

from airflow.sdk import get_current_context

def push_to_xcom():
    message = "사과"
    context = get_current_context()
    ti = context['ti']
    
    ti.xcom_push(key='message', value=message)
    return message

def pull_from_xcom():
    context = get_current_context()
    ti = context['ti']
    
    xcom_value = ti.xcom_pull(task_ids='py1', key='message')
    print("py1에서 전달받은 결과 : ", xcom_value)
  • 장점: 함수의 시그니처를 깔끔하게 유지할 수 있습니다.

3. TaskFlow API에서 return 사용하기 (현대적인 방식)

Airflow 2.0 이상에서 권장되는 TaskFlow API를 사용하면, 복잡한 xcom_push/pull 코드 없이 함수의 return 값만으로 데이터를 전달할 수 있습니다.

@task(task_id="first")
def first_func(args):
    join_list = ' '.join(args)
    return join_list # 자동으로 XCom에 push됨

@task(task_id='second')
def second_func(message):
    # 인자로 넘겨받은 message는 이전 태스크의 return 값 (XCom pull)
    changed_list = '!' + message + '!'
    return changed_list

# DAG 내에서 호출
message = first_func(['FLOWER', 'AIRFLOW'])
second_func(message)
  • 장점: Python 함수를 호출하듯 직관적으로 태스크 간 데이터 흐름을 정의할 수 있습니다.

4. TaskFlow API 옵션 활용 (multiple_outputs)

태스크가 여러 개의 결과값을 딕셔너리 형태로 반환할 때, multiple_outputs=True 옵션을 주면 각각의 키 값이 개별 XCom 항목으로 저장됩니다.

@task(task_id="first", do_xcom_push=True, multiple_outputs=True)
def first_func(args):
    join_list = ' '.join(args)
    return {"key1": join_list} # 'key1'이라는 이름으로 XCom에 저장됨
  • 특징: 반환된 딕셔너리의 키를 통해 특정 데이터만 Pull 할 수 있어 관리가 용이합니다.

5. 다른 Operator에서 Jinja 템플릿 활용하기

Python이 아닌 다른 오퍼레이터(예: BashOperator)에서 XCom 데이터를 사용하고 싶을 때는 Jinja 템플릿 형식을 사용합니다.

t1 = PythonOperator(
    task_id="make_dirname",
    python_callable=make_dirname, # 내부에서 ti.xcom_push(key="dir_path", ...) 수행
)

t2 = BashOperator(
    task_id="make_dir",
    bash_command="mkdir -p {{ ti.xcom_pull(task_ids='make_dirname', key='dir_path') }}"
)
  • 장점: 서로 다른 언어나 환경을 사용하는 오퍼레이터 간의 협업이 가능해집니다.

6. 요약

방식특징추천 상황
`context`**명시적 객체 전달복잡한 컨텍스트 제어가 필요할 때
get_current_context깔끔한 함수 시그니처함수 인자를 단순하게 유지하고 싶을 때
TaskFlow return가장 간결하고 직관적일반적인 Python 기반 태스크 연결 시
multiple_outputs딕셔너리 자동 분리여러 결과값을 개별적으로 전달할 때
Jinja 템플릿오퍼레이터 간 통합Bash, SQL 등 다른 오퍼레이터로 전달할 때

7. 참고 문서

profile
데이터 엔지니어의 작업공간 / #PYTHON #CLOUD #SPARK #AWS #GCP #NCLOUD

0개의 댓글