Apache Airflow 소개
-- Batch Process란?
-- Batch Process - Airflow 등장 전
-- Airflow 소개
Apache Airflow 실습하며 배워보기
-- 설치하고 실행하기
-- DAG 작성하기
-- 유용한 Operator 간단 소개
Apache Airflow 아키텍처와 활용방안
-- 기본 아키텍처
-- Airflow 실제 활용 사례
-- MLOps 관점의 Airflow
위키피디아
"Computerized batch processing is the running of "jobs that can run without end user interaction, or can be scheduled to run as resources permit"
지금은 은행 점검 시간입니다 같은건가봐요 !
Batch Process를 AI엔지니어가 알아야 하는 이유
대표적인 Batch Process 구축 방법: Linux Crontab
MyProject/
train.py #모델 학습을 진행하고 모델을 저장
predict.py # 저장된 모델을 불러오고, 인풋 값을 받아 모델 출력값을 별도로 저장
크론 표현식
Batch Process의 스케줄링을 정의한 표현식
이 표현식은 다른 Batch Process도구에서도 자주 사용 됨
크론 표현식 예시
매번 표현식을 암기할 필요는 없지만, 읽을 정도만 인지하면 좋음
크론 표현식 제너레이터 사이트가 많이 있으니, 활용
-- 크론 표현식 제너레이터 사이트 (시간->크론표현식)
cron 표현식이 어렵다면 다음 사이트에서 확인 가능
-- 크론탭 구루(크론표현식->시간)
Linux Crontab의 문제
-> Crontab은 간단히 사용할 수는 있지만, 실패 시 재실행, 실행 로그 확인, 알람 등의 기능은 제공하지 않음
--> 좀 더 정교한 스케줄링 및 워크플로우 도구가 필요함
다양한 도구
스케줄링 워크플로우 전용 도구의 등장
현재 스케줄링, 워크플로우 도구의 표준
Airflow가 제공하는 기능
pip install pip --upgrade apache-airflow==2.2.0
export AIRFLOW_HOME={지정가능}
# Users/Username/workspace/MLOPS
airflow db init
위와 같은 기본파일이 생성된다.
airflow users create \
--username JODONG2 \
--password 1234 \
--firstname DONG2\
--lastname JO\
--role Admin\
--email {}@gmail.com
Airflow Webserver 실행
airflow webserver --port 8080
http://localhost:8080으로 접속하면 웹 UI 등장. 위에서 생성한 어드민 계정으로 로그인 해보자.
웹UI대시보드 화면이 보인다. 하지만 스케줄러 실행중이지 않다는 경고가 보임
The scheduler does not appear to be running.
The DAGs list may not update, and new tasks will not be scheduled.
-> 별도의 터미널 창을 띄워 다음처럼 Airflow Scheduler를 실행한다.
airflow scheduler
다시 웹 UI를 새로고침하면, Scheduler 관련 경고가 없어짐
-> 다른 경고로 바뀜 :)
->없어지는거 맞음 나의 실수였다. ㅎㅎ..
사라지고 새롭게 발생한 경고 !
The scheduler does not appear to be running. Last heartbeat was received 2 minutes ago.
The DAGs list may not update, and new tasks will not be scheduled.
해결한 방법
제대로 연결이 되지 않았었고, folder를 따로 만들어
AIRFLOW_HOME을 다시 설정하였고
airflow db init부터 다시 진행하니 경고 메세지가 사라졌다.
정리
Batch Scheduling을 위한 DAG 생성
Airflow는 Crontab처럼 단순히 하나의 파일을 실행하는 것이 아닌, 여러 작업의 조합도 가능함
하나의 DAG에 여러 Task의 조합으로 구성된다.
A,B,C,D,E,F,G는 Task
예) tutorial_etl_dag 이라는 DAG은 3가지 Task로 구성
Task가 꼭 순차적으로 진행하지 않게 할 수도 있음
tutorial DAG
먼저 DAG을 담을 디렉토리 생성(이름은 무조건 dags)
AIRFLOW_HOME에 dags 생성했다.
AIRFLOW_HOME/dags/{}.py 파일 생성
#hello_world.py
from datetime import timedelta
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
def print_world() -> None:
print("world")
#with 구문으로 DAG 정의 시작
with DAG(
dag_id="hello_world", # DAG 식별자용 아이디
description = "JODONG2 first DAG",
start_date=days_ago(2), # DAG 정의 기준 2일전부터 실행합니다.
schedule_interval= "0 6 * * *", # 매일 06:00에 실행합니다
tags=["my_dags"],
)as dag:
#테스크를 정의합니다.
#bash command로 hello를 출력합니다
t1 = BashOperator(
task_id ="print_hello",
bash_command="echo Hello",
owner ="DONG2", # 작업의 오너, 보통 작업을 담당하는 사람을 넣음
retries=3,
retry_delay=timedelta(minutes=5),
)
#테스크를 정의합니다.
#Python Operaotr로 world를 출력합니다.
t2 = PythonOperator(
task_id = "print_world",
python_callable=print_world,
depends_on_past= True,
owner="DONG2",
retries = 3,
retry_delay = timedelta(minutes=5),
)
#테스트 순서를 정합니다.
#t1실행후 t2실행
t1 >> t2
이제 파일을 저장하고, 웹 UI를 확인해보면 새로 생성한 DAG이 보임
airflow.cfg의 dags_folder 경로를 잘 확인해보자 !
조금 기다리면 실행된 결과를 볼 수 있음
세로 한 줄이 하나의 실행을 의미
-- 맨위의 원이 하나의 DAG 실행을 의미하며, 하나의 실행을 DAG Run이라고 부름
-- DAG 스케줄링 시작 날짜를 2일전으로 설정해서, 두개의 DAG Run이 생성됨
-- 내일 오전 6시(UTC 기준)가 지나면, 하나의 DAG Run 이 또 생길 것
아래 사각형은 하나의 Task를 의미
-- 2개의 Task를 정의했으므로, 2개의 사각형을 볼 수 있음
마우스를 올리면 간단한 정보를 볼 수 있음
사각형(Task)를 눌러서 Log를 눌러보면 의도한대로 echo Hello, print('World') 실행된 것을 확인할 수 있음
만약 특정 DAG Run의 기록을 지우고 다시 실행시키고 싶으면 동그라미(DAG)을 누르고 Clear를 실행
PythonOperator
파이썬 함수를 실행
-- 함수뿐 아니라, Callable한 객체를 파라미터로 넘겨 실행할 수 있음
실행할 파이썬 로직을 함수로 생성한 후, Python Operator로 실행
from airflow.operators.python import PythonOperator
~
t2 = PythonOperator(
task_id = "print_world",
python_callable= print_world, #위에서 정의한 함수
depends_on_past= True,
owner="DONG2",
retries = 3,
retry_delay = timedelta(minutes=5),
)
~
BashOperator
from airflow.operators.bash import BashOperator
~
t1 = BashOperator(
task_id ="print_hello",
bash_command="echo Hello",
owner ="DONG2", # 작업의 오너, 보통 작업을 담당하는 사람을 넣음
retries=3,
retry_delay=timedelta(minutes=5),
)
~
DummyOperator
from airflow.operators.dummy import DummyOperaotr
"""
Bases: airflow.models.BaseOperator
Operator that does literally nothing. It can be used to group tasks in a DAG.
The task is evaluated by the scheduler but never processed by the executor.
"""
B,C,D 작업이 모두 SUCCESS하면 E 실행, E가 SUCCESS된 후, F가 실행,
E는 사실상 아무 작업도 하지 않고, 작업을 모아두는 역할.
A>>B
B>>C
B>>E
G>>D
B>>D
C>>E
D>>E
E>>F
SimpleHttpOperator
from airflow.prividers.http.operators.http import SimpleHttpOperator
이 외에도
클라우드의 기능을 추상화한 Operator도 존재(AWS,GCP 등) - Provider Packages
클라우드의 기능을 추상화
!!Tip!!
pip install "apache-airflow[aws]"
다루지 않은 내용
Airflow DAG을 더 풍부하게 작성할 수 있는 방법으로 다음 내용
DAG Directory
DAG 파일들을 저장
Scheduler
Scheduler는 각종 메타 정보의 기록을 담당
Executor
Executor는 스케줄링된 DAG을 실행하는 객체로, 크게 2종류로 나뉨
-- Local Executor
-- Remote Executor
Local Executor
Local Executor는 DAG Run을 프로세스 단위로 실행하며 다음처럼 나뉨
하나의 DAG Run을 하나의 프로세스로 띄워서 실행
최대로 생성할 프로세스 수를 정해야 함
Airflow를 간단하게 운영할 때 적합
Sequential Executor
-- 하나의 프로세스에서 모든 DAG Run들을 처리
-- Airflow 기본 executor로, 별도 설정이 없으면 이 Executor를 사용
-- Airflow를 잠시 운영할 때 적합
Remote Excutor
DAG Run을 외부 프로세스로 실행
Celery Executor
-- DAG Run을 Celery Worker Process로 실행
-- 보통 Redis를 중간에 두고 같이 사용
-- Local Executor를 사용하다가 Airflow운영 규모가 좀 더 커지면 Celery Executor로 전환
Kubernetes Executor
-- 쿠버네티스 상에서 Airflow를 운영할 때 사용
-- DAG Run 하나가 하나의 Pod(쿠버네티스의 컨테이너 같은 개념)
-- Airflow 운영 규모가 큰 팀에서 사용
라인 블로그에 잘 정리되어 있음 !!
라인 블로그
Workers
DAG을 실제로 실행
Metadata Database
메타 정보를 저장
Webserver
WEB UI를 담당
Airflow를 구축하는 방법으로 보통 3가지 방법을 사용
1. Managed Airflow
Managed Airflow은 클라우드 서비스 형태로 Airflow를 사용하는 방법
보통 별도의 데이터 엔지니어가 없고, 분석가로 이루어진 데이터 팀의 초기에 활용하기 좋음
Managed Airflow의 장단점
장점
단점
2. VM + Docker Compose
참고
VM + Docker compose는 직접 VM위에서 Docker compose로 Airflow를 배포하는 방법
Airflow 구축에 필요한 컴포넌트(Scheduler, Webserver, Database 등)를 Docker container 형태로 배포
예시)
VM + Docker compose 방법의 장단점
장점
단점
Kubernetes + Helm
Kubernetes+ Helm은 Kubernetes 환경에서 Helm 차트로 Airflow를 배포하는 방법
Airflow는 데이터 엔지니어링에서 많이 사용하지만, MLOps에서도 활용할 수 있음
"주기적인 실행"이 필요한 경우
ETL : (Extract, Transform, Load)
라인 엔지니어링 - Airflow on Kubernetes
쏘카 데이터 그룹 - Airflow와 함께한 데이터 환경 구축기