크게 두 가지 방법으로 Xcom 사용 가능
**kwargs에 이미 존재하는 ti(task_instance) 객체 활용@task(task_id='python_xcom_push_task')
def xcom_push(**kwargs):
ti = kwargs['ti']
ti.xcom_push(key="result1", value="value_1")
ti.xcom_push(key="result2", value=[1,2,3])
@task(task_id='python_xcom_pull_task')
def xcom_pull(**kwargs):
ti = kwargs['ti']
value_key1 = ti.xcom_pull(key="result1")
value_key2 = ti.xcom_pull(key="result2", task_ids='python_xcome_push_task")
print(value_key1)
print(value_key2)

만약 task를 5개 가지고 있는 task가 있을 경우, 같은 키 값으로 받았을 때 문제가 생길 수 있으므로, 키 값을 똑같은 걸 가져와야한다면, xcom_pull 함수에서 task_ids 값을 파라미터로 작성해줘야한다.
@task(task_id='xcom_push_by_return')
def xcom_push_by_return(**kwargs):
transaction_value = 'status Good'
return transaction_value
@task(task_id='xcom_pull_by_return')
def xcom_pull_by_return(status, **kwargs):
print(status)
xcom_pull_by_return(xcom_push_by_return())
리턴한 값은 항상 자동으로 xcom에, key='return_value', task_ids=task_id로 저장됩니다.
@task(task_id='xcom_push_by_return')
def xcom_push_return(**kwargs):
transaction_value='status Good'
return transaction_value
@task(task_id='xcom_pull_by_return')
def xcom_pull_return_by_method(**kwargs):
ti = kwargs['ti']
pull_value = ti.xcom_pull(key='return_value', task_ids='xcom_push_by_return') # 여러 개의 태스크가 있을 땐, task_ids를 사용합니다.
print(pull_value)
xcom_push_by_return() >> xcom_pull_return_by_method()
with DAG(
dag_id="dags_python_with_xcom_eg1",
schedule="30 6 * * *",
start_date=pendulum.datetime(2023, 3, 1, tz="Asia/Seoul"),
catchup=False,
) as dag:
@task(task_id="python_xcom_push_task1")
def xcom_push1(**kwargs):
ti = kwargs["ti"]
ti.xcom_push(key="result1", value="value1")
ti.xcom_push(key="result2", value=[1, 2, 3])
@task(task_id="python_xcom_push_task2")
def xcom_push2(**kwargs):
ti = kwargs["ti"]
ti.xcom_push(key="result1", value="value_2")
ti.xcom_push(key="result2", value=[1, 2, 3, 4])
@task(task_id="python_xcom_pull_task")
def xcom_pull(**kwargs):
ti = kwargs["ti"]
value1 = ti.xcom_pull(key="result1")
value2 = ti.xcom_pull(key="result2", task_ids="python_xcom_push_task1")
print(value1)
print(value2)
xcom_push1() >> xcom_push2() >> xcom_pull()
from airflow import DAG
import datetime
import pendulum
from airflow.decorators import task
with DAG(
dag_id="dags_python_with_xcom_eg2",
schedule="30 6 * * *",
start_date=pendulum.datetime(2023, 3, 1, tz="Asia/Seoul"),
catchup=False,
) as dag:
@task(task_id="python_xcom_push_by_return")
def xcom_push_result(**kwargs):
return "Success"
@task(task_id="python_xcom_pull_1")
def xcom_pull_1(**kwargs):
ti = kwargs["ti"]
value1 = ti.xcom_pull(task_ids="python_xcom_push_by_return")
print("xcom_pull 메서드로 직접 찾은 리턴 값" + value1)
@task(task_id="python_xcom_pull_2")
def xcom_pull_2(status, **kwargs):
print("함수 입력값으로 받은 값:" + status)
python_xcom_push_by_return = xcom_push_result()
xcom_pull_2(python_xcom_push_by_return)
python_xcom_push_by_return >> xcom_pull_1()
실행 후에 각 task의 탭에서, XCom을 클릭하면, 저장 되어있는 Xcom 데이터를 확인할 수 있습니다.


