[MLOps] Airflow 시작하기

이한슬·2025년 4월 6일

MLOps

목록 보기
5/9
post-thumbnail

Airflow

Airflow
Airflow는 Airbnb에서 개발한 워크플로우를 작성하고, 스케줄링, 모니터링할 수 있는 오픈소스 플랫폼입니다.
데이터 파이프라인, 머신러닝, 배치 작업 워크플로우를 자동화하는 데 사용합니다.

구성 요소

  • DAG(Directed Acyclic Graph)
    • 순환하지 않는 그래프로 순차적으로 진행되는 워크플로우
  • Scheduler
    • DAG의 정의된 스케줄과 의존성을 기반으로 작업을 실행할 시점을 결정
    • 작업을 Executor에 제출하여 실행 관리
  • Executor
    • Scheduler가 제출한 작업을 실행
  • Worker
    • 실제로 작업을 수행하는 프로세스
  • Metadata Database
    • DAG, 작업 인스턴스, 실행 기록, 연결 설정 등 Airflow의 모든 메타데이터를 저장하는 DB
    • PostgreSQL, MySQL, SQLite와 같은 관계형 데이터베이스를 사용
  • Web Server
    • DAG(Directed Acyclic Graph)를 시각적으로 관리하고 모니터링할 수 있는 웹 기반 UI를 제공

Flow

1. DAG 정의

  • 작업 간의 의존성과 실행 순서를 설정합니다.
  • 각 작업은 Operator를 사용하여 정의됩니다.

Operator
특정 작업을 수행하는 역할을 수행하는 워크플로우 구성 요소

  • BashOperator: Bash 명령어 또는 스크립트를 실행합니다.
  • PythonOperator: Python 함수를 호출하여 작업을 수행합니다.
  • EmailOperator: 이메일을 보내는 작업을 수행합니다.
  • KubernetesPodOperator: Kubernetes Pod에서 Docker 이미지를 실행합니다.
  • Sensor Operators: 외부 이벤트나 조건을 기다리는 역할을 합니다.
  • Transfer Operators: 데이터를 한 시스템에서 다른 시스템으로 이동합니다.

2. DAG 등록

  • 작성된 DAG 파일은 Airflow의 DAG 폴더에 저장됩니다.
  • Scheduler가 주기적으로 DAG 폴더를 스캔하여 새로운 DAG을 등록합니다.

3. Scheduler의 스케줄링

  • Scheduler는 DAG에 정의된 스케줄에 따라 작업 실행 시점을 결정합니다.
  • DAG 내 각 작업의 의존성을 검토하여 실행 가능한 작업을 식별합니다.

4. Executor에 작업 제출

  • Scheduler는 실행 가능한 작업을 Executor에 제출합니다.
  • Executor는 작업을 처리할 Worker를 지정합니다.
  • 사용되는 Executor의 종류에 따라 다른 방식으로 실행됩니다.

1. SequentialExecutor

  • Airflow의 기본값으로, 작업을 한 번에 하나씩 순차적으로 실행합니다.
  • 모든 작업이 같은 프로세스에서 실행되며, 병렬 처리가 불가능합니다.
  • 장점
    • 설정이 매우 간단하고, 추가 인프라가 필요 없습니다.
    • 개발, 테스트, 디버깅 환경에 적합합니다.
  • 단점
    • 병렬 실행이 불가능해 대규모 워크플로우에는 부적합합니다.
    • 실제 운영 환경에서는 거의 사용하지 않습니다.

2. LocalExecutor

  • 단일 머신에서 여러 작업을 병렬로 실행할 수 있습니다.
  • Python의 멀티프로세싱을 활용해 여러 프로세스에서 작업을 동시에 처리합니다.
  • 장점
    • 단일 서버에서 병렬 처리가 가능해 소규모~중규모 워크플로우에 적합합니다.
    • 설치와 관리가 간단합니다.
  • 단점
    • 단일 머신의 자원 한계로 인해 확장성이 제한됩니다.
    • 대규모 분산 처리가 필요한 환경에는 부적합합니다.

3. CeleryExecutor

  • Celery 분산 작업 큐를 이용해 여러 워커 노드에 작업을 분산 실행합니다.
  • RabbitMQ, Redis 등 메시지 브로커가 필요하며, 워커 수평 확장이 가능합니다.
  • 장점
    • 여러 서버에 작업을 분산시켜 대규모 워크플로우에 적합합니다.
    • 높은 확장성과 내결함성을 제공합니다.
  • 단점
    • Celery, 메시지 브로커, 워커 등 추가 인프라가 필요해 설정이 복잡합니다.
    • 운영 및 모니터링에 추가적인 관리가 필요합니다.

4. KubernetesExecutor

  • 각 작업을 독립적인 Kubernetes Pod로 실행합니다.
  • 클러스터 자원을 유연하게 활용할 수 있고, 컨테이너 기반 워크로드에 최적화되어 있습니다.
  • 장점
    • 자동 확장, 자원 격리, 컨테이너화 등 최신 클라우드 환경에 적합합니다.
    • 워커가 필요할 때만 생성되어 자원 효율성이 높습니다.
  • 단점
    • Kubernetes 클러스터 환경이 필요하며, 설정과 운영이 복잡할 수 있습니다.
    • 로그 및 디버깅이 상대적으로 어렵습니다.

5. Worker에서 작업 수행

  • Worker는 지정된 작업을 실제로 실행합니다.
  • 작업 완료 후 상태를 Metadata Database에 기록합니다.

6. Metadata Database 업데이트

  • 모든 DAG 및 작업의 상태와 실행 기록이 Metadata Database에 저장됩니다.

7. Web Server를 통한 모니터링

  • DAG 상태 확인, 로그 검토, 수동 트리거 등을 수행할 수 있습니다.

8. 재시도 및 의존성 처리

  • 실패한 작업은 정의된 재시도 정책에 따라 다시 실행됩니다.
  • 모든 의존성이 충족되면 다음 작업이 실행됩니다.

로컬에서 Airflow 튜토리얼 진행하기(Mac M2 기준)

작업할 디렉토리를 하나 생성하겠습니다.

mkdir airflow-tutorial
cd airflow-tutorial

가상환경을 하나 생성해줍니다.

python -m venv venv
source venv/bin/activate 

Airflow를 설치합니다.

AIRFLOW_VERSION=2.5.1
PYTHON_VERSION=$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"

pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"

Airflow의 홈 디렉토리를 설정합니다.
기본적으로는 ~/airflow로 설정되지만 저는 작업하고 있는 디렉토리로 지정하였습니다.

export AIRFLOW_HOME=$(pwd)/airflow

Airflow의 메타데이터 데이터베이스를 초기화합니다.

airflow db init

사용자 계정을 생성합니다.
중괄호안에 정보를 입력합니다.

airflow users create \
 --username {id} \
 --firstname {firstname} \
 --lastname {lastname} \
 --role Admin \
 --password {password} \
 --email {email}

Airflow 웹 서버를 실행합니다.

airflow webserver -p 8080

http://localhost:8080 에 접속하면 다음처럼 웹 서버에 접속할 수 있습니다.
생성한 계정으로 로그인합니다.

airflow 폴더 안에 있는 airflow.cfg 파일의 dags_folder 값을 확인합니다.
스케쥴러는 이 경로에 있는 DAG를 로드하므로 이 경로에 DAG 파일을 생성해야합니다. 폴더가 존재하지 않는다면 경로에 맞는 폴더를 생성하거나, DAG 폴더를 원하는 경로로 수정합니다.
DAG 예시들을 보이지 않기 위하여 load_examplesFalse로 바꾸겠습니다.

# airflow.cfg
[core]
# The folder where your airflow pipelines live, most likely a
# subfolder in a code repository. This path must be absolute.
dags_folder = ~~/airflow-tutorial/airflow/dags
load_examples = False

DAG 폴더 안에 tutorial.py 파일을 다음처럼 생성합니다.

# tutorial.py
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime

with DAG(
    'tutorial',
    default_args={'start_date': datetime(2023, 1, 1)},
    schedule_interval='*/1 * * * *',
    catchup=False,
) as dag:
    task = BashOperator(
        task_id='print_date',
        bash_command='date',
    )

tutorial: DAG의 이름으로, 워크플로우를 식별하는 데 사용되며, 고유해야 합니다.
start_date: DAG의 시작 날짜를 정의합니다. 이 날짜 이후로 스케줄러가 작업을 실행합니다.
schedule_interval: 크론 표현식 또는 예약 문자열(@daily, @hourly, @weekly)을 사용하여 스케줄을 정의할 수 있습니다.
여기서는 크론 표현식을 사용했으며, 각 의미는 순서대로 다음과 같습니다.
*/1: 매 분, *: 매 시간, *: 매일, *: 매 월, *: 매 요일
*/1 * * * *으로 작성하였으므로 1분마다 실행됩니다.
catchup: True일 경우, 시작 날짜부터 현재 날짜까지 모든 이전 스케줄을 실행하려고 시도합니다. False로 설정하여 이전 스케줄을 무시하고 현재 시점부터 실행합니다.
task_id='print_date': 작업의 고유 식별자입니다.
bash_command='date': BashOperator를 사용하여 Bash 명령어를 실행합니다. 여기서는 'date' 명령어를 실행하여 현재 날짜와 시간을 출력합니다.

이제 스케쥴러를 실행합니다.

airflow scheduler

다시 웹 서버로 가서 tutorial DAG의 왼쪽 토글을 활성화하면 DAG가 활성화됩니다.
기본적으로 DAG가 생성되면 비활성화 상태이므로 아래 이미지처럼 웹 서버에서 활성화해야 합니다.

DAG가 실행되지 않는다면 스케쥴러를 재시작해줍니다.
정상적으로 실행되었다면 tutorial DAG를 클릭해 세부정보를 확인합니다.

Details 버튼을 누르고 success를 클릭합니다.

Task Id를 클릭하여 작업 세부 정보를 확인합니다.

Log 버튼을 눌러 로그를 확인합니다.

로그 마지막 부분에 다음처럼 DAG에 정의한 날짜 출력 코드가 정상적으로 수행된 것을 확인할 수 있습니다.

[2025-04-06, 08:21:25 UTC] {subprocess.py:75} INFO - Running command: ['/bin/bash', '-c', 'date']
[2025-04-06, 08:21:25 UTC] {subprocess.py:86} INFO - Output:
[2025-04-06, 08:21:26 UTC] {subprocess.py:93} INFO - 202546일 일요일 172126초 KST

튜토리얼 DAG의 실행 흐름을 정리해보면 이와 같습니다.
1. 스케줄러가 활성화되면 1분 마다 tutorial DAG가 트리거됩니다.
2. DAG 내부의 task인 print_date가 실행됩니다.
3. Bash 명령어 date가 실행되어 현재 날짜와 시간이 출력됩니다.
4. 결과를 Airflow 웹 UI에서 확인합니다.

profile
궁금하면 일단 먹어보는 소프트웨어 전공생

0개의 댓글