아파치 에어플로우의 스케줄러는 워크플로우 자동화를 가능하게 하는 핵심 구성 요소입니다. 스케줄러는 모든 DAG(Directed Acyclic Graph)와 태스크를 지속적으로 모니터링하고, 실행 조건이 충족되면 해당 태스크를 트리거합니다.
스케줄러는 에어플로우 시스템에서 다음과 같은 역할을 담당합니다:
스케줄러는 설정된 airflow.cfg
구성 파일을 사용하여 동작하며, 프로덕션 환경에서는 지속적으로 실행되는 서비스로 설계되어 있습니다.
스케줄러는 DAG 디렉토리에 저장된 Python 파일을 지속적으로 스캔하고 파싱합니다. 이는 다음과 같이 수행됩니다:
dag_dir_list_interval
)마다 DAG 디렉토리에서 새 파일 확인스케줄러는 다음과 같은 단계로 작업을 예약합니다:
start_date
와 schedule_interval
을 기반으로 첫 번째 DAG Run을 생성합니다.schedule_interval
이 있는 DAG의 경우, 스케줄러는 해당 간격이 끝난 후에만 실행됩니다. 예를 들어, @daily
스케줄은 하루가 끝난 후 실행됩니다.에어플로우에서는 여러 방법으로 스케줄링 간격을 정의할 수 있습니다:
에어플로우는 편의를 위해 몇 가지 내장 매크로를 제공합니다:
Copydag = DAG(
'my_dag',
schedule_interval='@daily', # 매일 자정에 실행
start_date=datetime(2023, 1, 1),
)
주요 프리셋:
@once
: 한 번만 실행@hourly
: 매시간(0분)마다 실행@daily
: 매일 자정에 실행@weekly
: 매주 일요일 자정에 실행@monthly
: 매월 1일 자정에 실행@yearly
: 매년 1월 1일 자정에 실행None
: 수동 트리거만 허용더 복잡한 스케줄링을 위해 cron 표현식을 사용할 수 있습니다:
Copydag = DAG(
'my_dag',
schedule_interval='0 9 * * 1-5', # 월-금 오전 9시에 실행
start_date=datetime(2023, 1, 1),
)
Cron 표현식의 구성 요소:
주요 예시:
0 * * * *
: 매시간 정각에 실행0 0 * * *
: 매일 자정에 실행0 0 * * MON
: 매주 월요일 자정에 실행0 9-17 * * 1-5
: 평일 오전 9시부터 오후 5시까지 매시간 실행파이썬의 datetime.timedelta를 사용하여 시간 간격 기반으로 설정할 수 있습니다:
Copyfrom datetime import datetime, timedelta
dag = DAG(
'my_dag',
schedule_interval=timedelta(days=3), # 3일마다 실행
start_date=datetime(2023, 1, 1),
)
이 방법은 비정규적 시간 간격이 필요할 때 유용합니다:
timedelta(hours=6)
: 6시간마다 실행timedelta(days=2, hours=12)
: 2일 12시간마다 실행에어플로우 스케줄러의 작동 방식을 이해하기 위해서는 다음 개념을 파악하는 것이 중요합니다:
스케줄러는 다음과 같은 방식으로 태스크를 실행합니다:
>>
연산자로 정의)에 따라 실행 순서가 결정됩니다.데이터 처리 워크플로우에서는 일반적으로 다음과 같은 패턴을 사용합니다:
Copyfrom airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
def process_data(ds, **kwargs):
"""
ds: 데이터 간격의 시작 날짜 (YYYY-MM-DD 형식)
"""
print(f"Processing data for {ds}")
# 여기에 데이터 처리 로직 작성
dag = DAG(
'data_processing',
schedule_interval='@daily',
start_date=datetime(2023, 1, 1),
)
process_task = PythonOperator(
task_id='process_data',
python_callable=process_data,
dag=dag,
)
이 패턴에서 각 DAG Run은 특정 날짜(ds
)에 해당하는 데이터를 처리합니다.
에어플로우 스케줄러의 성능은 다양한 요소에 영향을 받습니다:
에어플로우 airflow.cfg
파일에서 다음 매개변수를 조정하여 성능을 최적화할 수 있습니다:
min_file_process_interval
: DAG 파일 재파싱 간격 (초)parsing_processes
: 병렬로 실행할 DAG 파싱 프로세스 수dag_dir_list_interval
: DAG 디렉토리 스캔 간격 (초)max_tis_per_query
: 스케줄링 메인 루프의 쿼리 배치 크기max_dagruns_to_create_per_loop
: 루프당 생성할 최대 DAG Run 수max_dagruns_per_loop_to_schedule
: 루프당 예약할 최대 DAG Run 수에어플로우는 성능 및 고가용성을 위해 여러 스케줄러를 동시에 실행할 수 있습니다.
여러 스케줄러를 실행하려면:
airflow.cfg
에서 use_row_level_locking = True
확인
다른 머신이나 컨테이너에서 추가 스케줄러 시작:
Copyairflow scheduler
SKIP LOCKED
또는 NOWAIT
SQL 절을 구현하지 않아 다중 스케줄러 운영 시 주의가 필요문제: DAG가 예상 시간에 실행되지 않음
해결방법:
start_date
가 과거 시점으로 설정되었는지 확인schedule_interval
에 따라 실행 시점이 interval이 끝난 후임을 이해execution_date
, data_interval_start/end
)를 로그에 출력하여 디버깅문제: 스케줄러가 느리거나 높은 리소스 사용률을 보임
해결방법:
min_file_process_interval
증가 고려(파싱 빈도 감소)문제: 다중 스케줄러 운영 시 데드록이나 데이터베이스 잠금 이슈 발생
해결방법:
max_dagruns_to_create_per_loop
값 낮추기아파치 에어플로우의 스케줄러는 데이터 파이프라인 자동화의 핵심 구성 요소입니다. 스케줄러의 작동 원리를 이해하고 적절히 구성하면 안정적이고 효율적인 워크플로우 관리가 가능합니다.
특히, 스케줄링 간격 설정 방법을 숙지하고 성능 최적화 매개변수를 적절히 조정하는 것이 중요합니다. 대규모 워크로드의 경우 다중 스케줄러 구성을 통해 성능과 가용성을 향상시킬 수 있습니다.
에어플로우 스케줄러를 활용하여 데이터 처리 작업을 효과적으로 자동화하고, 궁극적으로 데이터 엔지니어링 워크플로우를 더 쉽게 관리하세요.