Bash 오퍼레이터 with XCOM

우상욱·2024년 3월 27일

Airflow Master Class

목록 보기
21/24

1. Bash 오퍼레이터에서 Xcom 사용


  • BASH 오퍼레이터는 env, bash_command 파라미터에서 Template 이용하여 push/pull
  • bash_command에서 마지막 출력 문장은 자동으로 return_value에 저장됩니다.
  • xcom pull 시 task_ids만 지정하면, 기본적으로 return_value 키 값에 저장된 것을 가져옵니다.
bash_push = BashOperator(
    task_id='bash_push',
    bash_command="echo START && "
                 "echo XCOM PUSHED "
                 "{{ ti.xcom_push(key='bash_pushed', value='first_bash_message') }} &&"
                 "echo COMPLETE"
    )

bash_pull = BashOperator(
    task_id='bash_pull',
    env={'PUSHED_VALUE':"{{ ti.xcom_pull('bash_pushed') }}",
         'RETURN_VALUE':"{{ ti.xcom_pull(task_ids='bash_push') }}"},
    bash_command = "echo $PUSHED_VALUE && echo $RETURN_VALUE",
    do_xcom_push=False

2. 실습


from airflow import DAG
import datetime
import pendulum
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="dags_bash_with_xcom",
    schedule="30 6 * * *",
    start_date=pendulum.datetime(2023, 3, 1, tz="Asia/Seoul"),
    catchup=False,
) as dag:

    bash_push = BashOperator(
        task_id="bash_push",
        bash_command="echo START && "
        "echo XCOM PUSHED "
        "{{ ti.xcom_push(key='bash_pushed', value='first_bash_message') }} &&"
        "echo COMPLETE",
    )

    bash_pull = BashOperator(
        task_id="bash_pull",
        env={
            "PUSHED_VALUE": "{{ ti.xcom_pull('bash_pushed') }}",
            "RETURN_VALUE": "{{ ti.xcom_pull(task_ids='bash_push') }}",
        },
        bash_command="echo $PUSHED_VALUE && echo $RETURN_VALUE",
        do_xcom_push=False,
    )
    bash_push >> bash_pull
  • do_xcom_push=False는 return_value에 최종적으로 출력된 문장을 xcom에 담지 않기 위함입니다.

잘 실행되는 것을 확인할 수 있습니다.

profile
데이터엔지니어

0개의 댓글