Airflow Xcom 인터페이스

스르륵·2022년 1월 28일
0

Airflow

목록 보기
3/3

Airflow는 기본적으로 하나의 오퍼레이터의 결과가 다른 오퍼레이터에 영향을주지 않는다. 각각이 독립적으로 실행되기 때문에 서로 통신할 수단이 없다. 하지만 워크플로우를 만들다보면 이전 작업의 결과, 요소 등을 다음 작업에 전달할 경우가 생긴다. 이때 Xcom을 이용해 전달할 수 있다.

하지만 xcom은 task 간의 통신을 위 한 메모 정도로 설계되었기 때문에 대용량 파일 전송 등의 용도로는 부적합하다. (MAX XCOM size 48Kb)

Example

이전 글에서 비트코인 가격을 불러오는 작업을 TaskFlow가 아닌 기존의 Operator로 만든 코드이다.

from airflow import DAG
from airflow.operators.python import PythonOperator

from datetime import datetime, timedelta
import requests
import logging


def extract_price():
    url = f"https://api.coingecko.com/api/v3/simple/price?ids=bitcoin&vs_currencies=usd&include_market_cap=true&\
                include_24hr_vol=true&include_24hr_change=true&include_last_updated_at=true"
    print(url)
    return requests.get(url).json()


def process_data(**context):
    response = context['task_instance'].xcom_pull(task_ids=f'extract_bitcoin_price')
    response = response['bitcoin']
    logging.info(response)
    return {'usd': response['usd'], 'change': response['usd_24h_change']}


def store_data(**context):
    data = context['task_instance'].xcom_pull(task_ids="process_data")
    logging.info(f"Store: {data['usd']} with change {data['change']}")


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    "start_date": datetime(2022, 1, 17),
    "provide_context": True
}

with DAG(
    'price_tracking',
    default_args=default_args,
    schedule_interval=timedelta(minutes=10),
    catchup=False,
    max_active_runs=3
) as dag:
    extract = PythonOperator(
        task_id=f'extract_bitcoin_price',
        python_callable=extract_price,
        dag=dag
    )

    process = PythonOperator(
        task_id="process_data",
        python_callable=process_data,
        op_kwargs={

        }
    )

    store = PythonOperator(
        task_id='saving_data',
        python_callable=store_data
    )

    extract >> process >> store

이때 extract_price의 결과를 가져와서 proces_data에서 처리해야 하는데 이 과정에서 Xcom을 사용한다.

response = context['ti'].xcom_pull(task_id='extract_bitcoin_price')

ti는 task_instance의 줄임말이고 pull을 통해 정보를 가져오는 것이다. 반대로 push는 정보를 보내 나중에 pull해서 사용할 수 있도록 한다. 그리고 함수의 리턴값은 항상 Xcom_push와 같은 결과를 가진다.


webserver에서 확인해보면 extract_price를 통해 Xcom에 위와 같은 딕셔너리가 push된 것을 볼 수 있다.
그 다음 작업인 process_data는 저 딕셔너리를 불러와서 사용하게 되는 것이다.

profile
기록하는 블로그

0개의 댓글