아파치 에어플로우(Apache Airflow)의 스케줄러 이해하기와 활용법

GarionNachal·2025년 4월 16일
0

airflow

목록 보기
3/8

목차

  1. 스케줄러(Scheduler)란?
  2. 스케줄러의 주요 기능과 역할
  3. 스케줄링 간격 설정 방법
  4. 일반적인 스케줄러 사용 패턴
  5. 스케줄러 성능 최적화
  6. 다중 스케줄러 운영
  7. 자주 발생하는 문제와 해결 방법

1. 스케줄러(Scheduler)란?

아파치 에어플로우의 스케줄러는 워크플로우 자동화를 가능하게 하는 핵심 구성 요소입니다. 스케줄러는 모든 DAG(Directed Acyclic Graph)와 태스크를 지속적으로 모니터링하고, 실행 조건이 충족되면 해당 태스크를 트리거합니다.

스케줄러는 에어플로우 시스템에서 다음과 같은 역할을 담당합니다:

  • DAG 파일을 지속적으로 파싱하여 DB와 동기화
  • 실행 시간이 된 DAG의 새로운 실행(DAG Run) 생성
  • 태스크 인스턴스의 의존성을 확인하고 실행 조건 충족 시 작업 예약
  • 실행자(Executor)에게 태스크 실행 위임
  • 리소스 풀(Pool) 및 병렬성 제한 관리

스케줄러는 설정된 airflow.cfg 구성 파일을 사용하여 동작하며, 프로덕션 환경에서는 지속적으로 실행되는 서비스로 설계되어 있습니다.

2. 스케줄러의 주요 기능과 역할

DAG 파일 처리

스케줄러는 DAG 디렉토리에 저장된 Python 파일을 지속적으로 스캔하고 파싱합니다. 이는 다음과 같이 수행됩니다:

  • 지정된 간격(dag_dir_list_interval)마다 DAG 디렉토리에서 새 파일 확인
  • 각 파일을 파싱하여 DAG 객체로 변환
  • 파싱된 DAG 정보를 메타데이터 데이터베이스에 저장
  • 변경사항을 감지하여 자동으로 업데이트

스케줄링 프로세스

스케줄러는 다음과 같은 단계로 작업을 예약합니다:

  1. 새로운 DAG Run이 필요한 DAG를 확인하고 생성
  2. 실행 가능한 태스크 인스턴스를 찾기 위해 활성 DAG Run 검사
  3. 의존성을 충족한 태스크를 식별(즉, 선행 태스크가 성공적으로 완료됨)
  4. 리소스 풀, 병렬성 제한 등을 고려하여 실행 가능한 태스크 선택
  5. 선택된 태스크를 실행자(Executor)에게 전달

중요한 특성

  • 스케줄러는 DAG에 정의된 start_date와 schedule_interval을 기반으로 첫 번째 DAG Run을 생성합니다.
  • schedule_interval이 있는 DAG의 경우, 스케줄러는 해당 간격이 끝난 후에만 실행됩니다. 예를 들어, @daily 스케줄은 하루가 끝난 후 실행됩니다.
  • 이로 인해 UI에서는 마치 Airflow가 작업을 하루 "늦게" 실행하는 것처럼 보일 수 있습니다.

3. 스케줄링 간격 설정 방법

에어플로우에서는 여러 방법으로 스케줄링 간격을 정의할 수 있습니다:

1) 프리셋 매크로 사용

에어플로우는 편의를 위해 몇 가지 내장 매크로를 제공합니다:

Copydag = DAG(
    'my_dag',
    schedule_interval='@daily',  # 매일 자정에 실행
    start_date=datetime(2023, 1, 1),
)

주요 프리셋:

  • @once: 한 번만 실행
  • @hourly: 매시간(0분)마다 실행
  • @daily: 매일 자정에 실행
  • @weekly: 매주 일요일 자정에 실행
  • @monthly: 매월 1일 자정에 실행
  • @yearly: 매년 1월 1일 자정에 실행
  • None: 수동 트리거만 허용

2) Cron 표현식 사용

더 복잡한 스케줄링을 위해 cron 표현식을 사용할 수 있습니다:

Copydag = DAG(
    'my_dag',
    schedule_interval='0 9 * * 1-5',  # 월-금 오전 9시에 실행
    start_date=datetime(2023, 1, 1),
)

Cron 표현식의 구성 요소:

  • 분 (0-59)
  • 시간 (0-23)
  • 일 (1-31)
  • 월 (1-12)
  • 요일 (0-6 또는 SUN-SAT)

주요 예시:

  • 0 * * * *: 매시간 정각에 실행
  • 0 0 * * *: 매일 자정에 실행
  • 0 0 * * MON: 매주 월요일 자정에 실행
  • 0 9-17 * * 1-5: 평일 오전 9시부터 오후 5시까지 매시간 실행

3) Timedelta 사용

파이썬의 datetime.timedelta를 사용하여 시간 간격 기반으로 설정할 수 있습니다:

Copyfrom datetime import datetime, timedelta

dag = DAG(
    'my_dag',
    schedule_interval=timedelta(days=3),  # 3일마다 실행
    start_date=datetime(2023, 1, 1),
)

이 방법은 비정규적 시간 간격이 필요할 때 유용합니다:

  • timedelta(hours=6): 6시간마다 실행
  • timedelta(days=2, hours=12): 2일 12시간마다 실행

4. 일반적인 스케줄러 사용 패턴

DAG 실행 흐름 이해하기

에어플로우 스케줄러의 작동 방식을 이해하기 위해서는 다음 개념을 파악하는 것이 중요합니다:

  • execution_date: 데이터 간격의 시작을 나타내는 시점입니다.
  • next_execution_date: 다음 데이터 간격의 시작 시점입니다.
  • data_interval_start/end: 데이터 처리 구간의 시작과 끝입니다.

스케줄러와 실행 모델

스케줄러는 다음과 같은 방식으로 태스크를 실행합니다:

  1. DAG Run 생성: 스케줄 간격에 따라 새로운 DAG Run이 생성됩니다.
  2. 태스크 인스턴스 생성: DAG Run이 시작되면 DAG의 모든 태스크에 대한 태스크 인스턴스가 생성됩니다.
  3. 의존성 확인: 태스크 의존성(>> 연산자로 정의)에 따라 실행 순서가 결정됩니다.
  4. 리소스 제약 적용: 풀, 병렬성 및 우선순위 설정이 적용됩니다.
  5. 태스크 실행: 조건을 만족하는 태스크가 실행자에게 위임됩니다.

데이터 분할 전략

데이터 처리 워크플로우에서는 일반적으로 다음과 같은 패턴을 사용합니다:

Copyfrom airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

def process_data(ds, **kwargs):
    """
    ds: 데이터 간격의 시작 날짜 (YYYY-MM-DD 형식)
    """
    print(f"Processing data for {ds}")
    # 여기에 데이터 처리 로직 작성

dag = DAG(
    'data_processing',
    schedule_interval='@daily',
    start_date=datetime(2023, 1, 1),
)

process_task = PythonOperator(
    task_id='process_data',
    python_callable=process_data,
    dag=dag,
)

이 패턴에서 각 DAG Run은 특정 날짜(ds)에 해당하는 데이터를 처리합니다.

5. 스케줄러 성능 최적화

에어플로우 스케줄러의 성능은 다양한 요소에 영향을 받습니다:

성능에 영향을 미치는 요소

  1. 파일 시스템 성능: DAG 파일 파싱은 I/O 집약적 작업입니다.
  2. 데이터베이스 연결: 많은 DAG와 태스크는 많은 DB 연결을 필요로 합니다.
  3. CPU 사용량: DAG 파싱 및 스케줄링 로직에 필요합니다.
  4. 메모리 사용량: 많은 DAG와 태스크는 많은 메모리를 사용합니다.

주요 최적화 설정

에어플로우 airflow.cfg 파일에서 다음 매개변수를 조정하여 성능을 최적화할 수 있습니다:

  • min_file_process_interval: DAG 파일 재파싱 간격 (초)
  • parsing_processes: 병렬로 실행할 DAG 파싱 프로세스 수
  • dag_dir_list_interval: DAG 디렉토리 스캔 간격 (초)
  • max_tis_per_query: 스케줄링 메인 루프의 쿼리 배치 크기
  • max_dagruns_to_create_per_loop: 루프당 생성할 최대 DAG Run 수
  • max_dagruns_per_loop_to_schedule: 루프당 예약할 최대 DAG Run 수

성능 향상을 위한 접근 방법

  1. DAG 코드 최적화: 복잡한 탑레벨 코드를 피하고 DAG 파싱 시 외부 데이터 소스 접근을 최소화
  2. 리소스 활용 개선: 필요에 따라 스케줄러, 파싱 프로세스 수 증가
  3. 하드웨어 용량 증가: CPU, 메모리, I/O 병목 현상 해결을 위한 리소스 증가
  4. 다중 스케줄러 운영: 성능이 CPU에 제한된 경우 스케줄러 추가

6. 다중 스케줄러 운영

에어플로우는 성능 및 고가용성을 위해 여러 스케줄러를 동시에 실행할 수 있습니다.

다중 스케줄러 요구사항

  • 데이터베이스 지원: PostgreSQL 12+, MySQL 8.0+ 권장
  • 행 수준 잠금(Row-level Locking): 스케줄러 간 조정을 위해 필요
  • 충분한 DB 연결: 각 스케줄러는 여러 DB 연결을 사용

다중 스케줄러 설정 방법

여러 스케줄러를 실행하려면:

  1. airflow.cfg에서 use_row_level_locking = True 확인

  2. 다른 머신이나 컨테이너에서 추가 스케줄러 시작:

    Copyairflow scheduler
    

주의사항

  • MariaDB는 버전 10.6.0 이전에서는 SKIP LOCKED 또는 NOWAIT SQL 절을 구현하지 않아 다중 스케줄러 운영 시 주의가 필요
  • PostgreSQL 사용 시 PGBouncer와 같은 연결 풀러 사용 권장

7. 자주 발생하는 문제와 해결 방법

DAG가 예상대로 실행되지 않는 경우

문제: DAG가 예상 시간에 실행되지 않음

해결방법:

  • start_date가 과거 시점으로 설정되었는지 확인
  • schedule_interval에 따라 실행 시점이 interval이 끝난 후임을 이해
  • DAG의 시간 관련 변수(execution_datedata_interval_start/end)를 로그에 출력하여 디버깅

스케줄러 성능 이슈

문제: 스케줄러가 느리거나 높은 리소스 사용률을 보임

해결방법:

  • 복잡한 탑레벨 DAG 코드 단순화
  • DAG 파일 수와 크기 최적화
  • min_file_process_interval 증가 고려(파싱 빈도 감소)
  • 하드웨어 리소스 증가 또는 다중 스케줄러 구성

데드록 또는 잠금 이슈

문제: 다중 스케줄러 운영 시 데드록이나 데이터베이스 잠금 이슈 발생

해결방법:

  • 지원되는 데이터베이스 버전 사용 확인(PostgreSQL 12+, MySQL 8.0+)
  • DB 연결 풀러(예: PGBouncer) 설정
  • max_dagruns_to_create_per_loop 값 낮추기

마치며

아파치 에어플로우의 스케줄러는 데이터 파이프라인 자동화의 핵심 구성 요소입니다. 스케줄러의 작동 원리를 이해하고 적절히 구성하면 안정적이고 효율적인 워크플로우 관리가 가능합니다.

특히, 스케줄링 간격 설정 방법을 숙지하고 성능 최적화 매개변수를 적절히 조정하는 것이 중요합니다. 대규모 워크로드의 경우 다중 스케줄러 구성을 통해 성능과 가용성을 향상시킬 수 있습니다.

에어플로우 스케줄러를 활용하여 데이터 처리 작업을 효과적으로 자동화하고, 궁극적으로 데이터 엔지니어링 워크플로우를 더 쉽게 관리하세요.

참고 자료

profile
AI를 꿈꾸는 BackEnd개발자

0개의 댓글