Airflow는 task를 정의하고 task 간의 의존관계를 설정하여 DAG를 만들면 그 순서대로 작업을 실행시켜준다.
task정의 방법에는 크게 세 가지가 있다
1. Operator
2. Sensor
3. Taskflow
operator는 가장 기본적은 airflow의 task 정의 방법이다. 대표적으로 많이 쓰이는 operator는 bash command를 실행하는 BashOperator
와 python 함수를 실행시키는 PythonOperator
가 있다. 그 외에도 이메일을 전송하는 EmailOperator
나 MysqlOperator
등이 있다.
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago
from datetime import timedelta
default_args = {
'owner': 'airflow',
'catchup': False,
'execution_timeout': timedelta(hours=6),
'depends_on_past': False,
}
dag = DAG(
'sample',
default_args=default_args,
description="sample description",
schedule_interval="@daily",
start_date=days_ago(3),
tags=['daily'],
max_active_runs=3,
concurrency=1
)
sample_a = BashOperator(
task_id='sample_a',
bash_command='echo hello',
dag=dag)
sample_b = BashOperator(
task_id='sample_b',
bash_command='echo world',
dag=dag)
sample_a << sample_b
위 코드는 두 개의 BashOperator
를 정의하였다. 각각 'hello'와 'world'를 출력하는 작업을 수행하는데, DAG
부분에서 하루에 한 번씩 실행하도록 설정했다.
센서는 파일, 시간 등 외부 이벤트가 확인될 때 까지 기다리도록 하여 조건이 충족되야 다음 작업을 진행하도록 하는 것이다.
특정 파일의 존재 유무를 확인하는 FileSensor
, 특정 시간을 기다리는 TimeSensor
등이 있다.
from airflow import DAG
from datetime import datetime, timedelta
from airflow.sensors.filesystem import FileSensor
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago
default_args = {
"start_date": days_ago(1),
'retires': 0,
"catchup": False,
"retry_delay": timedelta(minutes=5)
}
with DAG(
'sensor_test',
default_args=default_args,
schedule_interval="@once"
) as dag:
t1 = FileSensor(
task_id='sensor_a',
fs_conn_id='file_sensor',
filepath='a.txt',
dag=dag
)
t2 = BashOperator(
task_id='cat_a',
bash_command='cat /Users/honginyoon/airflow/dags/a.txt && ls /Users/honginyoon/airflow/dags/',
dag=dag
)
t1 >> t2
a.txt 라는 파일이 있으면 해당 파일 내용과 dags
디렉토리의 파일을 모두 출력하는 작업을 하도록 하는 코드이다.
만약 a.txt라는 파일이 없으면 t2
에 해당하는 bash command는 실행되지 않는다.
Airflow 2.0에서 추가된 기능으로, 파이썬의 데코레이터를 사용하여 기존의 Operator보다 간단하고 보기 쉬운 코드를 작성할 수 있도록 해준다.
from airflow.decorators import dag, task
from datetime import datetime, timedelta
import requests
import logging
default_args = {
'onwer': 'airflow',
'depends_on_past': False,
"start_date": datetime(2022, 1, 17),
"provide_context": True
}
@dag(default_args=default_args, schedule_interval=timedelta(minutes=5), catchup=False)
def taskflow():
@task(task_id=f'extract', retries=3)
def extract_price():
url = f"https://api.coingecko.com/api/v3/simple/price?ids=bitcoin&vs_currencies=usd&include_market_cap=true&\
include_24hr_vol=true&include_24hr_change=true&include_last_updated_at=true"
return requests.get(url).json()
@task(task_id='process', multiple_outputs=True)
def process_data(response):
data = response['bitcoin']
logging.info(data)
return {'usd': data['usd'], 'change': data['usd_24h_change']}
@task(task_id='store')
def store_data(data):
logging.info(f"Store: {data['usd']} with change {data['change']}")
store_data(process_data(extract_price()))
dag = taskflow()
coingecko API를 통해 5분마다 비트코인 가격을 검색하고 저장하는 과정을 Airflow로 스케쥴링 한 것이다. (실제로 저장을 하진 않는다)
Airflows는 기본적으로 Operator가 독립적이기 때문에 서로 다른 operator간의 정보 교환을 위해서는 Xcom이라는 인터페이스를 사용해야 하지만, TaskFlow API에서는 함수 형태로 다른 task의 인자로 사용하여 좀더 간편하게 이전 task의 결과를 다음 task로 전달 할 수도 있다.