섹션5: Python &Bash 오퍼레이터 간 Xcom 사용

류홍규·2023년 8월 13일
0

airflow

목록 보기
3/18
post-thumbnail

1. Python -> Bash 오퍼레이터 Xcom 전달

  • python Operator에서 return값은 자동으로 Xcom에 저장됨
  • Bash Operator에서 ti.xcom_pull/push는 envbash_command에서만 가능함

2. Bash -> Python 오퍼레이터 Xcom 전달

  • Bash Operator에서는 마지막에 출력된 문장이 return 값으로 간주되어 Xcom에 저장됨
  • ti객체를 활용해서 key와 task_ids 파라미터로 값을 가져와서 value를 출력하는 구조

from airflow import DAG
import pendulum
from airflow.decorators import task
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="dags_bash_python_with_xcom",
    schedule="30 9 * * *",
    start_date=pendulum.datetime(2023, 8, 1, tz='Asia/Seoul'),
    catchup=False
) as dag:
    
    # 1. python -> bash 
    @task(task_id='python_push')
    def python_push_xcom():
        result_dict = {'status': 'Good', 'data': [1, 2, 3], 'options_cnt': 100}
        return result_dict
    
    bash_pull = BashOperator(
        task_id='bash_pull',
        env={
            'STATUS': "{{ ti.xcom_pull(task_ids='python_push')['status']}}",
            'DATA': "{{ti.xcom_pull(task_ids='python_push')['data']}}",
            'OPTIONS_CNT': "{{ ti.xcom_pull(task_ids='python_push')['options_cnt']}}"
        },
        bash_command='echo $STATUS && echo $DATA && echo $OPTIONS_CNT'
    )
    
    python_push_xcom() >> bash_pull
    
    # 2. bash -> python
    bash_push = BashOperator(
        task_id = "bash_push",
        bash_command='echo PUSH_START '
                     '{{ ti.xcom_push(key="bash_pushed", value=200) }} && '
                     'echo PUSH_COMPLETE'
    )
    
    @task(task_id='python_pull')
    def python_pull_xcom(**kwargs):
        ti = kwargs['ti']
        status_value = ti.xcom_pull(key='bash_pushed')
        return_value = ti.xcom_pull(task_ids='bash_push')
        print("status_value: " + str(status_value))
        print("return_value: " + return_value)
    
    bash_pull >> python_pull_xcom()

# python -> bash 
[2023-08-14, 01:15:47 KST] {subprocess.py:86} INFO - Output:
[2023-08-14, 01:15:47 KST] {subprocess.py:93} INFO - Good
[2023-08-14, 01:15:47 KST] {subprocess.py:93} INFO - [1, 2, 3]
[2023-08-14, 01:15:47 KST] {subprocess.py:93} INFO - 100
# bash -> python
[2023-08-14, 01:15:48 KST] {logging_mixin.py:150} INFO - status_value: 200
[2023-08-14, 01:15:48 KST] {logging_mixin.py:150} INFO - return_value: PUSH_COMPLETE

profile
공대생의 코딩 정복기

0개의 댓글