Airflow Study13- PythonOperator with Xcom

박성현·2024년 6월 2일

Airflow

목록 보기
20/28
post-thumbnail

Xcom(Cross Communication)

  • Airflow DAG 안 Task 간 데이터 공유를 위해 사용되는 기술
    ex) Task1의 수행 중 내용이나 결과를 Task2에서 사용 또는 입력으로 사용하고 싶은 경우
  • 주로 작은 규모의 데이터를 공유하기 위해 사용
    ( Xcom 내용은 메타 DB의 xcom 테이블에 값이 저장됨 , 1GB 이상은 대용량 AWS s3, HDFS 활용 )

sample1

@task(task_id = 'python_xcom_push_task)
def xcom_push(**kwargs):
	ti = kwargs['ti']
    ti.xcom_push(key="result1", value="value_1")
    ti.xcom_push(key="result2", value=[1,2,3])
    
@task(task_id = 'python_xcom_pull_task)
def xcom_pull(**kwargs):
	ti = kwargs['ti']
    value_key1 = ti.xcom_pull(key="result1")
    #여러 task가 있을때에는 최근 값을 불러 오기때문에 task_id를 명시하는 것이 좋음 
    value_key2 = ti.xcom_pull(key="result2",task_ids = 'python_xcom_push_task) 
    print(value_key1)
    print(value_key2)

sample2

@task(task_id='xcom_push_by_return') 
def xcom_push_by_return(**kwargs):
    transaction_value = 'status Good'
    return transaction_value

@task(task_id='xcom_pull_by_return')
def xcom_pull_by_return(status, **kwargs):
    print(status)

xcom_pull_by_return(xcom_push_by_return())

sample3

return 값은 자동을 xcom에 저장되기 때문에 별도에 작성이 필요없음

@task(task_id='xcom_push_by_return') 
def xcom_push_by_return(**kwargs):
    transaction_value = 'status Good'
    return transaction_value


@task(task_id='xcom_pull_by_return')
def xcom_pull_return_by_method(**kwargs):
    ti = kwargs['ti']
	pull_value = ti.xcom_pull(key='return_value', task_ids='xcom_push_by_return')
	print(pull_value)

xcom_pull_by_return(xcom_push_by_return())

dags_python_with_xcom_eg1.py

최근 값인 xcom_push2의 값들이 불러오게 됨으로, xcom_push1을 값을 불러오려면 task_id명시 필요


from airflow import DAG
import pendulum
import datetime
from airflow.decorators import task

with DAG(
    dag_id="dags_python_with_xcom_eg1",
    schedule="30 6 * * *",
    start_date=pendulum.datetime(2023, 3, 1, tz="Asia/Seoul"),
    catchup=False
) as dag:
    
    @task(task_id='python_xcom_push_task1')
    def xcom_push1(**kwargs):
        ti = kwargs['ti']
        ti.xcom_push(key="result1", value="value_1")
        ti.xcom_push(key="result2", value=[1,2,3])

    @task(task_id='python_xcom_push_task2')
    def xcom_push2(**kwargs):
        ti = kwargs['ti']
        ti.xcom_push(key="result1", value="value_2")
        ti.xcom_push(key="result2", value=[1,2,3,4])

    @task(task_id='python_xcom_pull_task')
    def xcom_pull(**kwargs):
        ti = kwargs['ti']
        value1 = ti.xcom_pull(key="result1")
        value2 = ti.xcom_pull(key="result2", task_ids='python_xcom_push_task1')
        print(value1)
        print(value2)


    xcom_push1() >> xcom_push2() >> xcom_pull()



dags_python_with_xcom_eg2.py

from airflow import DAG
import pendulum
import datetime
from airflow.decorators import task

with DAG(
    dag_id="dags_python_with_xcom_eg2",
    schedule="30 6 * * *",
    start_date=pendulum.datetime(2023, 3, 1, tz="Asia/Seoul"),
    catchup=False
) as dag:
    
    @task(task_id='python_xcom_push_by_return')
    def xcom_push_result(**kwargs):
        return 'Success'


    @task(task_id='python_xcom_pull_1')
    def xcom_pull_1(**kwargs):
        ti = kwargs['ti']
        value1 = ti.xcom_pull(task_ids='python_xcom_push_by_return')
        print('xcom_pull 메서드로 직접 찾은 리턴 값:' + value1)

    @task(task_id='python_xcom_pull_2')
    def xcom_pull_2(status, **kwargs):
        print('함수 입력값으로 받은 값:' + status)


    python_xcom_push_by_return = xcom_push_result()
    xcom_pull_2(python_xcom_push_by_return)
    python_xcom_push_by_return >> xcom_pull_1()


profile
다소Good한 데이터 엔지니어

0개의 댓글