Airflow

홍찬우·2023년 7월 30일
0

Apache Airflow 소개

Batch Process

  • 예약된 시간에 실행되는 프로세스

    • 일회성도 가능하고, 주기적인 실행도 가능

      • e.g., 이번 주 일요일 07시에 1번 실행되는 프로세스

      • 매주 일요일 07시에 실행되는 프로세스

  • 머신러닝에서, 모델을 주기적으로 학습시키는 경우 사용

  • 그 외 개발에서 필요한 배치성 작업


Airflow 등장 전

  • Linux Crontab 사용

  • 정해진 시간에 predict.py 실행

  • 크론 표현식

    • batch process 스케줄링 정의한 표현식

    • 읽을 정도만 인지하면 좋음

    • 크론 표현식 제너레이터 사이트도 존재


  • 문제
    • 파일을 실행하다 오류가 발생한 경우, 별도 처리 없이 끝냄

    • 과거 실행 이력 및 실행 로그를 보기가 어려움

    • 여러 파일을 실행하거나, 복잡한 파이프라인을 만들기 힘듦

    • 간단히 사용할 수 있으나, 실패 시 재실행 / 실행 로그 확인 / 알람 등 기능이 없음

      ⇒ 좀 더 정교한 스케줄링 및 워크플로우 도구의 등장


Airflow 기능

  1. 파이썬을 사용해 스케줄링 및 파이프라인 작성

  2. 스케줄링 및 파이프라인 목록을 볼 수 있는 웹 UI 제공

    • 트리, 그래프 등 형식으로 시각화
  3. 실패 시 알람

  4. 실패 시 재실행 시도

  5. 동시 실행 워커 수

  6. 설정 및 변수 값 분리



Apache Airflow 실습

설치하고 실행하기

  1. Airflow 설치

    • pip install apache-airflow
  2. Airflow 기본 디렉토리 설정

    • 환경변수 AIRFLOW_HOME에 사용할 기본 디렉토리 경로 설정

      • export AIRFLOW_HOME=.
  3. Airflow DB 초기화

    • airflow db init
  4. Airflow 어드민 계정 생성

    • airflow user create
  5. Airflow 웹 서버 실행

    • airflow webserver
  6. Airflow 스케줄러 실행

    • airflow scheduler

DAG & Task

  • Airflow에서 스케줄링 할 작업을 DAG이라고 부름

  • Directed Acyclic Graph의 약자로, 순환하지 않는 방향이 존재하는 그래프

  • DAG 1개 = 1개의 파이프라인

  • Task: DAG 내에서 실행할 작업

    • 하나의 DAG에 여러 Task의 조합으로 구성

    • e.g., 머신러닝에서 예측을 한다 → 전처리, 학습, 예측이 각각 task, 이들이 모여 DAG

  • Task는 순차적으로 실행할 수도, 동시에(병렬로) 실행하게 할 수도 있음


DAG 작성하기 (hello world 예제)

  1. DAG 담을 디렉토리 생성 (이름은 무조건 dags)

  2. dags 폴더 내에 hello_world.py 생성

# hello_world.py
from datetime import timedelta

from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

def print_world() -> None:
    print("world")

# with 구문으로 DAG 정의를 시작합니다.
with DAG(
    dag_id="hello_world",  # DAG의 식별자용 아이디입니다.
    description="My First DAG",  # DAG에 대해 설명합니다.
    start_date=days_ago(2),  # DAG 정의 기준 2일 전부터 시작합니다.
    schedule_interval="0 6 * * *",  # 매일 06:00에 실행합니다.
    tags=["my_dags"],  # 태그 목록을 정의합니다. 추후에 DAG을 검색하는데 용이합니다.
) as dag:

    # 테스크를 정의합니다.
    # bash 커맨드로 echo hello 를 실행합니다.
    t1 = BashOperator(
        task_id="print_hello",
        bash_command="echo Hello",
        owner="heumsi",  # 이 작업의 오너입니다. 보통 작업을 담당하는 사람 이름을 넣습니다.
        retries=3,  # 이 테스크가 실패한 경우, 3번 재시도 합니다.
        retry_delay=timedelta(minutes=5),  # 재시도하는 시간 간격은 5분입니다.
    )

    # 테스크를 정의합니다.
    # python 함수인 print_world를 실행합니다.
    t2 = PythonOperator(
        task_id="print_world",
        python_callable=print_world,
        depends_on_past=True,
        owner="heumsi",
        retries=3,
        retry_delay=timedelta(minutes=5),
    )

    # 테스크 순서를 정합니다.
    # t1 실행 후 t2를 실행합니다.
    t1 >> t2
  • with DAG 으로 DAG 정의

    • 언제부터 스케줄링 시작할 지, 간격은 어떻게 할 지
  • BashOperator, PythonOperator로 Task 정의

    • BashOperator

      • bash command 실행

      • bash_command 파라미터에 bash로 실행할 command 전달

    • PythonOperator

      • Python 함수 실행

      • python_callable 파라미터에 실행할 python 함수 전달

  • t1>>t2

    • t1 이후에 t2를 실행하겠다 → >> 로 표시

실행하고 결과 확인

  • 파일 저장하고, 웹 UI를 확인하면 새로 생성한 DAG이 나타남

  • DAG 상세 페이지에서 DAG을 ON 상태로 변경
    • Auto-refresh도 ON 상태로 변경

  • 모든 Task가 success 되면, DAG도 success


Task 확인하기

  • 두 번째 DAG Run의 두 번째 Task Log 확인

  • 초록색 사각형 클릭 후 Log 클릭

  • 파이썬 함수가 실행되어 print(”world”) 실행


Clear

  • DAG Run의 기록을 지우고 다시 실행시키고 싶으면 Clear를 실행

유용한 Operator 간단 소개

  1. PythonOperator

    • 파이썬 함수 실행

    • 함수 뿐 아니라, callable 객체를 파라미터로 넘겨 실행 가능

    • 실행할 파이썬 로직을 함수로 생성한 후, PythonOperator로 실행

    • 함수에 ()를 빼고 넘겨야 함

  2. BashOperator

    • Bash 커맨드 실행

    • 파이썬이 아닌 경우 BashOperator로 실행 가능

      • e.g., shell script, scala 파일
  3. DummyOperator

    • 아무것도 실행하지 않는 Operator

    • DAG 내에서 Task를 구성할 때, 여러 개의 Task Success를 기다려야 하는 복잡한 Task 구성에서 사용

  4. SimpleHTTPOperator

    • 특정 호스트로 HTTP 요청을 보내고 Response 반환

    • 파이썬 ㅎ함수에서 requests 모듈 사용한 뒤 PythonOperator로 실행시켜도 무방

  5. 클라우드 기능을 추상화한 Operator도 존재(AWS, GCP 등) → Provider Packages

    • pip install apache-airflow[aws] 형태로 추가 설치 필요
  6. 이 외 DockerOperator, BranchOperator, KubernetesOperator 등 존재


Apache Airflow 아키텍처와 활용 방안

Airflow 기본 아키텍처

1. DAG Directory

  • DAG 파일 저장

    • 기본 경로는 AIRFLOW_HOME/dags

    • 스케줄러에 의해 .py 파일은 모두 탐색되고 DAG이 파싱

2. Scheduler

  • 각종 메타 정보 기록 담당

  • DAG Directory 내 .py 파일에서 DAG을 파싱해 DB에 저장

  • 실행 진행 상황과 결과를 DB에 저장

  • Executor를 통해 실제로 스케줄링 된 DAG을 실행

3. Executor

  • 스케줄링 된 DAG을 실행하는 객체로, 크게 2종류로 나뉨

    1) Local Executor

    • Local Executor

      • 하나의 DAG Run을 하나의 프로세스로 띄워서 실행

      • 최대로 생성할 프로세스 수를 정해야 함

      • Airflow를 간단하게 운영할 때 적합

    • Sequential Executor

      • 하나의 프로세스에서 모든 DAG Run을 처리

      • Airflow 기본 Executor로, 별도 설정이 없으면 이 Executor 사용

      • Airflow를 테스트로 잠시 운영할 때 적합

    2) Remote Executor (DAG Run을 외부 프로세스로 실행)

    • Celery Executor

      • DAG Run을 Celery Worker Process로 실행
    • Kubernetes Executor

      • 쿠버네티스 상에서 Airflow 운영

4. Workers

  • DAG을 실제로 실행

  • 스케줄러에 의해 생기고 실행

  • Executor에 따라 worker 형태가 다름

  • DAG Run 실행 과정에서 생긴 로그 저장

5. Metadata Database

  • 스케줄러에 의해 쌓인 메타 정보 저장

  • 보통 MySAQL이나 Postgres 사용

  • 실제 운영 환경에서는 local DB는 사용하지 않고 외부 DB 인스턴스 사용

6. Webserver

  • Web UI 담당

  • Metadata DB와 통신하며 유저에게 필요한 메타 데이터를 웹 브라우저 상에 띄우고 시각화

  • REST API도 제공


실제 회사에서의 구축 및 활용

  • Airflow 구축 방법으로 보통 3가지 방법 사용
  1. Managed Airflow (GCP Composer, AWS MWAA)

    • 클라우드 서비스 형태로 Airflow 사용하는 방법
  2. VM + Docker compose

    • 직접 VM(Virtual Machine) 위에서 Docker compose로 Airflow 배포

    • Airflow 구축에 필요한 컴포넌트 (Scheduler, Webserver, Database)를 Docker container 형태로 배포

  3. Kubernetes + Helm

    • Kubernetes 환경에서 Helm 차트로 Airflow 배포

MLOps 관점 Airflow

  • 주기적인 실행이 필요한 경우

    • Batch Training

    • Batch Serving

    • Inference 결과 기반 일자, 주차별 모델 performance report 생성

    • MySQL에 저장된 메타 데이터를 데이터 웨어하우스로 1시간 단위로 이동

    • S3, GCS 등 Object Storage

    • Feature Store 생성을 위한 batch ETL







※ 모든 이미지 및 코드 출처는 네이버 커넥트재단 부스트캠프 AI Tech 5기입니다. ※

profile
AI-Kid

1개의 댓글

comment-user-thumbnail
2023년 7월 30일

정보 감사합니다.

답글 달기

관련 채용 정보