Airflow의 XCom이란? 간단한 사용법

GarionNachal·2025년 4월 27일
0

airflow

목록 보기
7/8
post-thumbnail

1. XCom 개요

Apache Airflow에서 XCom은 Cross Communication의 약자로, 하나의 DAG(Directed Acyclic Graph) 내에서 서로 다른 태스크(Task) 간에 데이터를 주고받을 수 있게 해주는 기능입니다. 이는 마치 작업자들이 메모를 주고받는 것처럼 태스크 간 소통을 가능하게 합니다.

XCom은 다음과 같은 상황에서 특히 유용합니다:

  • 첫 번째 태스크의 결과를 두 번째 태스크의 입력으로 사용해야 할 때
  • 여러 태스크가 각기 다른 Worker에서 실행되는 환경에서 데이터를 공유해야 할 때
  • 워크플로우의 중간 결과를 저장하고 나중에 참조해야 할 때

2. XCom vs Variable

Airflow에는 두 가지 주요 데이터 공유 메커니즘이 있습니다: XCom과 Variable입니다. 이 둘의 주요 차이점을 알아보겠습니다:

구분XComVariable
범위DAG 내에서만 사용 가능전역적으로 사용 가능
형식key-value 쌍key-value 쌍
용도태스크 간 데이터 교환전역 환경 설정 저장
생명주기DAG 실행과 함께 생성, 소멸영구적으로 유지

3. XCom 사용 시 주의사항

XCom은 모든 종류의 데이터 교환에 적합하지는 않습니다:

  • 소량의 데이터만 전달하는 것을 권장합니다.
  • DataFrames나 대용량 데이터 전송에는 적합하지 않습니다.
  • JSON으로 직렬화 가능한 데이터만 저장할 수 있습니다.

4. XCom 사용법

XCom을 사용하는 방법은 크게 세 가지가 있습니다:

4.1 PythonOperator에서 return 값 사용하기

PythonOperator에서는 함수의 반환값이 자동으로 XCom에 저장됩니다:

Copydef return_xcom():
    return "Hello from first task!"

first_task = PythonOperator(
    task_id = 'first_task',
    python_callable = return_xcom,
    dag = dag
)

4.2 push-pull 메서드 사용하기

명시적으로 xcom_push와 xcom_pull 메서드를 사용할 수 있습니다:

Copydef xcom_push_task(**context):
    # XCom에 데이터 저장
    context['task_instance'].xcom_push(key='my_value', value="Hello from push!")
    return "Return value also stored automatically"

def xcom_pull_task(**context):
    # 첫 번째 태스크의 반환값 가져오기
    first_task_value = context['task_instance'].xcom_pull(task_ids='first_task')
    # 키를 사용하여 값 가져오기
    push_value = context['ti'].xcom_pull(key='my_value')

    print(f"First task returned: {first_task_value}")
    print(f"Push value is: {push_value}")

4.3 Jinja 템플릿 사용하기

Jinja 템플릿을 사용하여 BashOperator 등에서 XCom 값을 활용할 수 있습니다:

Copybash_task = BashOperator(
    task_id='bash_task',
    bash_command='echo "The value from first task is: {{ task_instance.xcom_pull(task_ids="first_task") }}"',
    dag=dag
)

5. 전체 예제 DAG

다음은 XCom을 활용한 전체 DAG 예제입니다:

Copyfrom airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from datetime import datetime

dag = DAG(
    dag_id = 'xcom_example',
    start_date = datetime(2025, 4, 27),
    catchup=False,
    schedule_interval='@daily'
)

def return_xcom():
    return "Hello from first task!"

def xcom_push_task(**context):
    context['task_instance'].xcom_push(key='my_value', value="Hello from push!")
    return "Return value also stored automatically"

def xcom_pull_task(**context):
    first_task_value = context['task_instance'].xcom_pull(task_ids='first_task')
    push_value = context['ti'].xcom_pull(key='my_value')
    push_task_return = context['ti'].xcom_pull(task_ids='push_task')

    print(f"First task returned: {first_task_value}")
    print(f"Push value is: {push_value}")
    print(f"Push task returned: {push_task_return}")

first_task = PythonOperator(
    task_id = 'first_task',
    python_callable = return_xcom,
    dag = dag
)

push_task = PythonOperator(
    task_id = 'push_task',
    python_callable = xcom_push_task,
    dag = dag
)

pull_task = PythonOperator(
    task_id = 'pull_task',
    python_callable = xcom_pull_task,
    dag = dag
)

bash_task = BashOperator(
    task_id = 'bash_task',
    bash_command = 'echo "Value from first task: {{ ti.xcom_pull(task_ids="first_task") }}"',
    dag = dag
)

first_task >> push_task >> pull_task >> bash_task

6. Airflow UI에서 XCom 확인하기

Airflow 웹 인터페이스에서 XCom 값을 확인할 수 있습니다:

  1. Airflow UI에 로그인
  2. 상단 메뉴에서 Admin > XComs 선택
  3. 여기서 모든 XCom 항목을 확인할 수 있으며, DAG ID, Task ID, Key, Value 등 정보를 볼 수 있습니다.

7. XCom의 실제 활용 사례

XCom은 다음과 같은 실제 시나리오에서 매우 유용합니다:

  1. 데이터 처리 파이프라인
    • 첫 번째 태스크는 데이터를 추출(Extract)
    • 두 번째 태스크는 XCom을 통해 받은 데이터를 변환(Transform)
    • 세 번째 태스크는 변환된 데이터를 로드(Load)
  2. 조건부 워크플로우
    • 한 태스크에서 API 상태 확인
    • 상태를 XCom에 저장
    • 다음 태스크에서 XCom 값에 따라 다른 작업 수행
  3. 구성 설정 공유
    • 하나의 태스크가 데이터베이스에서 구성 설정 로드
    • 설정 값을 XCom에 저장
    • 다른 태스크들이 이 설정을 사용

8. 결론

Airflow의 XCom은 태스크 간 데이터 공유를 위한 강력한 메커니즘입니다. 소량의 데이터를 전송하는 데 적합하며, push-pull 방식의 간단한 인터페이스를 제공합니다. 복잡한 워크플로우에서 태스크 간 커뮤니케이션이 필요할 때 XCom을 사용하면 더 유연하고 모듈화된 DAG를 구성할 수 있습니다.

다만, 대용량 데이터 전송에는 적합하지 않으므로 S3, GCS, HDFS 같은 외부 스토리지를 사용하는 것이 좋습니다.


참고자료:

profile
AI를 꿈꾸는 BackEnd개발자

0개의 댓글