Apache Airflow에서 XCom은 Cross Communication의 약자로, 하나의 DAG(Directed Acyclic Graph) 내에서 서로 다른 태스크(Task) 간에 데이터를 주고받을 수 있게 해주는 기능입니다. 이는 마치 작업자들이 메모를 주고받는 것처럼 태스크 간 소통을 가능하게 합니다.
XCom은 다음과 같은 상황에서 특히 유용합니다:
Airflow에는 두 가지 주요 데이터 공유 메커니즘이 있습니다: XCom과 Variable입니다. 이 둘의 주요 차이점을 알아보겠습니다:
구분 | XCom | Variable |
---|---|---|
범위 | DAG 내에서만 사용 가능 | 전역적으로 사용 가능 |
형식 | key-value 쌍 | key-value 쌍 |
용도 | 태스크 간 데이터 교환 | 전역 환경 설정 저장 |
생명주기 | DAG 실행과 함께 생성, 소멸 | 영구적으로 유지 |
XCom은 모든 종류의 데이터 교환에 적합하지는 않습니다:
XCom을 사용하는 방법은 크게 세 가지가 있습니다:
PythonOperator에서는 함수의 반환값이 자동으로 XCom에 저장됩니다:
Copydef return_xcom():
return "Hello from first task!"
first_task = PythonOperator(
task_id = 'first_task',
python_callable = return_xcom,
dag = dag
)
명시적으로 xcom_push
와 xcom_pull
메서드를 사용할 수 있습니다:
Copydef xcom_push_task(**context):
# XCom에 데이터 저장
context['task_instance'].xcom_push(key='my_value', value="Hello from push!")
return "Return value also stored automatically"
def xcom_pull_task(**context):
# 첫 번째 태스크의 반환값 가져오기
first_task_value = context['task_instance'].xcom_pull(task_ids='first_task')
# 키를 사용하여 값 가져오기
push_value = context['ti'].xcom_pull(key='my_value')
print(f"First task returned: {first_task_value}")
print(f"Push value is: {push_value}")
Jinja 템플릿을 사용하여 BashOperator 등에서 XCom 값을 활용할 수 있습니다:
Copybash_task = BashOperator(
task_id='bash_task',
bash_command='echo "The value from first task is: {{ task_instance.xcom_pull(task_ids="first_task") }}"',
dag=dag
)
다음은 XCom을 활용한 전체 DAG 예제입니다:
Copyfrom airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from datetime import datetime
dag = DAG(
dag_id = 'xcom_example',
start_date = datetime(2025, 4, 27),
catchup=False,
schedule_interval='@daily'
)
def return_xcom():
return "Hello from first task!"
def xcom_push_task(**context):
context['task_instance'].xcom_push(key='my_value', value="Hello from push!")
return "Return value also stored automatically"
def xcom_pull_task(**context):
first_task_value = context['task_instance'].xcom_pull(task_ids='first_task')
push_value = context['ti'].xcom_pull(key='my_value')
push_task_return = context['ti'].xcom_pull(task_ids='push_task')
print(f"First task returned: {first_task_value}")
print(f"Push value is: {push_value}")
print(f"Push task returned: {push_task_return}")
first_task = PythonOperator(
task_id = 'first_task',
python_callable = return_xcom,
dag = dag
)
push_task = PythonOperator(
task_id = 'push_task',
python_callable = xcom_push_task,
dag = dag
)
pull_task = PythonOperator(
task_id = 'pull_task',
python_callable = xcom_pull_task,
dag = dag
)
bash_task = BashOperator(
task_id = 'bash_task',
bash_command = 'echo "Value from first task: {{ ti.xcom_pull(task_ids="first_task") }}"',
dag = dag
)
first_task >> push_task >> pull_task >> bash_task
Airflow 웹 인터페이스에서 XCom 값을 확인할 수 있습니다:
Admin
> XComs
선택XCom은 다음과 같은 실제 시나리오에서 매우 유용합니다:
Airflow의 XCom은 태스크 간 데이터 공유를 위한 강력한 메커니즘입니다. 소량의 데이터를 전송하는 데 적합하며, push-pull 방식의 간단한 인터페이스를 제공합니다. 복잡한 워크플로우에서 태스크 간 커뮤니케이션이 필요할 때 XCom을 사용하면 더 유연하고 모듈화된 DAG를 구성할 수 있습니다.
다만, 대용량 데이터 전송에는 적합하지 않으므로 S3, GCS, HDFS 같은 외부 스토리지를 사용하는 것이 좋습니다.
참고자료: