
@task(task_id='python_xcom_push_task')
def xcom_push(**kwargs):
    """Push two example values to XCom through the task instance.

    The task instance is read from the ``ti`` entry of the keyword
    arguments. A string is stored under key ``result1`` and a list under
    key ``result2``.
    """
    ti = kwargs['ti']
    ti.xcom_push(key="result1", value="value_1")
    ti.xcom_push(key="result2", value=[1, 2, 3])
@task(task_id='python_xcom_pull_task')
def xcom_pull(**kwargs):
    """Pull the values stored under ``result1`` and ``result2`` and print them.

    The task instance is read from the ``ti`` entry of the keyword
    arguments. ``result1`` is looked up by key only; ``result2`` is looked
    up by key and by the id of the task that pushed it.
    """
    ti = kwargs['ti']
    value_key1 = ti.xcom_pull(key="result1")
    # When several tasks exist, a pull by key alone fetches the most recent
    # value, so it is better to name the source task explicitly.
    value_key2 = ti.xcom_pull(key="result2", task_ids='python_xcom_push_task')
    print(value_key1)
    print(value_key2)
@task(task_id='xcom_push_by_return')
def xcom_push_by_return(**kwargs):
    """Return a fixed status string for a downstream task to consume."""
    return 'status Good'
@task(task_id='xcom_pull_by_return')
def xcom_pull_by_return(status, **kwargs):
    """Print the value received through the ``status`` argument."""
    print(status)


# Hand the push task's output to the pull task as its ``status`` argument.
pushed_status = xcom_push_by_return()
xcom_pull_by_return(pushed_status)
# return 값은 자동으로 XCom에 저장되기 때문에 별도의 작성이 필요 없음
@task(task_id='xcom_push_by_return')
def xcom_push_by_return(**kwargs):
    """Return a fixed status string.

    A downstream task in this file reads this value back from XCom under
    the key ``return_value``.
    """
    return 'status Good'
@task(task_id='xcom_pull_by_return')
def xcom_pull_return_by_method(**kwargs):
    """Pull the push task's return value from XCom explicitly and print it.

    Instead of receiving the value as an argument, this looks up the key
    ``return_value`` of task ``xcom_push_by_return`` through the task
    instance found in the ``ti`` entry of the keyword arguments.
    """
    ti = kwargs['ti']
    pull_value = ti.xcom_pull(key='return_value', task_ids='xcom_push_by_return')
    print(pull_value)


# This task accepts no positional argument, so it is ordered after the push
# task with ``>>`` rather than being passed the push task's output.
xcom_push_by_return() >> xcom_pull_return_by_method()
# 최근 값인 xcom_push2의 값들을 불러오게 되므로, xcom_push1의 값을 불러오려면 task_id 명시 필요
from airflow import DAG
import pendulum
import datetime
from airflow.decorators import task
with DAG(
    dag_id="dags_python_with_xcom_eg1",
    schedule="30 6 * * *",
    start_date=pendulum.datetime(2023, 3, 1, tz="Asia/Seoul"),
    catchup=False,
) as dag:

    @task(task_id='python_xcom_push_task1')
    def xcom_push1(**kwargs):
        """Push the first pair of values under keys ``result1`` and ``result2``."""
        task_instance = kwargs['ti']
        entries = (("result1", "value_1"), ("result2", [1, 2, 3]))
        for xcom_key, xcom_value in entries:
            task_instance.xcom_push(key=xcom_key, value=xcom_value)

    @task(task_id='python_xcom_push_task2')
    def xcom_push2(**kwargs):
        """Push a second pair of values under the same two keys."""
        task_instance = kwargs['ti']
        entries = (("result1", "value_2"), ("result2", [1, 2, 3, 4]))
        for xcom_key, xcom_value in entries:
            task_instance.xcom_push(key=xcom_key, value=xcom_value)

    @task(task_id='python_xcom_pull_task')
    def xcom_pull(**kwargs):
        """Pull ``result1`` by key only and ``result2`` from the first push task, then print both."""
        task_instance = kwargs['ti']
        # No task_ids given: presumably resolves to the most recently pushed
        # value, i.e. the one from python_xcom_push_task2 -- confirm on a run.
        by_key_only = task_instance.xcom_pull(key="result1")
        # task_ids pins the lookup to the first push task.
        from_first_push = task_instance.xcom_pull(
            key="result2",
            task_ids='python_xcom_push_task1',
        )
        for pulled in (by_key_only, from_first_push):
            print(pulled)

    # Run the two push tasks in sequence, then the pull task.
    xcom_push1() >> xcom_push2() >> xcom_pull()


from airflow import DAG
import pendulum
import datetime
from airflow.decorators import task
with DAG(
    dag_id="dags_python_with_xcom_eg2",
    schedule="30 6 * * *",
    start_date=pendulum.datetime(2023, 3, 1, tz="Asia/Seoul"),
    catchup=False,
) as dag:

    @task(task_id='python_xcom_push_by_return')
    def xcom_push_result(**kwargs):
        """Return a fixed string for the downstream tasks to read."""
        outcome = 'Success'
        return outcome

    @task(task_id='python_xcom_pull_1')
    def xcom_pull_1(**kwargs):
        """Look up the push task's value with ``xcom_pull`` and print it."""
        task_instance = kwargs['ti']
        pulled = task_instance.xcom_pull(task_ids='python_xcom_push_by_return')
        message = 'xcom_pull 메서드로 직접 찾은 리턴 값:' + pulled
        print(message)

    @task(task_id='python_xcom_pull_2')
    def xcom_pull_2(status, **kwargs):
        """Print the value received through the ``status`` argument."""
        message = '함수 입력값으로 받은 값:' + status
        print(message)

    # One push task instance feeds both pull tasks: the first receives the
    # output as an argument, the second is only ordered after it with ``>>``.
    python_xcom_push_by_return = xcom_push_result()
    xcom_pull_2(python_xcom_push_by_return)
    python_xcom_push_by_return >> xcom_pull_1()

