섹션5: 데이터 공유(Bash Operator)

류홍규·2023년 8월 13일
0

airflow

목록 보기
2/18
post-thumbnail

Bash 오퍼레이터에서 Xcom 사용하기

  • Bash 오퍼레이터는 env, bash_command 파라미터에서 JinJa Template 문법을 이용하여 push/pull 이 가능하다.
  • 템플릿 문법에서도 ti객체 사용이 가능하다.
  • bash_command에서는 ""로 감싸진 출력하려는 문장이 return으로 간주된다. 그런데 리턴이 3개가 있을 수는 없으므로, 마지막 출력문이 자동으로 return_value로 간주된다.
  • task_ids만 지정하면 자동으로 xcom_pull을 통해 return_value를 자동으로 찾아준다.
  • do_xcom_push 파라미터: default값은 True로 자동으로 bash_command에서 작성한 값이 xcom에 push가 된다. do_xcom_push를 False로 설정하면 xcom에 push하지 않는다.

코드 작성

from airflow import DAG
import pendulum
from airflow.operators.bash import BashOperator

with DAG(
    dag_id='dags_bash_with_xcom',
    schedule="10 0 * * *",
    start_date=pendulum.datetime(2023, 3, 1, tz="Asia/Seoul"),
    catchup=False
) as dag:
    bash_push = BashOperator(
        task_id="bash_push",
        bash_command="echo START && "
                     "echo XCOM_PUSHED "
                     "{{ ti.xcom_push(key='bash_pushed', value='first_bash_message')}} && "
                     "echo COMPLETE "
    )
    
    bash_pull = BashOperator(
        task_id='bash_pull',
        env={"PUSHED_VALUE": "{{ ti.xcom_pull(key='bash_pushed')}}",
             "RETURN_VALUE": "{{ ti.xcom_pull(task_ids='bash_push')}}"},
        bash_command="echo $PUSHED_VALUE && echo $RETURN_VALUE ",
        do_xcom_push=False
    )
    
    # task flow
    bash_push >> bash_pull

[2023-08-14, 00:50:44 KST] {subprocess.py:93} INFO - first_bash_message
[2023-08-14, 00:50:44 KST] {subprocess.py:93} INFO - COMPLETE
profile
공대생의 코딩 정복기

1개의 댓글

comment-user-thumbnail
2023년 8월 13일

유익한 자료 감사합니다.

답글 달기