Batch Process
예약된 시간에 실행되는 프로세스
일회성도 가능하고, 주기적인 실행도 가능
e.g., 이번 주 일요일 07시에 1번 실행되는 프로세스
매주 일요일 07시에 실행되는 프로세스
머신러닝에서, 모델을 주기적으로 학습시키는 경우 사용
그 외 개발에서 필요한 배치성 작업
Airflow 등장 전
Linux Crontab 사용
정해진 시간에 predict.py 실행
크론 표현식
batch process 스케줄링 정의한 표현식
읽을 정도만 인지하면 좋음
크론 표현식 제너레이터 사이트도 존재
파일을 실행하다 오류가 발생한 경우, 별도 처리 없이 끝냄
과거 실행 이력 및 실행 로그를 보기가 어려움
여러 파일을 실행하거나, 복잡한 파이프라인을 만들기 힘듦
간단히 사용할 수 있으나, 실패 시 재실행 / 실행 로그 확인 / 알람 등 기능이 없음
⇒ 좀 더 정교한 스케줄링 및 워크플로우 도구의 등장
파이썬을 사용해 스케줄링 및 파이프라인 작성
스케줄링 및 파이프라인 목록을 볼 수 있는 웹 UI 제공
실패 시 알람
실패 시 재실행 시도
동시 실행 워커 수
설정 및 변수 값 분리
설치하고 실행하기
Airflow 설치
pip install apache-airflow
Airflow 기본 디렉토리 설정
환경변수 AIRFLOW_HOME에 사용할 기본 디렉토리 경로 설정
export AIRFLOW_HOME=.
Airflow DB 초기화
airflow db init
Airflow 어드민 계정 생성
airflow user create
Airflow 웹 서버 실행
airflow webserver
Airflow 스케줄러 실행
airflow scheduler
Airflow에서 스케줄링 할 작업을 DAG이라고 부름
Directed Acyclic Graph의 약자로, 순환하지 않는 방향이 존재하는 그래프
DAG 1개 = 1개의 파이프라인
Task: DAG 내에서 실행할 작업
하나의 DAG에 여러 Task의 조합으로 구성
e.g., 머신러닝에서 예측을 한다 → 전처리, 학습, 예측이 각각 task, 이들이 모여 DAG
Task는 순차적으로 실행할 수도, 동시에(병렬로) 실행하게 할 수도 있음
DAG 담을 디렉토리 생성 (이름은 무조건 dags)
dags 폴더 내에 hello_world.py 생성
# hello_world.py
from datetime import timedelta
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
def print_world() -> None:
print("world")
# with 구문으로 DAG 정의를 시작합니다.
with DAG(
dag_id="hello_world", # DAG의 식별자용 아이디입니다.
description="My First DAG", # DAG에 대해 설명합니다.
start_date=days_ago(2), # DAG 정의 기준 2일 전부터 시작합니다.
schedule_interval="0 6 * * *", # 매일 06:00에 실행합니다.
tags=["my_dags"], # 태그 목록을 정의합니다. 추후에 DAG을 검색하는데 용이합니다.
) as dag:
# 테스크를 정의합니다.
# bash 커맨드로 echo hello 를 실행합니다.
t1 = BashOperator(
task_id="print_hello",
bash_command="echo Hello",
owner="heumsi", # 이 작업의 오너입니다. 보통 작업을 담당하는 사람 이름을 넣습니다.
retries=3, # 이 테스크가 실패한 경우, 3번 재시도 합니다.
retry_delay=timedelta(minutes=5), # 재시도하는 시간 간격은 5분입니다.
)
# 테스크를 정의합니다.
# python 함수인 print_world를 실행합니다.
t2 = PythonOperator(
task_id="print_world",
python_callable=print_world,
depends_on_past=True,
owner="heumsi",
retries=3,
retry_delay=timedelta(minutes=5),
)
# 테스크 순서를 정합니다.
# t1 실행 후 t2를 실행합니다.
t1 >> t2
with DAG
으로 DAG 정의
BashOperator, PythonOperator로 Task 정의
BashOperator
bash command 실행
bash_command 파라미터에 bash로 실행할 command 전달
PythonOperator
Python 함수 실행
python_callable 파라미터에 실행할 python 함수 전달
t1>>t2
>>
로 표시실행하고 결과 확인
Task 확인하기
두 번째 DAG Run의 두 번째 Task Log 확인
초록색 사각형 클릭 후 Log 클릭
print(”world”)
실행Clear
PythonOperator
파이썬 함수 실행
함수 뿐 아니라, callable 객체를 파라미터로 넘겨 실행 가능
실행할 파이썬 로직을 함수로 생성한 후, PythonOperator로 실행
함수에 ()
를 빼고 넘겨야 함
BashOperator
Bash 커맨드 실행
파이썬이 아닌 경우 BashOperator로 실행 가능
DummyOperator
아무것도 실행하지 않는 Operator
DAG 내에서 Task를 구성할 때, 여러 개의 Task Success를 기다려야 하는 복잡한 Task 구성에서 사용
SimpleHTTPOperator
특정 호스트로 HTTP 요청을 보내고 Response 반환
파이썬 ㅎ함수에서 requests 모듈 사용한 뒤 PythonOperator로 실행시켜도 무방
클라우드 기능을 추상화한 Operator도 존재(AWS, GCP 등) → Provider Packages
pip install apache-airflow[aws]
형태로 추가 설치 필요이 외 DockerOperator, BranchOperator, KubernetesOperator 등 존재
Airflow 기본 아키텍처
DAG 파일 저장
기본 경로는 AIRFLOW_HOME/dags
스케줄러에 의해 .py 파일은 모두 탐색되고 DAG이 파싱
각종 메타 정보 기록 담당
DAG Directory 내 .py 파일에서 DAG을 파싱해 DB에 저장
실행 진행 상황과 결과를 DB에 저장
Executor를 통해 실제로 스케줄링 된 DAG을 실행
스케줄링 된 DAG을 실행하는 객체로, 크게 2종류로 나뉨
1) Local Executor
Local Executor
하나의 DAG Run을 하나의 프로세스로 띄워서 실행
최대로 생성할 프로세스 수를 정해야 함
Airflow를 간단하게 운영할 때 적합
Sequential Executor
하나의 프로세스에서 모든 DAG Run을 처리
Airflow 기본 Executor로, 별도 설정이 없으면 이 Executor 사용
Airflow를 테스트로 잠시 운영할 때 적합
2) Remote Executor (DAG Run을 외부 프로세스로 실행)
Celery Executor
Kubernetes Executor
DAG을 실제로 실행
스케줄러에 의해 생기고 실행
Executor에 따라 worker 형태가 다름
DAG Run 실행 과정에서 생긴 로그 저장
스케줄러에 의해 쌓인 메타 정보 저장
보통 MySAQL이나 Postgres 사용
실제 운영 환경에서는 local DB는 사용하지 않고 외부 DB 인스턴스 사용
Web UI 담당
Metadata DB와 통신하며 유저에게 필요한 메타 데이터를 웹 브라우저 상에 띄우고 시각화
REST API도 제공
실제 회사에서의 구축 및 활용
Managed Airflow (GCP Composer, AWS MWAA)
VM + Docker compose
직접 VM(Virtual Machine) 위에서 Docker compose로 Airflow 배포
Airflow 구축에 필요한 컴포넌트 (Scheduler, Webserver, Database)를 Docker container 형태로 배포
Kubernetes + Helm
MLOps 관점 Airflow
주기적인 실행이 필요한 경우
Batch Training
Batch Serving
Inference 결과 기반 일자, 주차별 모델 performance report 생성
MySQL에 저장된 메타 데이터를 데이터 웨어하우스로 1시간 단위로 이동
S3, GCS 등 Object Storage
Feature Store 생성을 위한 batch ETL
※ 모든 이미지 및 코드 출처는 네이버 커넥트재단 부스트캠프 AI Tech 5기입니다. ※
정보 감사합니다.