[airflow] Xcoms 에 대한 리뷰

오현우·2022년 6월 13일
0

airflow

목록 보기
14/20

airflow를 이용하다 자주 마주치는 문제

airflow를 이용하다 보면 이전 task에서 생긴 작은 정보, file 경로 등을 이어 받고 싶을 때가 있다.

예시를 들어보자. 우리가 json 객체 http provider를 이용해 받았다고 생각해보자.

그럼 우리는 해당 json 객체를 db에 저장한 후 다시 db에서 꺼내오는 일을 해야한다.

그러나 위의 과정은 누군가의 입장에서 불편할 수 있다. 그냥 정보를 직접적으로 인계해주면 되지 굳이 쿼리까지 써가며 가져와야 한다는 의문이 있을 수 있다.

이럴때 사용하는 것이 Xcom 이다.

Xcom의 정의

airflow 공식 문서에서는 Xcom을 아래와 같이 정의한다.

XComs (short for “cross-communications”) are a mechanism that let Tasks talk to each other, as by default Tasks are entirely isolated and may be running on entirely different machines.

즉 각각의 테스크 끼리 작은 정보를 주고 받을 수 있게하는 방법이다.

Xcom의 중요한 component

key: Xcom에 저장된 객체를 불러오기 위해 알아야할 요소 중 하나이다.
따로 지정해주지 않으면 default로 return_value로 설정되어 있다.

task_id: Xcom에 저장된 객체를 불러오기 위해 알아야할 요소 중 하나이다.
해당 task에서 발생되는 Xcom에 자연스럽게 매핑된다.

value: Xcom에 저장된 객체 또는 값이다.
해당 value를 불러오기 위해서는 key, task_id가 필요하다.

Xcom 간단하게 활용해보기

return 활용한 방법

우리는 머신러닝 모델 3개를 갖고 있고 어떤 것이 가장 높은 accuracy를 보이는지 체크해 보려한다.

때문에 각각 머신러닝 모델을 학습 시킨 뒤 정확도를 재보자.

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.operators.subdag import SubDagOperator
from airflow.utils.task_group import TaskGroup

from random import uniform
from datetime import datetime

default_args = {
    'start_date': datetime(2020, 1, 1)
}

def _training_model():
    accuracy = uniform(0.1, 10.0)
    print(f'model\'s accuracy: {accuracy}')
    return accuracy

def _choose_best_model():
    print('choose best model')

with DAG('xcom_dag', schedule_interval='@daily', default_args=default_args, catchup=False) as dag:

    downloading_data = BashOperator(
        task_id='downloading_data',
        bash_command='sleep 3'
    )

    with TaskGroup('processing_tasks') as processing_tasks:
        training_model_a = PythonOperator(
            task_id='training_model_a',
            python_callable=_training_model
        )

        training_model_b = PythonOperator(
            task_id='training_model_b',
            python_callable=_training_model
        )

        training_model_c = PythonOperator(
            task_id='training_model_c',
            python_callable=_training_model
        )

    choose_model = PythonOperator(
        task_id='task_4',
        python_callable=_choose_best_model
    )

    downloading_data >> processing_tasks >> choose_model

어큐러시는 uniform 분포를 활용하여 0.1과 10 사이의 값에서 랜덤하게 추출하였다.

dag를 작동시켜보자.

작동시킨 후 해당 dag에서 발생한 xcom을 확인 해보면 아래와 같다.

key는 default로 return_value 로 저장되어 있고, 해당 task_id는 각각에 맞춰 매핑되어 있다.

잘 생각해보면 우리는 xcom을 따로 사용하지 않았는데도, 자동으로 xcom을 사용하고 있었다. 이게 무슨일 일까?

정답은

Xcom을 디폴트로 사용하는 오퍼레이터이기 때문이다. (ex bash, python)

때문에 공백의 return_value가 존재한다.

이번엔 우리가 직접 key를 지정해준 뒤 xcom_pull 및 push를 활용해보자.

xcom pull push로 활용하기

아래와 같이 traning_model을 통해 xcom에 key와 task_id를 통해 정확도를 넘겨주고, choose_best_model 함수를 통해 해당 정보들을 모두 불러 왔다.

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.operators.subdag import SubDagOperator
from airflow.utils.task_group import TaskGroup

from random import uniform
from datetime import datetime

default_args = {
    'start_date': datetime(2020, 1, 1)
}

def _training_model(ti):
    accuracy = uniform(0.1, 10.0)
    print(f'model\'s accuracy: {accuracy}')
    ti.xcom_push(key='model_accuracy', value=accuracy)

def _choose_best_model(ti):
    print('choose best model')
    accuracies = ti.xcom_pull(key="model_accuracy", task_ids=[
        'processing_tasks.training_model_a',
        'processing_tasks.training_model_b',
        'processing_tasks.training_model_c'
    ])
    print(accuracies)

with DAG('xcom_dag', schedule_interval='@daily', default_args=default_args, catchup=False) as dag:

    downloading_data = BashOperator(
        task_id='downloading_data',
        bash_command='sleep 3'
    )

    with TaskGroup('processing_tasks') as processing_tasks:
        training_model_a = PythonOperator(
            task_id='training_model_a',
            python_callable=_training_model
        )

        training_model_b = PythonOperator(
            task_id='training_model_b',
            python_callable=_training_model
        )

        training_model_c = PythonOperator(
            task_id='training_model_c',
            python_callable=_training_model
        )

    choose_best_model = PythonOperator(
        task_id='choose_best_model',
        python_callable=_choose_best_model
    )

    downloading_data >> processing_tasks >> choose_best_model

xcom에 정상적으로 저장이 되었고, 잘 출력이 되었다.

여기서 비어있는 return value를 없애고자 하면 다음과 같이 하면 된다.

downloading_data = BashOperator(
	do_xcom_push=False,
    task_id='downloading_data',
    bash_command='sleep 3'
)

주의할 점

Xcom을 DAG에서 다음 task에 인수나 정보를 넘겨주는 아주 좋은 역할을 하지만, 해당 크기는 크지 않아야 한다.

구체적으로 우리가 covid data를 웹상에서 받은 뒤 (10gb 넘는 파일) 우리가 원하는 형태로 가공하려고 한다.

이러한 경우 xcom을 활용하면 db의 메모리가 터지므로 주의해서 사용해야 한다.

profile
핵심은 같게, 생각은 다르게

0개의 댓글