bash_push = BashOperator(
task_id='bash_push',
bash_command="echo START && "
"echo XCOM PUSHED "
"{{ ti.xcom_push(key='bash_pushed', value='first_bash_message') }} &&"
"echo COMPLETE"
)
bash_pull = BashOperator(
task_id='bash_pull',
env={'PUSHED_VALUE':"{{ ti.xcom_pull('bash_pushed') }}",
'RETURN_VALUE':"{{ ti.xcom_pull(task_ids='bash_push') }}"},
bash_command = "echo $PUSHED_VALUE && echo $RETURN_VALUE",
do_xcom_push=False
from airflow import DAG
import datetime
import pendulum
from airflow.operators.bash import BashOperator
with DAG(
dag_id="dags_bash_with_xcom",
schedule="30 6 * * *",
start_date=pendulum.datetime(2023, 3, 1, tz="Asia/Seoul"),
catchup=False,
) as dag:
bash_push = BashOperator(
task_id="bash_push",
bash_command="echo START && "
"echo XCOM PUSHED "
"{{ ti.xcom_push(key='bash_pushed', value='first_bash_message') }} &&"
"echo COMPLETE",
)
bash_pull = BashOperator(
task_id="bash_pull",
env={
"PUSHED_VALUE": "{{ ti.xcom_pull('bash_pushed') }}",
"RETURN_VALUE": "{{ ti.xcom_pull(task_ids='bash_push') }}",
},
bash_command="echo $PUSHED_VALUE && echo $RETURN_VALUE",
do_xcom_push=False,
)
bash_push >> bash_pull
do_xcom_push=False는 return_value에 최종적으로 출력된 문장을 xcom에 담지 않기 위함입니다.
잘 실행되는 것을 확인할 수 있습니다.