Airflow는 기본적으로 하나의 오퍼레이터의 결과가 다른 오퍼레이터에 영향을주지 않는다. 각각이 독립적으로 실행되기 때문에 서로 통신할 수단이 없다. 하지만 워크플로우를 만들다보면 이전 작업의 결과, 요소 등을 다음 작업에 전달할 경우가 생긴다. 이때 Xcom을 이용해 전달할 수 있다.
하지만 xcom은 task 간의 통신을 위 한 메모 정도로 설계되었기 때문에 대용량 파일 전송 등의 용도로는 부적합하다. (MAX XCOM size 48Kb)
이전 글에서 비트코인 가격을 불러오는 작업을 TaskFlow가 아닌 기존의 Operator로 만든 코드이다.
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import requests
import logging
def extract_price():
url = f"https://api.coingecko.com/api/v3/simple/price?ids=bitcoin&vs_currencies=usd&include_market_cap=true&\
include_24hr_vol=true&include_24hr_change=true&include_last_updated_at=true"
print(url)
return requests.get(url).json()
def process_data(**context):
response = context['task_instance'].xcom_pull(task_ids=f'extract_bitcoin_price')
response = response['bitcoin']
logging.info(response)
return {'usd': response['usd'], 'change': response['usd_24h_change']}
def store_data(**context):
data = context['task_instance'].xcom_pull(task_ids="process_data")
logging.info(f"Store: {data['usd']} with change {data['change']}")
default_args = {
'owner': 'airflow',
'depends_on_past': False,
"start_date": datetime(2022, 1, 17),
"provide_context": True
}
with DAG(
'price_tracking',
default_args=default_args,
schedule_interval=timedelta(minutes=10),
catchup=False,
max_active_runs=3
) as dag:
extract = PythonOperator(
task_id=f'extract_bitcoin_price',
python_callable=extract_price,
dag=dag
)
process = PythonOperator(
task_id="process_data",
python_callable=process_data,
op_kwargs={
}
)
store = PythonOperator(
task_id='saving_data',
python_callable=store_data
)
extract >> process >> store
이때 extract_price
의 결과를 가져와서 proces_data
에서 처리해야 하는데 이 과정에서 Xcom을 사용한다.
response = context['ti'].xcom_pull(task_id='extract_bitcoin_price')
ti
는 task_instance의 줄임말이고 pull을 통해 정보를 가져오는 것이다. 반대로 push는 정보를 보내 나중에 pull해서 사용할 수 있도록 한다. 그리고 함수의 리턴값은 항상 Xcom_push와 같은 결과를 가진다.
webserver에서 확인해보면 extract_price
를 통해 Xcom에 위와 같은 딕셔너리가 push된 것을 볼 수 있다.
그 다음 작업인 process_data
는 저 딕셔너리를 불러와서 사용하게 되는 것이다.