
- Apache Airflow is a platform created by the community to programmatically author, schedule and monitor workflows. - Apache Airflow official
출처: 위키피디아

1. Metadata DB
작업 및 파이프라인의 메타데이터 저장소
task status(ex. queued, scheduled, running, success, failed, etc)가 저장됨
Airflow 첫 다운로드시 SQLite가 설치되는데, 이를 사용하기 위해서는 mySQL or Postgres를 연결해야함
2. Web Server
- 기본 뷰 구성
- 트리 뷰
- 그래프 뷰3. Scheduler
Airflow 구성요소의 핵심
일 다했어? 그럼 이거 시작해! 하는 역할 (DAG 파일을 모니터링 하고, 실행 시점이 되면 실행 큐(Executor)로 보내는 역할
모든 작업과 DAG를 모니터링하다가 Metadatabase 내 모든 작업의 status를 모니터링
특정 작업의 dependency가 만족되면 이를 실행시킬 뿐만 아니라, 모든 작업의 실행 순서 또한 결정
task_a >> tast_b로 정의한다면 Airflow는 이 의존성(dependency)이 충족되었는지 확인하고 다음을 실행 4. Worker
Scheduler와 짝을 이루는 실제 task를 수행하는 구성요소
Scheduler가 task를 실행하라고 지시하면 Worker가 실제로 파이썬 코드, Spark Job, SQL등을 실행
[Scheduler] -> (실행 명령) -> [Worker] -> (작업 처리: Python, SQL, Spark 등)
필요에 따라 scale-out 되어 병렬 작업이나 동시에 여러 task를 진행할 수 있음
Executor 및 airflow.cfg에 의해 작업 환경 구성이 완성됨
5. Airflow.cfg
6. DAGs
# datetime 모듈에서 timedelta 클래스를 import하여 시간 간격을 설정하는 데 사용
from datetime import timedelta
# Airflow의 DAG 클래스를 import (DAG: Directed Acyclic Graph)
from airflow import DAG
# BashOperator: Bash 명령어를 실행하는 Airflow Operator
from airflow.operators.bash import BashOperator
# days_ago 함수는 DAG의 시작일을 현재 날짜로부터 며칠 전으로 설정할 때 사용
from airflow.utils.dates import days_ago
# 기본 인자 설정: DAG 내 모든 Task에 공통 적용될 기본값
default_args = {
'owner': 'airflow', # DAG 소유자 이름
'retries': 1, # Task 실패 시 재시도 횟수
'retry_delay': timedelta(minutes=5), # 재시도 사이의 대기 시간
}
# DAG 정의: 'tutorial'이라는 ID를 가진 DAG 인스턴스 생성
with DAG(
'tutorial', # DAG의 고유 ID
default_args=default_args, # 위에서 정의한 기본 인자 적용
description='A simple tutorial DAG', # DAG 설명
schedule_interval=timedelta(days=1), # DAG 실행 주기: 하루마다 실행
start_date=days_ago(2), # DAG의 시작일: 현재로부터 2일 전
tags=['example'], # DAG를 분류하기 위한 태그
) as dag: # with 문을 통해 dag 변수에 DAG 인스턴스를 바인딩
# 첫 번째 Task 정의: 현재 날짜를 출력하는 Bash 명령 실행
t1 = BashOperator(
task_id='print_date', # Task의 고유 ID
bash_command='date', # 실행할 Bash 명령어
)
# 두 번째 Task 정의: 5초 동안 대기하는 Bash 명령 실행
t2 = BashOperator(
task_id='sleep', # Task의 고유 ID
depends_on_past=False, # 이전 실행 결과에 의존하지 않음
bash_command='sleep 5', # 실행할 Bash 명령어 (5초 대기)
retries=3, # Task 실패 시 최대 3회 재시도
)
# Task 간 의존성 정의: t1 → t2 순서로 실행됨
t1 >> t2
## 출처: KT cloud T-story
1. DAG
2. Task
3. Upstream / Downstream
4. start_date
catchup=False 설정 5. end_date
6. schedule_interval
@daily, timedelta(days=1), crontab('0 8 * * *') 7. Trigger DAG
8. Clear
9. Retry
에어플로우에 대해 잘 읽고 갑니다. 전혀 몰랐는데 꽤나 유용한 도구같아서 더 배워보고싶어졌어요! 감사합니다.