
Bash Operator 는 하나의 xcom_push와 return 값으로만 xcom 사용 가능하다 ?
xcom_push 함수를 한 번 호출하느냐, 두 번 호출하느냐입니다.
두 번째 코드에서 에러가 발생하는 원인은 Airflow의 Jinja 템플릿 렌더링 방식과 BashOperator의 작동 방식 때문입니다.
문제 분석:
Jinja 템플릿 렌더링: Airflow는 DAG 파일을 파싱할 때 Jinja 템플릿을 렌더링합니다. 이때 {{ ... }} 안에 있는 코드는 Python 코드로 실행됩니다. 하지만 bash_command 내의 xcom_push는 실제로 Bash 명령어가 아니기 때문에 렌더링 과정에서 오류가 발생합니다.
BashOperator 실행: BashOperator는 렌더링된 bash_command를 Bash 쉘에서 실행합니다. 이때 xcom_push는 Bash 명령어가 아니므로 "command not found" 에러 (종료 코드 127)가 발생합니다.
bash_command 활용 하는 push는 두개 이상은 안되는것 같고, env로 받아오는 pull은 여러개 받아오는게 가능한듯
bash_push = BashOperator(
task_id = 'bash_push',
bash_command="echo push Start"
"{{ti.xcom_push(key='key1', value='value1')}} && "
"echo push Finish"
)
#위 코드는 에러가 안나고 잘 수행이 되는데 아래코드는 에러 발생
bash_push = BashOperator(
task_id = 'bash_push',
bash_command="echo push Start"
"{{ti.xcom_push(key='key1', value='value1')}} && "
"{{ti.xcom_push(key='key2', value='value2')}} && "
"echo push Finish"
)
Bash Operator에서는 마지막 리턴값이 template에 저장됨
아래 코드에서 마지막 'COMPLETE'가 return_value에 저장됨
task_id로 xcom_pull 시 return_value값 불러옴
기존 템플릿과 유사
xcom에 넣을 때 {{ ti.xcom_push(key='bash_pushed',value='first_bash_message') }}
xcom에서 뺄 때 {{ ti.xcom_pull(key='bash_pushed') }}
from airflow import DAG
import pendulum
import datetime
from airflow.operators.bash import BashOperator
with DAG(
dag_id="dags_bash_with_xcom",
schedule="10 0 * * *",
start_date=pendulum.datetime(2024, 5, 1, tz="Asia/Seoul"),
catchup=False
) as dag:
bash_push = BashOperator(
task_id='bash_push',
bash_command="echo START && "
"echo XCOM_PUSHED "
"{{ ti.xcom_push(key='bash_pushed',value='first_bash_message') }} && "
"echo COMPLETE"
)
bash_pull = BashOperator(
task_id='bash_pull',
env={'PUSHED_VALUE':"{{ ti.xcom_pull(key='bash_pushed') }}",
'RETURN_VALUE':"{{ ti.xcom_pull(task_ids='bash_push') }}"},
bash_command="echo $PUSHED_VALUE && echo $RETURN_VALUE ",
do_xcom_push=False
)
bash_push >> bash_pull
