Airflow 개념 및 실습

선욱·2025년 1월 6일

Airflow

목록 보기
1/1
post-thumbnail

Apache Airflow란

  • 기본 흐름 개요
  • Python코드로 워크플로우 를 작성하고 scheduling monitoringmonitoring하는 플랫폼
  • ETL작업을 자동화하고 DAG(Directed Acyclic Graph) 형태의 워크플로우 작성이 가능함
    • DAG: 작업 흐름을 구성하는 태스크들의 집합
      image
      • Directed(방향성): 태스크 간의 의존성과 실행 순서를 명확히 정의
      • Acyclic(비순환): 순환 구조가 없어 무한 루프를 방지
      • Graph(그래프): 노드(태스크)와 엣지(의존성)로 구성된 그래프 구조
  • AWS, GCP 모두 Airflow managed service를 제공

Airflow 이점

  • 각 작업 간 의존성을 명확히 정의
  • 실패한 스크립트에 대한 알림 쉬운 수정 및 재시도
  • 특정 날짜 작업의 재실행 기능을 제공

Airflow 주요 구성 요소

image

  • DAGs: 작업의 의존성, 실행 순서, 스케줄, 파라미터를 정의하는 파이썬 스크립트 모음. Airflow에서 실행될 워크플로우의 틀 설정
  • Operators: 데이터 추출, 변환, 로드 등 특정 작업을 수행하는 파이썬 클래스. 워크플로우 내에서 단일 작업 또는 일련의 작업의 추상화
  • Tasks: Operator의 인스턴스로, 워크플로우에서 실행될 각 작업 단위의 정의. 독립적 실행 또는 다른 Task와의 연결 가능
  • Schedulers: 설정된 스케줄에 따른 DAG 실행 역할. DAG 주기적 검사, 실행할 Task 트리거, 필요시 재시도 수행
  • Executor: Scheduler에 의해 등록된 Task들의 Worker 할당 및 작업 실행 관리 시스템. 다양한 Executor 유형을 통한 작업 분산 방식 결정
  • Worker: Task의 실제 실행 단위. 독립된 프로세스로 할당된 Task의 병렬 실행 가능
  • Webserver: 웹 기반 사용자 인터페이스. 사용자의 워크플로우 시각적 배포, 모니터링, 수정 가능 플랫폼
  • Airflow.cfg: 전반적인 동작 방식 설정(디렉토리 지정, DB 연결 정보, 보안 설정, 이메일 알림 설정 등)
  • Metadata DB: 저장정보(DAG 실행 기록, 태스크 상태, 변수와 연결 정보, 사용자 정보, 스케줄링 정보) PostgreSQL, MySQL 등 변경 가능

Airflow 공식 문서


Windows에서 Apache Airflow 설치 및 실습

Ubuntu 설치

  • WSL2를 통해 Linux 환경 사용
wsl --install # PowerShell 관리자 모드에서
  • WSL2를 통해 Ubuntu 설치
wsl --install Ubuntu-22.04

Python 설치(WSL Ubuntu)

  • Ubuntu 버전: Ubuntu 22.04.3 LTS
$ sudo apt update # Ubuntu 패키지 최신화
$ sudo apt install python3-pip # pip 패키지 관리자 설치
$ sudo apt install python3.10-venv # python 가상환경 모듈 설치

Airflow 설치 및 환경 설정

# 작업 디렉토리 생성
mkdir ~/airflow
cd ~/airflow
# 가상환경 생성 및 활성화
python3 -m venv venv
source venv/bin/activate
# Airflow 설치
pip install apache-airflow
# Airflow 홈 디렉토리 설정
export AIRFLOW_HOME=~/airflow
# Airflow DB 초기화
airflow db init
# Airflow 사용자 생성
airflow users create \
    --username admin \
    --firstname admin \
    --lastname admin \
    --role Admin \
    --email admin@example.com \
    --password admin

기본 DAG 예제 생성

  • Ubuntu nano 에디터 활용해서 dag파일 생성
nano first_dag.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

# DAG 기본 인수 설정
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'email': ['your-email@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# DAG 정의
dag = DAG(
    'first_dag',
    default_args=default_args,
    description='첫 번째 DAG',
    schedule_interval=timedelta(days=1),
)

# 태스크 정의
def print_hello():
    return 'Hello from Python!'

t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag,
)

t2 = PythonOperator(
    task_id='print_hello',
    python_callable=print_hello,
    dag=dag,
)

# 태스크 순서 설정
t1 >> t2

디렉토리 구조 확인

DAG 파일 권한 설정

  • 소유자만 읽기,쓰기 나머지(그룹, 기타)는 읽기
chmod 644 ~/airflow/dags/first_dag.py

DAG 실행

# 가상환경 활성화 (아직 활성화되지 않은 경우)
source ~/airflow/venv/bin/activate

# DAG 목록 확인
airflow dags list

# DAG 테스트
airflow dags test first_dag 2024-01-01

웹 서버를 통해 DAG 실행 상태 확인

airflow webserver --port 8080
  • Airflow Webserver
    • 웹 인터페이스를 제공하는 서버를 실행
    • http://localhost:8080 에서 접근 가능
    • DAG 모니터링, 실행 상태 확인, 수동 실행 등을 할 수 있는 UI 제공
    • 포트 8080으로 웹서버를 실행 (다른 포트로도 변경 가능)
airflow scheduler
  • Airflow Scheduler
    • DAG를 스케줄링하고 실행하는 핵심 컴포넌트
    • 설정된 시간에 따라 DAG를 자동으로 실행
    • 태스크의 의존성을 관리하고 실행 순서를 제어
    • DAG 파일의 변경사항을 감지하고 업데이트





profile
데이터엔지니어입니다😁

0개의 댓글