[Airflow] XCOM 로 Task 간 데이터 공유하기

식빵·2025년 6월 14일
0

Airflow

목록 보기
5/9
post-thumbnail

이 게시물은 airflow 2.10.5 버전을 사용해서 작성됐습니다.
3.x 버전과 조금 다를 수 있으니 유의하시기 바랍니다.

📨 XCOM

XCOMcross-communications 의 약자로 TaskTask 간에
데이터를 공유하기 위한 메커니즘(=기능)입니다.

XCOM 은 각각의 Operator 에서 꺼내 쓸 수 있는 TaskInstance 의 메소드인
xcom_push 을 통해서 공유할 데이터를 넣고,
xcom_pull 을 통해서 공유할 데이터를 조회할 수 있습니다.

이렇게 XCOM 을 통해서 공유된 데이터들은 Airflow 에서 관리하는
메타데이터 DB 에 저장되고, 추후에 필요에 따라 조회할 수 있습니다.


예를 다음과 같이 Airflow WEB UI 에서 다음과 같이 조회할 수 있습니다.

  • DAG id, Task id 를 통해서 XCOM 의 출처를 정확히 파악할 수 있다는 점은 눈여겨 봐주세요!

기본적인 XCOM 의 정의와 특징을 알았으니
XCOM 기능을 Operator 에서 사용하는 방법에 대해 가볍게 알아보죠!

주의사항
XCOM 은 기본적으로 큰 데이터를 공유하기 위해서 만들어진 게 아니기 때문에
될 수 있으면 작은 데이터를 주고 받을 때 사용해주시기 바랍니다!



📡 PythonOperator 에서 XCOM 쓰는 법

XCOMTaskInstance 에 접근을 해야하는데,
PythonOperator 같이 함수를 사용하는 Operator 의 경우에는
함수의 파라미터에 **kwargs 를 하나 선언하고,
함수 내에서 kwargs.get('ti') 처럼해서 TaskInstance 를 꺼내올 수 있습니다.
예시 코드를 보면서 이 방법을 자세히 알아봅시다.


기본 예시

from airflow import DAG
from airflow.decorators import task
import pendulum

with DAG(
    dag_id="dags_python_xcom_sample1",
    start_date=pendulum.datetime(2025, 6, 1, tz="Asia/Seoul"),
    schedule="30 6 * * *",
    catchup=False,
) as dag:

    # 데코레이터로 PythonOperator 생성
    @task(task_id="pushing_task_1")
    def pushing_task_1(**kwargs):
        # TaskInstance 꺼내오기!
        ti = kwargs["ti"]

        # xcom_push 메소드로 공유 데이터 INSERT
        ti.xcom_push(key="key1", value="value1")
        ti.xcom_push(key="list1", value=[1, 2])
        ti.xcom_push(key="map1", value={"name": "coding_toast"})
        
        return "pushing_task 111111 return value"
        # 위처럼 함수 마지막에 return 을 하면 이것은 자동으로
        # ti.xcom_push(key="return_value", value='pushing_task 111111 return value')
        # 와 같은 동작을 합니다.


	@task(task_id="pulling_task")
    def pulling_task(**kwargs):
        ti = kwargs["ti"]
        
        # pushing_task_1 태스크에서 push 했던 데이터를 조회합니다.
        keyVal = ti.xcom_pull(key="key1")
        listVal = ti.xcom_pull(key="list1")
        
        # ti.xcom_pull 에서 key 파라미터를 주지 않고,
        # 오로지 task_ids 를 작성하면, 해당 task 의 key='return_value' 를 조회합니다.
        return_value = ti.xcom_pull(task_ids="pushing_task_1")
        
        print(keyVal)
        print(listVal)
        print(return_value)

    pushing_task_1() >> pulling_task()

이후에 DAG 를 실행합니다.

이후에 pushing_task_1XCom 데이터를 위와 같이 조회할 수 있습니다.

예외적으로 return_value 만 함수의 return 값을 사용하는 것만 제외하고
저희가 코드에서 명시적으로 작성한 key,value 값들이 잘 세팅된 것을 확인할 수 있습니다.


이후에 pulling_task 의 Logs 를 보면 앞서 pushing_task_1 에서
XCOM 으로 넣었던 공유 데이터가 잘 조회되는 것을 확인할 수 있습니다!



🩻 tasks_id 파라미터 생략 시 주의사항

이번에는 주의해야될 부분을 알아보기 위해 다음과 같이 예시코드를 작성해봤습니다.

from airflow import DAG
from airflow.decorators import task
import pendulum

with DAG(
    dag_id="dags_python_xcom_sample1",
    schedule="30 6 * * *",
    start_date=pendulum.datetime(2025, 6, 1, tz="Asia/Seoul"),
    catchup=False,
) as dag:

    # 데코레이터로 PythonOperator 생성
    @task(task_id="pushing_task_1")
    def pushing_task_1(**kwargs):
        # TaskInstance 꺼내오기!
        ti = kwargs["ti"]

        # xcom_push 메소드로 공유 데이터 INSERT
        ti.xcom_push(key="key", value="value11111")
        ti.xcom_push(key="list", value=[1, 2, 3])
        return "return 111111"

    @task(task_id="pushing_task_2")
    def pushing_task_2(**kwargs):
        ti = kwargs["ti"]
        ti.xcom_push(key="key", value="value22222")
        ti.xcom_push(key="list", value=[4, 5, 6])
        return "return 222222"

    @task(task_id="pulling_task")
    def pulling_task(**kwargs):

        # push task 의 구동 순서 때문에 pushing_task_2 의 세팅값이 조회됩니다.
        ti = kwargs["ti"]
        keyVal = ti.xcom_pull(key="key")
        listVal = ti.xcom_pull(key="list")
        return_value = ti.xcom_pull(key='return_value')

        print("task_ids 값 주지 않고 조회하기:")
        print(keyVal)
        print(listVal)
        print(return_value)

        # 아래처럼 task_id 인자값을 줘야만 서로 다른 task 에서 같은 key 명칭을 쓰더라도 
        # 정확히 구분해서 조회할 수 있습니다!
        ti = kwargs["ti"]
        keyVal = ti.xcom_pull(key="key", task_ids="pushing_task_1")
        listVal = ti.xcom_pull(key="list", task_ids="pushing_task_1")
        return_value = ti.xcom_pull(key='return_value' # key='return_value' 는 생략가능!
                                    , task_ids="pushing_task_1") 
        
        print('task_ids="pushing_task_1" 세팅 후 조회하기:')
        print(keyVal)
        print(listVal)
        print(return_value)

    pushing_task_1() >> pushing_task_2() >> pulling_task()

위 DAG 를 실행시켜보겠습니다.


위와 같이 pushing_task_1 다음에 pushing_task_2 가 있는 형태입니다.


마지막 실행 Task 인 pulling_task 의 Log 를 확인해보면
task_ids 를 주지 않고 xcom_pull 을 한 경우 pushing_task_2
의해서 같은 key 명칭 의 데이터를 넣었던 pushing_task_1 의 value 가
보이지 않는 현상이 발생합니다 🩻

반대로 task_ids='pushing_task_1' 를 처럼 명시적으로
표기한 경우에는 정확히 pushing_task_1 가 넣었던 value 를 조회할 수 있습니다.

이렇듯 task_ids 는 웬만하면 꼭 표기해주는 것이 안전하고 정확하게 XCOM 데이터를
조회할 수 있습니다. 유의하시기 바랍니다!



📡 BashOperator 에서 XCOM 쓰는 법

BashOperator 같이 함수를 쓰지 못하는 OperatorJinja template
통해서 XCOM 을 사용할 수 있습니다. 예시 코드를 한번 보겠습니다.

예시 코드

from airflow import DAG
from airflow.decorators import task
from airflow.operators.bash import BashOperator
import pendulum


with DAG(
    dag_id="dags_bash_xcom_sample1",
    schedule="0 0 * * *",
    start_date=pendulum.datetime(2025, 6, 1, tz="Asia/Seoul"),
    catchup=False,
) as dag:

    ### BashOperator 에서 push 하고 PythonOperator 에서 pull 하는 예시:

    # XCOM_PUSH 를 할 Bash Task 생성
    xcom_push_with_bash_task = BashOperator(
        task_id="xcom_push_with_bash_task",
        bash_command="echo PUSHING TO XCOM! "
        '{{ ti.xcom_push(key="name", value="CodingToastBread") }} && '
        "echo PUSH_COMPLETE",  # 마지막 출력문은 ti.xcom_push(key='return_value') 의 value 가 됩니다!
    )
    # 참고: 만약 마지막 출력문이 return_value 에 들어가지 않길 바라면
    # BashOperator 에서 do_xcom_push=False 처럼 파라미터 세팅.

    # XCOM_PULL 를 할 Python Task 생성
    @task(task_id="xcom_pull_with_python_task")
    def xcom_pull_with_python_task(**kwargs):
        ti = kwargs["ti"]
        name = ti.xcom_pull(key="name")
        return_value = ti.xcom_pull(task_ids="xcom_push_with_bash_task")
        print("xcom-data ==> name:", str(name))
        print("xcom-data ==> return_value:", return_value)


    xcom_push_with_bash_task >> xcom_pull_with_python_task()



    ### 반대로 PythonOperator 에서 push 하고 BashOperator 에서 pull 하는 예시:
    @task(task_id="push_with_python_task")
    def push_with_python_task(**kwargs):
        kwargs["ti"].xcom_push(
            key="personal_info", 
            value={"name": "CodingToastBread", "age": 0}
        )
        return "return value from push_with_python_task"


    pull_with_bash_task = BashOperator(
        task_id="pull_with_bash_task",
        env={
            "PERSONAL_INFO": '{{ ti.xcom_pull(key="personal_info", task_ids="push_with_python_task")["name"] }}',
            "RETURN_VALUE": '{{ ti.xcom_pull(task_ids="push_with_python_task") }}',
        },
        bash_command=
            'echo PERSONAL_INFO : $PERSONAL_INFO && '
            'echo RETURN_VALUE : $RETURN_VALUE'
    )

    push_with_python_task() >> pull_with_bash_task

이제 DAG 를 한번 실행시켜보겠습니다.


원한대로 4개의 Task 가 잘 생성된 것을 확인하고,


BashOperator 를 통해서 xcom_push 데이터가 정상적으로 저장됐는지 확인할 수 있고,
이 데이터가 xcom_pullPythonOperatorLog 를 통해 잘 전달된 것을 확인


반대로 PythonOperator 에서 데이터를 넣고,
BashOperator 에서 데이터를 정상적으로 조회할 수 있는 것을 확인할 수 있습니다!




참고한 링크

profile
백엔드 개발자로 일하고 있는 식빵(🍞)입니다.

0개의 댓글