[Python] 스케줄링 관리는 Airflow로

Junkyu_Kang·2025년 10월 8일

오랜만에 블로그에 들어와서 글을 적네요. 이번 프로젝트가 생각보다 바빠서 9월 한달은 300시간 넘게 근무하며 보냈습니다.. 물론 아직 안끝난게 공포지만..

이번 포스팅에서는 평소 하던 스케줄링 방법에서 더 좋고 효율적인 방법을 찾아 변환한 과정을 적어보려합니다.

평소 00시, 03시 등에 데이터 취합 혹은 계산 후 저장 외에도 삭제, 수정 등의 스케줄링은 어느 개발 서비스에서든 필요한 요소라 판단했습니다.

저는 보통 Python의 내장 함수인 Schedule을 사용하였지만 이는 불필요하게 Task 관리가 들어감에 따라 역할을 분리하고, 효율을 증가시키기 위해 Airflow로 개선 방법을 잡았습니다.

🔄 Python Schedule에서 Airflow로 전환한 이유와 실무 적용기

1️⃣ 배경

초기에는 schedule 라이브러리를 사용해 간단한 주기 작업을 수행했습니다.
하지만 서비스가 확장되면서 다음과 같은 문제가 발생했습니다.

프로세스가 죽으면 스케줄링이 함께 멈춤

태스크 실패 시 재시도/알람 기능이 없음

실행 시점 충돌(중복 실행) 발생

병렬 처리나 의존 관계 관리가 어려움

이런 불안정한 환경을 개선하기 위해 Airflow를 도입했습니다.

2️⃣ Python schedule 사용 예시

import schedule
import time
from datetime import datetime

def fetch_data():
    print(f"[{datetime.now()}] Fetching data...")

def process_data():
    print(f"[{datetime.now()}] Processing data...")

# 매일 정해진 시각에 실행
schedule.every().day.at("00:00").do(fetch_data)
schedule.every().day.at("00:10").do(process_data)

while True:
    schedule.run_pending()
    time.sleep(1)

이 얼마나 공포스러운 코드인가.. 실제로 3시간 정도 다운 됐던 적이 있었는데 그 때 schedule이 정상 작동하지않아 모든 계산 로직이 꼬인 결과로 나타났습니다.

🔸 단점 요약

스케줄이 단일 프로세스 내에서 동작

프로세스 재시작 시 스케줄 리셋

실패나 중복 실행을 방지할 장치 없음

의존 관계(예: fetch 후 process)가 명확하지 않음

3️⃣ Airflow 전환 후 구조

Airflow는 단순히 “스케줄러”가 아니라 워크플로 오케스트레이터입니다.
즉, 각 태스크 간의 의존성, 실패 처리, 재시도, 병렬 실행 등을 체계적으로 관리합니다.

4️⃣ Airflow DAG 예시

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

def fetch_data():
    print(f"[{datetime.now()}] Fetching data...")

def process_data():
    print(f"[{datetime.now()}] Processing data...")

default_args = {
    'owner': 'admin',
    'depends_on_past': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    dag_id='daily_data_pipeline',
    default_args=default_args,
    description='데이터 수집 및 처리 파이프라인',
    schedule_interval='0 0 * * *',  # 매일 자정
    start_date=datetime(2025, 1, 1),
    catchup=False,
    tags=['data', 'pipeline']
) as dag:

    t1 = PythonOperator(
        task_id='fetch_data',
        python_callable=fetch_data
    )

    t2 = PythonOperator(
        task_id='process_data',
        python_callable=process_data
    )

    t1 >> t2  # 의존성 설정

5️⃣ Airflow의 주요 장점

✅ 1. 불필요한 태스크 중복 실행 방지

catchup=False 설정 시, 과거 누락된 스케줄을 자동 보충하지 않음

DAG 실행 상태(running, failed, success)를 자동 추적

✅ 2. 태스크 간 의존 관계 명확

t1 >> t2 형태로 순서 명시

이전 태스크 실패 시 다음 단계 자동 중단

✅ 3. 재시도 및 실패 알림

retries, retry_delay, email_on_failure 설정만으로 자동 재시도 및 알람

✅ 4. 웹 UI에서 실행 상태 실시간 모니터링

DAG 실행 기록, 로그, 리트라이 상태 확인 가능

작업 수동 재실행도 가능

✅ 5. 안정적인 스케줄 관리

Airflow 스케줄러/워커 구조 덕분에 시스템이 재시작되어도 DAG 유지

분산 환경에서 여러 태스크 병렬 실행 가능

6️⃣ Python schedule 대비 Airflow의 장단점 비교

구분Python ScheduleApache Airflow
실행 방식단일 프로세스분산 워커 기반
재시도/에러 관리수동 처리 필요자동화 가능
태스크 의존성직접 구현 필요DAG 구조로 명시적 표현
모니터링로그 직접 출력Web UI 제공
확장성로컬 한정클러스터 확장 용이
알림/이벤트별도 코드 필요내장 기능 있음
유지보수간단하지만 비효율적초기 설정 복잡하지만 안정적

7️⃣ 실제 전환 후 효과 (요약)

중복 실행 문제 해소

과거 schedule 환경에서 같은 job이 동시에 실행되며 DB Lock 발생했지만,
Airflow에서는 DAG Run 상태를 기준으로 중복 실행을 차단.

불필요한 태스크 제거

catchup=False, depends_on_past=False 설정으로
예전 날짜 job까지 몰아서 실행되는 문제 방지.

모니터링 효율 향상

실패 시 원인 분석이 쉬워지고, Web UI에서 로그 즉시 확인 가능.

유지보수 간소화

Python 코드로 DAG 정의 → 변경 사항 바로 반영 가능

Git에 버전 관리하여 이력 추적 용이

마무리

실제로 airflow를 처음 도입할 때 공부했던 시간 3시간? 정도를 제외하고는 쉽게 적용했던거 같습니다. 하지만 복잡한 쿼리 동작을 위한 모듈 사용과 의존성을 줄이기 위한 과정에서 중복이 늘어났던 부분은 좀 아쉬웠고, DB를 Postgre, Redis, Mongo까지 사용하다보니 생각보다 복잡하게 코드를 가져갔던 것들이 너무 아쉽게 느껴졌습니다.

이렇게 task와 dags를 20개 정도 만들면서 느낀 내용을 정리하였습니다.

profile
강준규

0개의 댓글