Airflow Study14- BashOperator with Xcom

박성현·2024년 6월 3일

Airflow

목록 보기
21/28
post-thumbnail

주의사항

Bash Operator 는 하나의 xcom_push와 return 값으로만 xcom 사용 가능하다 ?

xcom_push 함수를 한 번 호출하느냐, 두 번 호출하느냐입니다.
두 번째 코드에서 에러가 발생하는 원인은 Airflow의 Jinja 템플릿 렌더링 방식과 BashOperator의 작동 방식 때문입니다.
문제 분석:
Jinja 템플릿 렌더링: Airflow는 DAG 파일을 파싱할 때 Jinja 템플릿을 렌더링합니다. 이때 {{ ... }} 안에 있는 코드는 Python 코드로 실행됩니다. 하지만 bash_command 내의 xcom_push는 실제로 Bash 명령어가 아니기 때문에 렌더링 과정에서 오류가 발생합니다.
BashOperator 실행: BashOperator는 렌더링된 bash_command를 Bash 쉘에서 실행합니다. 이때 xcom_push는 Bash 명령어가 아니므로 "command not found" 에러 (종료 코드 127)가 발생합니다.

bash_command 활용 하는 push는 두개 이상은 안되는것 같고, env로 받아오는 pull은 여러개 받아오는게 가능한듯

bash_push = BashOperator(
        task_id = 'bash_push',
        bash_command="echo push Start"
                    "{{ti.xcom_push(key='key1', value='value1')}} && "
                    "echo push Finish"
    ) 

#위 코드는 에러가 안나고 잘 수행이 되는데 아래코드는 에러 발생 

    bash_push = BashOperator(
        task_id = 'bash_push',
        bash_command="echo push Start"
                    "{{ti.xcom_push(key='key1', value='value1')}} && "
                    "{{ti.xcom_push(key='key2', value='value2')}} && "
                    "echo push Finish"
    )

Bash Operator에서는 마지막 리턴값이 template에 저장됨

아래 코드에서 마지막 'COMPLETE'가 return_value에 저장됨
task_id로 xcom_pull 시 return_value값 불러옴

기존 템플릿과 유사
xcom에 넣을 때 {{ ti.xcom_push(key='bash_pushed',value='first_bash_message') }}
xcom에서 뺄 때 {{ ti.xcom_pull(key='bash_pushed') }}

from airflow import DAG
import pendulum
import datetime
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="dags_bash_with_xcom",
    schedule="10 0 * * *",
    start_date=pendulum.datetime(2024, 5, 1, tz="Asia/Seoul"),
    catchup=False
) as dag:
    bash_push = BashOperator(
    task_id='bash_push',
    bash_command="echo START && "
                 "echo XCOM_PUSHED "
                 "{{ ti.xcom_push(key='bash_pushed',value='first_bash_message') }} && "
                 "echo COMPLETE"
    )

    bash_pull = BashOperator(
        task_id='bash_pull',
        env={'PUSHED_VALUE':"{{ ti.xcom_pull(key='bash_pushed') }}",
            'RETURN_VALUE':"{{ ti.xcom_pull(task_ids='bash_push') }}"},
        bash_command="echo $PUSHED_VALUE && echo $RETURN_VALUE ",
        do_xcom_push=False
    )

    bash_push >> bash_pull

profile
다소Good한 데이터 엔지니어

0개의 댓글