- Bash 오퍼레이터는
env
, bash_command
파라미터에서 JinJa Template 문법을 이용하여 push/pull 이 가능하다.
- 템플릿 문법에서도 ti객체 사용이 가능하다.
- bash_command에서는 ""로 감싸진 출력하려는 문장이 return으로 간주된다. 그런데 리턴이 3개가 있을 수는 없으므로, 마지막 출력문이 자동으로 return_value로 간주된다.
- task_ids만 지정하면 자동으로 xcom_pull을 통해 return_value를 자동으로 찾아준다.
do_xcom_push
파라미터: default값은 True로 자동으로 bash_command에서 작성한 값이 xcom에 push가 된다. do_xcom_push를 False로 설정하면 xcom에 push하지 않는다.
코드 작성
from airflow import DAG
import pendulum
from airflow.operators.bash import BashOperator
with DAG(
dag_id='dags_bash_with_xcom',
schedule="10 0 * * *",
start_date=pendulum.datetime(2023, 3, 1, tz="Asia/Seoul"),
catchup=False
) as dag:
bash_push = BashOperator(
task_id="bash_push",
bash_command="echo START && "
"echo XCOM_PUSHED "
"{{ ti.xcom_push(key='bash_pushed', value='first_bash_message')}} && "
"echo COMPLETE "
)
bash_pull = BashOperator(
task_id='bash_pull',
env={"PUSHED_VALUE": "{{ ti.xcom_pull(key='bash_pushed')}}",
"RETURN_VALUE": "{{ ti.xcom_pull(task_ids='bash_push')}}"},
bash_command="echo $PUSHED_VALUE && echo $RETURN_VALUE ",
do_xcom_push=False
)
bash_push >> bash_pull

[2023-08-14, 00:50:44 KST] {subprocess.py:93} INFO - first_bash_message
[2023-08-14, 00:50:44 KST] {subprocess.py:93} INFO - COMPLETE
유익한 자료 감사합니다.