오늘 배운 내용은 배치 프로세싱과 스케줄링에 사용하는 airflow에 대해 공부했다.
SW를 예약된 시간에 자동으로 실행하는 방법. 일정 기간동안 일괄적으로 작업 수행
Batch Serving
일정 기간동안 일괄적으로 머신러닝 작업을 수행
간단한 Batch Processing은 Linux의 Crontab을 활용한다.
하지만 로그 보기도 어렵고 오류 처리를 하기 힘들고 알람도 없어서 복잡하게는 힘들다.
Cron 표현식
*은 매 ~시간마다 라는 뜻. ex) 0 * * * * 는 매 0분 즉 매 시간 정각마다
cronmaker 같은 사이트 이용하면 편하다.
이미지 출처 : bryceandy
데이터 파이프라인을 개발, 스케줄링, 모니터링하기 위한 파이썬 기반의 오픈소스 플랫폼
이미지 출처 : @alsgur
Task 간의 의존성과 실행 순서를 명확하게 표현
워크플로우 관리 도구로, 데이터 파이프라인 흐름을 파이썬 코드로 작성해 스케쥴링 및 모니터링
(웹 UI로 제공)
다양한 클라우드 서비스 및 빅데이터 도구와 연동
자동 재시도 및 알림 기능 제공
무겁기도 하지만 기능이 매우 많고 확장성이 좋아서 스케쥴링과 파이프라인 도구로 많이 쓰인다.
조건에 따라 작업 분기를 나눌 수도 있다.
이미지 출처 : apache
Directed Acyclic Graph라는 순환하지 않는 일반 그래프 자료구조.
Airflow에선 DAG로 실행할 작업들을 순서에 맞게 워크플로우를 구성한다.
방향성을 가지면서 순환하지 않는 그래프 구조. 이전 태스크가 완료되면 다음 태스크 실행
논리적 오류는 데드락으로 이어진다.
작업 유형(Task)을 정의할 때 사용. 오퍼레이터를 실행하면 Task가 된다. 클래스 형태.
기능이나 명령을 수행하는 Action Operator(bash, python, sql, email Operator 등), 시스템을 다른 시스템으로 옮기는 Transfer Operator, 조건 충족 시 다음 Task를 실행시키는 Sensor Operator가 있다.
PythonOperator
파이썬 함수를 실행. callable한 객체를 파라미터로 넘겨서 실행할 수도 있다.
미리 함수를 정의해두고 전달하면 된다.
BashOperator
Bash 커맨드를 실행
DummyOperator
아무것도 실행하지 않는 더미. 보통 다른 작업 순서를 기다려야할 때 더미를 쓴다.
SimpleHttpOperator
HTTP 요청을 보내고 Response를 반환 받음.
BranchPythonOperator
조건에 따라 실행을 제어할 수 있는 Operator
외부 서드파티(docker, aws 등)와 연동하는 operator들은 extra package 설치 후 사용 가능
워크플로우의 실행을 관리하고 오케스트레이션하는 핵심 구성 요소
DAGs를 보고 실행할 태스크를 결정하고 대기열에 배치하는 스케줄링 작업을 수행
파이썬으로 작성된 DAG 파일을 저장하는 공간
기본 경로 : $AIRFLOW_HOME/dags/
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from datetime import datetime
def print_hello():
return "Hello"
def print_world():
return "World"
# with 구문을 사용한 DAG 정의. 이름, 태그, 언제부터, 간격을 어떻게 할지 정의
with DAG(
dag_id='example_dag', # DAG 식별자용 아이디
description='DAG 설명'
schedule='0 * * * *',
start_date=datetime(2025, 1, 1),
depends_on_past=False, # 이전 태스크의 실행 결과(성공, 실패)에 의존할지, 상관없이 작업할지
catchup=False, # start date부터 현재까지의 미실행된 DAG를 실행할지 결정
tags=['example'] # 검색을 위한 태그
) as dag:
# 태스크 정의
# Operator 클래스를 사용해서 정의. 오퍼레이터마다 인자가 다름
task1 = PythonOperator(
task_id='print_hello',
python_callable=print_hello
)
task2 = BashOperator(
task_id='print_date',
bash_command='date',
retries=3 # 실패시 재 시도
)
task3 = PythonOperator(
task_id='print_world',
python_callable=print_world # 함수
)
# 태스크 순서 정의
task1 >> task2 >> task3
Managed Airflow
클라우드에서 airflow를 띄우고 DAG만 전달해서 실행. 간단한 방식으로 실행되지만 비용이 높다.
인프라 관리가 힘든 초기에 하기 좋음.
ex) aws의 MWAA, GCP의 Cloud Composer
VM + Docker compose
VM 위에서 Docker compose로 airflow를 배포
하나의 VM만 쓰기 때문에 생각보다 단순하다.
근데 도커 컨테이너 별로 환경이 달라 관리 포인트가 늘어난다.
Kubernetes + Helm
Kubernetes환경에서 Helm차트로 airflow를 배포
필요에 따라 VM 수를 자유롭게 늘릴 수 있음
난이도가 있고 운영과 구축이 어렵지만 자유도가 높다.
bitnami나 apache의 airflow 버전들 중 하나를 선택해서 docker-compose.yaml 파일을 찾아서 세부 옵션을 조정한다.
(Windows의 경우 wsl로) docker-compose up
을 실행하면 localhost:8080에 airflow가 실행되어있다.
기본 웹 ui 로그인은 airflow airflow로 하면 된다.
slack api apps에 들어가서 Incoming Webhooks에서 add new webhook to workspace에서 새 webhook 발급.
Airflow 웹 UI에서 Admin-Connections에 들어가서 add a new record에
Connection ID : 알아서 지정
Connection Type : HTTP
Host : https://hooks.slack.com/services/
Password : T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX 값 전체
dags/utils 폴더 생성하고 __init__.py
와 slack_norifier.py
생성 후 정의
DAG 코드 정의
default_args = {
"owner": "choonb",
"depends_on_past": False,
"start_date": datetime(2024, 1, 1),
"end_date": datetime(2025, 1, 1)
}
def _handle_job_error() -> None:
raise AirflowFailException("Raise Exception.")
with DAG(
dag_id="dag_with_slack",
default_args=default_args,
schedule_interval="0 12 * * * ", # UTC 기준이라 한국은 +9해서 생각
tags=["slack"],
catchup=True,
on_failure_callback=task_fail_slack_alert
) as dag:
send_slack_notification = PythonOperator(
task_id="raise_exception_and_send_slack_notification",
python_callable=_handle_job_error
)
send_slack_notification