Airflow Tutorial

스르륵·2022년 1월 28일
0

Airflow

목록 보기
2/3

Airflow는 task를 정의하고 task 간의 의존관계를 설정하여 DAG를 만들면 그 순서대로 작업을 실행시켜준다.

task정의 방법에는 크게 세 가지가 있다
1. Operator
2. Sensor
3. Taskflow

Operator

operator는 가장 기본적은 airflow의 task 정의 방법이다. 대표적으로 많이 쓰이는 operator는 bash command를 실행하는 BashOperator와 python 함수를 실행시키는 PythonOperator가 있다. 그 외에도 이메일을 전송하는 EmailOperatorMysqlOperator 등이 있다.

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago
from datetime import timedelta

default_args = {
    'owner': 'airflow',
    'catchup': False,
    'execution_timeout': timedelta(hours=6),
    'depends_on_past': False,
}
dag = DAG(
    'sample',
    default_args=default_args,
    description="sample description",
    schedule_interval="@daily",
    start_date=days_ago(3),
    tags=['daily'],
    max_active_runs=3,
    concurrency=1
)

sample_a = BashOperator(
    task_id='sample_a',
    bash_command='echo hello',
    dag=dag)

sample_b = BashOperator(
    task_id='sample_b', 
    bash_command='echo world',
    dag=dag)

sample_a << sample_b

위 코드는 두 개의 BashOperator를 정의하였다. 각각 'hello'와 'world'를 출력하는 작업을 수행하는데, DAG 부분에서 하루에 한 번씩 실행하도록 설정했다.

Sensor

센서는 파일, 시간 등 외부 이벤트가 확인될 때 까지 기다리도록 하여 조건이 충족되야 다음 작업을 진행하도록 하는 것이다.
특정 파일의 존재 유무를 확인하는 FileSensor, 특정 시간을 기다리는 TimeSensor 등이 있다.

from airflow import DAG
from datetime import datetime, timedelta
from airflow.sensors.filesystem import FileSensor
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago

default_args = {
    "start_date": days_ago(1),
    'retires': 0,
    "catchup": False,
    "retry_delay": timedelta(minutes=5)
}

with DAG(
    'sensor_test',
    default_args=default_args,
    schedule_interval="@once"
) as dag:
    t1 = FileSensor(
        task_id='sensor_a',
        fs_conn_id='file_sensor',
        filepath='a.txt',
        dag=dag
    )

    t2 = BashOperator(
        task_id='cat_a',
        bash_command='cat /Users/honginyoon/airflow/dags/a.txt && ls /Users/honginyoon/airflow/dags/',
        dag=dag
    )

    t1 >> t2

a.txt 라는 파일이 있으면 해당 파일 내용과 dags 디렉토리의 파일을 모두 출력하는 작업을 하도록 하는 코드이다.
만약 a.txt라는 파일이 없으면 t2에 해당하는 bash command는 실행되지 않는다.

TaskFlow

Airflow 2.0에서 추가된 기능으로, 파이썬의 데코레이터를 사용하여 기존의 Operator보다 간단하고 보기 쉬운 코드를 작성할 수 있도록 해준다.

from airflow.decorators import dag, task

from datetime import datetime, timedelta
import requests
import logging

default_args = {
    'onwer': 'airflow',
    'depends_on_past': False,
    "start_date": datetime(2022, 1, 17),
    "provide_context": True
}


@dag(default_args=default_args, schedule_interval=timedelta(minutes=5), catchup=False)
def taskflow():

    @task(task_id=f'extract', retries=3)
    def extract_price():
        url = f"https://api.coingecko.com/api/v3/simple/price?ids=bitcoin&vs_currencies=usd&include_market_cap=true&\
                    include_24hr_vol=true&include_24hr_change=true&include_last_updated_at=true"

        return requests.get(url).json()

    @task(task_id='process', multiple_outputs=True)
    def process_data(response):
        data = response['bitcoin']
        logging.info(data)
        return {'usd': data['usd'], 'change': data['usd_24h_change']}

    @task(task_id='store')
    def store_data(data):
        logging.info(f"Store: {data['usd']} with change {data['change']}")

    store_data(process_data(extract_price()))


dag = taskflow()

coingecko API를 통해 5분마다 비트코인 가격을 검색하고 저장하는 과정을 Airflow로 스케쥴링 한 것이다. (실제로 저장을 하진 않는다)

Airflows는 기본적으로 Operator가 독립적이기 때문에 서로 다른 operator간의 정보 교환을 위해서는 Xcom이라는 인터페이스를 사용해야 하지만, TaskFlow API에서는 함수 형태로 다른 task의 인자로 사용하여 좀더 간편하게 이전 task의 결과를 다음 task로 전달 할 수도 있다.

profile
기록하는 블로그

0개의 댓글