241210 TIL #559 AI Tech #92 Airflow

김춘복·2024년 12월 10일
0

TIL : Today I Learned

목록 보기
561/575

Today I Learned

오늘 배운 내용은 배치 프로세싱과 스케줄링에 사용하는 airflow에 대해 공부했다.


Batch Processing

SW를 예약된 시간에 자동으로 실행하는 방법. 일정 기간동안 일괄적으로 작업 수행

  • Batch Serving
    일정 기간동안 일괄적으로 머신러닝 작업을 수행

  • 간단한 Batch Processing은 Linux의 Crontab을 활용한다.
    하지만 로그 보기도 어렵고 오류 처리를 하기 힘들고 알람도 없어서 복잡하게는 힘들다.

  • Cron 표현식
    *은 매 ~시간마다 라는 뜻. ex) 0 * * * * 는 매 0분 즉 매 시간 정각마다
    cronmaker 같은 사이트 이용하면 편하다.
    이미지 출처 : bryceandy

Airflow

데이터 파이프라인을 개발, 스케줄링, 모니터링하기 위한 파이썬 기반의 오픈소스 플랫폼

이미지 출처 : @alsgur

  • Task 간의 의존성과 실행 순서를 명확하게 표현

  • 워크플로우 관리 도구로, 데이터 파이프라인 흐름을 파이썬 코드로 작성해 스케쥴링 및 모니터링
    (웹 UI로 제공)

  • 다양한 클라우드 서비스 및 빅데이터 도구와 연동

  • 자동 재시도 및 알림 기능 제공

  • 무겁기도 하지만 기능이 매우 많고 확장성이 좋아서 스케쥴링과 파이프라인 도구로 많이 쓰인다.

  • 조건에 따라 작업 분기를 나눌 수도 있다.

기본 구성

이미지 출처 : apache

DAGs

  • Directed Acyclic Graph라는 순환하지 않는 일반 그래프 자료구조.

  • Airflow에선 DAG로 실행할 작업들을 순서에 맞게 워크플로우를 구성한다.

  • 방향성을 가지면서 순환하지 않는 그래프 구조. 이전 태스크가 완료되면 다음 태스크 실행
    논리적 오류는 데드락으로 이어진다.

Operator

  • 작업 유형(Task)을 정의할 때 사용. 오퍼레이터를 실행하면 Task가 된다. 클래스 형태.

  • 기능이나 명령을 수행하는 Action Operator(bash, python, sql, email Operator 등), 시스템을 다른 시스템으로 옮기는 Transfer Operator, 조건 충족 시 다음 Task를 실행시키는 Sensor Operator가 있다.

  • PythonOperator
    파이썬 함수를 실행. callable한 객체를 파라미터로 넘겨서 실행할 수도 있다.
    미리 함수를 정의해두고 전달하면 된다.

  • BashOperator
    Bash 커맨드를 실행

  • DummyOperator
    아무것도 실행하지 않는 더미. 보통 다른 작업 순서를 기다려야할 때 더미를 쓴다.

  • SimpleHttpOperator
    HTTP 요청을 보내고 Response를 반환 받음.

  • BranchPythonOperator
    조건에 따라 실행을 제어할 수 있는 Operator

  • 외부 서드파티(docker, aws 등)와 연동하는 operator들은 extra package 설치 후 사용 가능

Scheduler

  • 워크플로우의 실행을 관리하고 오케스트레이션하는 핵심 구성 요소

  • DAGs를 보고 실행할 태스크를 결정하고 대기열에 배치하는 스케줄링 작업을 수행

DAG Directory

  • 파이썬으로 작성된 DAG 파일을 저장하는 공간

  • 기본 경로 : $AIRFLOW_HOME/dags/

기타

  • Excutor : 작업이 실행되는 환경
  • Sensor : 외부 이벤트를 기다리면서 특정 조건 만족시 실행됨
  • XComs : task끼리 결과를 주고받고 싶은 경우 사용
  • Jinja Template : 파이썬 템플릿 문법.(Fast API에서도 사용)
    ex) {{ ds }} = YYYY-MM-DD / {{ ts }} = 2018-01-01T00:00:00+00:00

예제

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from datetime import datetime

def print_hello():
    return "Hello"

def print_world():
    return "World"

# with 구문을 사용한 DAG 정의. 이름, 태그, 언제부터, 간격을 어떻게 할지 정의
with DAG(
    dag_id='example_dag', # DAG 식별자용 아이디
    description='DAG 설명'
    schedule='0 * * * *',
    start_date=datetime(2025, 1, 1),
    depends_on_past=False, # 이전 태스크의 실행 결과(성공, 실패)에 의존할지, 상관없이 작업할지 
    catchup=False, # start date부터 현재까지의 미실행된 DAG를 실행할지 결정
    tags=['example'] # 검색을 위한 태그
) as dag:
    
    # 태스크 정의
    # Operator 클래스를 사용해서 정의. 오퍼레이터마다 인자가 다름
    task1 = PythonOperator(
        task_id='print_hello',
        python_callable=print_hello
    )
    
    task2 = BashOperator(
        task_id='print_date',
        bash_command='date',
        retries=3 # 실패시 재 시도
    )
    
    task3 = PythonOperator(
        task_id='print_world',
        python_callable=print_world # 함수
    )
    
    # 태스크 순서 정의
    task1 >> task2 >> task3

실무에서 구현 방법

  1. Managed Airflow
    클라우드에서 airflow를 띄우고 DAG만 전달해서 실행. 간단한 방식으로 실행되지만 비용이 높다.
    인프라 관리가 힘든 초기에 하기 좋음.
    ex) aws의 MWAA, GCP의 Cloud Composer

  2. VM + Docker compose
    VM 위에서 Docker compose로 airflow를 배포
    하나의 VM만 쓰기 때문에 생각보다 단순하다.
    근데 도커 컨테이너 별로 환경이 달라 관리 포인트가 늘어난다.

  3. Kubernetes + Helm
    Kubernetes환경에서 Helm차트로 airflow를 배포
    필요에 따라 VM 수를 자유롭게 늘릴 수 있음
    난이도가 있고 운영과 구축이 어렵지만 자유도가 높다.


Docker로 실행 방법

  1. bitnami나 apache의 airflow 버전들 중 하나를 선택해서 docker-compose.yaml 파일을 찾아서 세부 옵션을 조정한다.

  2. (Windows의 경우 wsl로) docker-compose up을 실행하면 localhost:8080에 airflow가 실행되어있다.

  3. 기본 웹 ui 로그인은 airflow airflow로 하면 된다.


Slack 연결

  1. slack api apps에 들어가서 Incoming Webhooks에서 add new webhook to workspace에서 새 webhook 발급.

  2. Airflow 웹 UI에서 Admin-Connections에 들어가서 add a new record에

    Connection ID : 알아서 지정
    Connection Type : HTTP
    Host : https://hooks.slack.com/services/
    Password : T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX 값 전체

  3. dags/utils 폴더 생성하고 __init__.pyslack_norifier.py 생성 후 정의

  4. DAG 코드 정의

default_args = {
    "owner": "choonb",
    "depends_on_past": False,
    "start_date": datetime(2024, 1, 1),
    "end_date": datetime(2025, 1, 1)
}

def _handle_job_error() -> None:
    raise AirflowFailException("Raise Exception.")

with DAG(
    dag_id="dag_with_slack",
    default_args=default_args,
    schedule_interval="0 12 * * * ", # UTC 기준이라 한국은 +9해서 생각
    tags=["slack"],
    catchup=True,
    on_failure_callback=task_fail_slack_alert
) as dag:
    
    send_slack_notification = PythonOperator(
        task_id="raise_exception_and_send_slack_notification",
        python_callable=_handle_job_error
    )

    send_slack_notification
profile
Backend Dev / Data Engineer

0개의 댓글