
이 게시물은
airflow 2.10.5버전을 사용해서 작성됐습니다.
3.x버전과 조금 다를 수 있으니 유의하시기 바랍니다.
XCOM 는 cross-communications 의 약자로 Task 와 Task 간에
데이터를 공유하기 위한 메커니즘(=기능)입니다.
XCOM 은 각각의 Operator 에서 꺼내 쓸 수 있는 TaskInstance 의 메소드인
xcom_push 을 통해서 공유할 데이터를 넣고,
xcom_pull 을 통해서 공유할 데이터를 조회할 수 있습니다.
이렇게 XCOM 을 통해서 공유된 데이터들은 Airflow 에서 관리하는
메타데이터 DB 에 저장되고, 추후에 필요에 따라 조회할 수 있습니다.
예를 다음과 같이 Airflow WEB UI 에서 다음과 같이 조회할 수 있습니다.
기본적인 XCOM 의 정의와 특징을 알았으니
XCOM 기능을 Operator 에서 사용하는 방법에 대해 가볍게 알아보죠!
주의사항
XCOM은 기본적으로 큰 데이터를 공유하기 위해서 만들어진 게 아니기 때문에
될 수 있으면 작은 데이터를 주고 받을 때 사용해주시기 바랍니다!
XCOM 을 TaskInstance 에 접근을 해야하는데,
PythonOperator 같이 함수를 사용하는 Operator 의 경우에는
함수의 파라미터에 **kwargs 를 하나 선언하고,
함수 내에서 kwargs.get('ti') 처럼해서 TaskInstance 를 꺼내올 수 있습니다.
예시 코드를 보면서 이 방법을 자세히 알아봅시다.
from airflow import DAG
from airflow.decorators import task
import pendulum
with DAG(
dag_id="dags_python_xcom_sample1",
start_date=pendulum.datetime(2025, 6, 1, tz="Asia/Seoul"),
schedule="30 6 * * *",
catchup=False,
) as dag:
# 데코레이터로 PythonOperator 생성
@task(task_id="pushing_task_1")
def pushing_task_1(**kwargs):
# TaskInstance 꺼내오기!
ti = kwargs["ti"]
# xcom_push 메소드로 공유 데이터 INSERT
ti.xcom_push(key="key1", value="value1")
ti.xcom_push(key="list1", value=[1, 2])
ti.xcom_push(key="map1", value={"name": "coding_toast"})
return "pushing_task 111111 return value"
# 위처럼 함수 마지막에 return 을 하면 이것은 자동으로
# ti.xcom_push(key="return_value", value='pushing_task 111111 return value')
# 와 같은 동작을 합니다.
@task(task_id="pulling_task")
def pulling_task(**kwargs):
ti = kwargs["ti"]
# pushing_task_1 태스크에서 push 했던 데이터를 조회합니다.
keyVal = ti.xcom_pull(key="key1")
listVal = ti.xcom_pull(key="list1")
# ti.xcom_pull 에서 key 파라미터를 주지 않고,
# 오로지 task_ids 를 작성하면, 해당 task 의 key='return_value' 를 조회합니다.
return_value = ti.xcom_pull(task_ids="pushing_task_1")
print(keyVal)
print(listVal)
print(return_value)
pushing_task_1() >> pulling_task()
이후에 DAG 를 실행합니다.
이후에 pushing_task_1 의 XCom 데이터를 위와 같이 조회할 수 있습니다.
예외적으로 return_value 만 함수의 return 값을 사용하는 것만 제외하고
저희가 코드에서 명시적으로 작성한 key,value 값들이 잘 세팅된 것을 확인할 수 있습니다.

이후에 pulling_task 의 Logs 를 보면 앞서 pushing_task_1 에서
XCOM 으로 넣었던 공유 데이터가 잘 조회되는 것을 확인할 수 있습니다!
이번에는 주의해야될 부분을 알아보기 위해 다음과 같이 예시코드를 작성해봤습니다.
from airflow import DAG
from airflow.decorators import task
import pendulum
with DAG(
dag_id="dags_python_xcom_sample1",
schedule="30 6 * * *",
start_date=pendulum.datetime(2025, 6, 1, tz="Asia/Seoul"),
catchup=False,
) as dag:
# 데코레이터로 PythonOperator 생성
@task(task_id="pushing_task_1")
def pushing_task_1(**kwargs):
# TaskInstance 꺼내오기!
ti = kwargs["ti"]
# xcom_push 메소드로 공유 데이터 INSERT
ti.xcom_push(key="key", value="value11111")
ti.xcom_push(key="list", value=[1, 2, 3])
return "return 111111"
@task(task_id="pushing_task_2")
def pushing_task_2(**kwargs):
ti = kwargs["ti"]
ti.xcom_push(key="key", value="value22222")
ti.xcom_push(key="list", value=[4, 5, 6])
return "return 222222"
@task(task_id="pulling_task")
def pulling_task(**kwargs):
# push task 의 구동 순서 때문에 pushing_task_2 의 세팅값이 조회됩니다.
ti = kwargs["ti"]
keyVal = ti.xcom_pull(key="key")
listVal = ti.xcom_pull(key="list")
return_value = ti.xcom_pull(key='return_value')
print("task_ids 값 주지 않고 조회하기:")
print(keyVal)
print(listVal)
print(return_value)
# 아래처럼 task_id 인자값을 줘야만 서로 다른 task 에서 같은 key 명칭을 쓰더라도
# 정확히 구분해서 조회할 수 있습니다!
ti = kwargs["ti"]
keyVal = ti.xcom_pull(key="key", task_ids="pushing_task_1")
listVal = ti.xcom_pull(key="list", task_ids="pushing_task_1")
return_value = ti.xcom_pull(key='return_value' # key='return_value' 는 생략가능!
, task_ids="pushing_task_1")
print('task_ids="pushing_task_1" 세팅 후 조회하기:')
print(keyVal)
print(listVal)
print(return_value)
pushing_task_1() >> pushing_task_2() >> pulling_task()
위 DAG 를 실행시켜보겠습니다.
위와 같이 pushing_task_1 다음에 pushing_task_2 가 있는 형태입니다.
마지막 실행 Task 인 pulling_task 의 Log 를 확인해보면
task_ids 를 주지 않고 xcom_pull 을 한 경우 pushing_task_2 에
의해서 같은 key 명칭 의 데이터를 넣었던 pushing_task_1 의 value 가
보이지 않는 현상이 발생합니다 🩻
반대로 task_ids='pushing_task_1' 를 처럼 명시적으로
표기한 경우에는 정확히 pushing_task_1 가 넣었던 value 를 조회할 수 있습니다.
이렇듯 task_ids 는 웬만하면 꼭 표기해주는 것이 안전하고 정확하게 XCOM 데이터를
조회할 수 있습니다. 유의하시기 바랍니다!
BashOperator 같이 함수를 쓰지 못하는 Operator 도 Jinja template 을
통해서 XCOM 을 사용할 수 있습니다. 예시 코드를 한번 보겠습니다.
from airflow import DAG
from airflow.decorators import task
from airflow.operators.bash import BashOperator
import pendulum
with DAG(
dag_id="dags_bash_xcom_sample1",
schedule="0 0 * * *",
start_date=pendulum.datetime(2025, 6, 1, tz="Asia/Seoul"),
catchup=False,
) as dag:
### BashOperator 에서 push 하고 PythonOperator 에서 pull 하는 예시:
# XCOM_PUSH 를 할 Bash Task 생성
xcom_push_with_bash_task = BashOperator(
task_id="xcom_push_with_bash_task",
bash_command="echo PUSHING TO XCOM! "
'{{ ti.xcom_push(key="name", value="CodingToastBread") }} && '
"echo PUSH_COMPLETE", # 마지막 출력문은 ti.xcom_push(key='return_value') 의 value 가 됩니다!
)
# 참고: 만약 마지막 출력문이 return_value 에 들어가지 않길 바라면
# BashOperator 에서 do_xcom_push=False 처럼 파라미터 세팅.
# XCOM_PULL 를 할 Python Task 생성
@task(task_id="xcom_pull_with_python_task")
def xcom_pull_with_python_task(**kwargs):
ti = kwargs["ti"]
name = ti.xcom_pull(key="name")
return_value = ti.xcom_pull(task_ids="xcom_push_with_bash_task")
print("xcom-data ==> name:", str(name))
print("xcom-data ==> return_value:", return_value)
xcom_push_with_bash_task >> xcom_pull_with_python_task()
### 반대로 PythonOperator 에서 push 하고 BashOperator 에서 pull 하는 예시:
@task(task_id="push_with_python_task")
def push_with_python_task(**kwargs):
kwargs["ti"].xcom_push(
key="personal_info",
value={"name": "CodingToastBread", "age": 0}
)
return "return value from push_with_python_task"
pull_with_bash_task = BashOperator(
task_id="pull_with_bash_task",
env={
"PERSONAL_INFO": '{{ ti.xcom_pull(key="personal_info", task_ids="push_with_python_task")["name"] }}',
"RETURN_VALUE": '{{ ti.xcom_pull(task_ids="push_with_python_task") }}',
},
bash_command=
'echo PERSONAL_INFO : $PERSONAL_INFO && '
'echo RETURN_VALUE : $RETURN_VALUE'
)
push_with_python_task() >> pull_with_bash_task
이제 DAG 를 한번 실행시켜보겠습니다.
원한대로 4개의 Task 가 잘 생성된 것을 확인하고,
BashOperator 를 통해서 xcom_push 데이터가 정상적으로 저장됐는지 확인할 수 있고,
이 데이터가 xcom_pull 한 PythonOperator 의 Log 를 통해 잘 전달된 것을 확인
반대로 PythonOperator 에서 데이터를 넣고,
BashOperator 에서 데이터를 정상적으로 조회할 수 있는 것을 확인할 수 있습니다!