Python 오퍼레이터 with XCOM

우상욱·2024년 3월 27일

Airflow Master Class

목록 보기
20/24

1. Xcom(Cross communication)


  • Airflow DAG 안 Task 간 데이터 공유를 위해 사용되는 기술
    • Task1 수행 중 내용이나 결과를 Task2에서 사용 또는 입력으로 주고 싶은 경우
  • 주로 작은 규모의 데이터 공유를 위해 사용
    • (Xcom 내용은 메타 DB의 xcom 테이블에 값이 저장됨)
    • 1GB 이상의 대용량 데이터 공유를 위해서는 외부 솔루션 사용 필요(AWS S3, HDFS 등)

2. Python 오퍼레이터에서 Xcom 사용하기


크게 두 가지 방법으로 Xcom 사용 가능

1) **kwargs에 이미 존재하는 ti(task_instance) 객체 활용

@task(task_id='python_xcom_push_task')
def xcom_push(**kwargs):
   ti = kwargs['ti']
   ti.xcom_push(key="result1", value="value_1")
   ti.xcom_push(key="result2", value=[1,2,3])

@task(task_id='python_xcom_pull_task')
def xcom_pull(**kwargs):
   ti = kwargs['ti']
   value_key1 = ti.xcom_pull(key="result1")
   value_key2 = ti.xcom_pull(key="result2", task_ids='python_xcome_push_task")
print(value_key1)
print(value_key2)

만약 task를 5개 가지고 있는 task가 있을 경우, 같은 키 값으로 받았을 때 문제가 생길 수 있으므로, 키 값을 똑같은 걸 가져와야한다면, xcom_pull 함수에서 task_ids 값을 파라미터로 작성해줘야한다.

2) 파이썬 함수의 return 값 활용(1안)

@task(task_id='xcom_push_by_return')
def xcom_push_by_return(**kwargs):
    transaction_value = 'status Good'
    return transaction_value

@task(task_id='xcom_pull_by_return')
def xcom_pull_by_return(status, **kwargs):
    print(status)
    
xcom_pull_by_return(xcom_push_by_return())
  • Task 데코레이터를 사용하면 함수 입력/출력 관계만으로 Task Flow가 정의됩니다.
  • 에어플로우는 리턴을 하면 자동으로 xcom에 저장하게 됩니다.
  • 또한 그 함수를 사용하면, 자동으로 Xcom에 있는 값을 꺼내옵니다.

3) 파이썬 함수의 return 값 활용(2안)

리턴한 값은 항상 자동으로 xcom에, key='return_value', task_ids=task_id로 저장됩니다.

@task(task_id='xcom_push_by_return')
def xcom_push_return(**kwargs):
    transaction_value='status Good'
    return transaction_value

@task(task_id='xcom_pull_by_return')
def xcom_pull_return_by_method(**kwargs):
    ti = kwargs['ti']
    pull_value = ti.xcom_pull(key='return_value', task_ids='xcom_push_by_return') # 여러 개의 태스크가 있을 땐, task_ids를 사용합니다.
    print(pull_value)

xcom_push_by_return() >> xcom_pull_return_by_method()

실습


1. 예제 1

with DAG(
    dag_id="dags_python_with_xcom_eg1",
    schedule="30 6 * * *",
    start_date=pendulum.datetime(2023, 3, 1, tz="Asia/Seoul"),
    catchup=False,
) as dag:

    @task(task_id="python_xcom_push_task1")
    def xcom_push1(**kwargs):
        ti = kwargs["ti"]
        ti.xcom_push(key="result1", value="value1")
        ti.xcom_push(key="result2", value=[1, 2, 3])

    @task(task_id="python_xcom_push_task2")
    def xcom_push2(**kwargs):
        ti = kwargs["ti"]
        ti.xcom_push(key="result1", value="value_2")
        ti.xcom_push(key="result2", value=[1, 2, 3, 4])

    @task(task_id="python_xcom_pull_task")
    def xcom_pull(**kwargs):
        ti = kwargs["ti"]
        value1 = ti.xcom_pull(key="result1")
        value2 = ti.xcom_pull(key="result2", task_ids="python_xcom_push_task1")
        print(value1)
        print(value2)

    xcom_push1() >> xcom_push2() >> xcom_pull()

예제 2.

from airflow import DAG
import datetime
import pendulum
from airflow.decorators import task

with DAG(
    dag_id="dags_python_with_xcom_eg2",
    schedule="30 6 * * *",
    start_date=pendulum.datetime(2023, 3, 1, tz="Asia/Seoul"),
    catchup=False,
) as dag:

    @task(task_id="python_xcom_push_by_return")
    def xcom_push_result(**kwargs):
        return "Success"

    @task(task_id="python_xcom_pull_1")
    def xcom_pull_1(**kwargs):
        ti = kwargs["ti"]
        value1 = ti.xcom_pull(task_ids="python_xcom_push_by_return")
        print("xcom_pull 메서드로 직접 찾은 리턴 값" + value1)

    @task(task_id="python_xcom_pull_2")
    def xcom_pull_2(status, **kwargs):
        print("함수 입력값으로 받은 값:" + status)

    python_xcom_push_by_return = xcom_push_result()
    xcom_pull_2(python_xcom_push_by_return)
    python_xcom_push_by_return >> xcom_pull_1()

실행 후에 각 task의 탭에서, XCom을 클릭하면, 저장 되어있는 Xcom 데이터를 확인할 수 있습니다.

profile
데이터엔지니어

0개의 댓글