Airflow

-·2022년 1월 18일
1

강의정리 - MLOps

목록 보기
16/18

목차

  • Apache Airflow 소개
    -- Batch Process란?
    -- Batch Process - Airflow 등장 전
    -- Airflow 소개

  • Apache Airflow 실습하며 배워보기
    -- 설치하고 실행하기
    -- DAG 작성하기
    -- 유용한 Operator 간단 소개

  • Apache Airflow 아키텍처와 활용방안
    -- 기본 아키텍처
    -- Airflow 실제 활용 사례
    -- MLOps 관점의 Airflow

Apache Airflow 소개

Batch Process란?

위키피디아 
"Computerized batch processing is the running of "jobs that can run without end user interaction, or can be scheduled to run as resources permit"
  • 예약된 시간에 실행되는 프로세스
  • 일회성(1회)도 가능하고, 주기적인 실행도 가능
    -- 이번 주 일요일 07:00에 1번 실행되는 프로세스
    -- 매주 일요일 07:00에 실행되는 프로세스
지금은 은행 점검 시간입니다 같은건가봐요 !

Batch Process를 AI엔지니어가 알아야 하는 이유

  • 모델을 주기적으로 학습시키는 경우 사용(Continuous Training)
  • 주기적인 Batch Serving을 하는 경우 사용
  • 그 외 개발에서 필요한 배치성 작업

Batch Process - Airflow 등장 전

대표적인 Batch Process 구축 방법: Linux Crontab

MyProject/
	train.py	#모델 학습을 진행하고 모델을 저장
	predict.py	# 저장된 모델을 불러오고, 인풋 값을 받아 모델 출력값을 별도로 저장
  • 서버에서 crontab -e 입력
  • 실행된 에디터에서 0****predict.py 입력
  • (0****은 크론탭 표현으로 매 시 0분에 실행하는 것을 의미)
  • OS에 의해 매 시 0분에 predict.py가 실행
  • Linux는 일반적인 서버 환경이고, Crontab은 기본적으로 설치되어 있기 때문에 매우 간편
  • 간단하게 Batch Process를 시작하기에 Crontab은 좋은 선택

크론 표현식

  • Batch Process의 스케줄링을 정의한 표현식

  • 이 표현식은 다른 Batch Process도구에서도 자주 사용 됨

    크론 표현식 예시

  • 매번 표현식을 암기할 필요는 없지만, 읽을 정도만 인지하면 좋음

  • 크론 표현식 제너레이터 사이트가 많이 있으니, 활용
    -- 크론 표현식 제너레이터 사이트 (시간->크론표현식)

  • cron 표현식이 어렵다면 다음 사이트에서 확인 가능
    -- 크론탭 구루(크론표현식->시간)

Linux Crontab의 문제

  • 재실행 및 알람
    -- 파일을 실행하다 오류가 발생한 경우, 크론탭이 별도의 처리를 하지 않음
    -- 예) 매주 일요일 07:00에 predict.py를 실행하다가 에러가 발생한 경우, 알람을 별도로 받지 못함
  • 실패할 경우, 자동으로 몇 번 더 재실행(Retry)하고, 그래도 실패하면 실패했다는 알람을 받으면 좋음
  • 과거 실행 이력 및 실행 로그를 보기 어려움
  • 여러 파일을 실행하거나, 복잡한 파이프라인을 만들기 어려움

-> Crontab은 간단히 사용할 수는 있지만, 실패 시 재실행, 실행 로그 확인, 알람 등의 기능은 제공하지 않음

--> 좀 더 정교한 스케줄링 및 워크플로우 도구가 필요함

다양한 도구
스케줄링 워크플로우 전용 도구의 등장

Airflow 소개

현재 스케줄링, 워크플로우 도구의 표준

  • 에어비앤비(Airbnb)에서 개발
  • 현재 릴리즈된 버전은 2.2.0으로, 업데이트 주기가 빠름
  • 스케줄링 도구로 무거울 수 있지만, 거의 모든 기능을 제공하고, 확장성이 넓어 일반적으로 스케줄링과 파이프라인 작성 도구로 많이 사용
  • 특히 데이터 엔지니어링 팀에서 많이 사용

Airflow가 제공하는 기능

  • 파이썬을 사용해 스케줄링 및 파이프라인 작성 !
  • 스케줄링 및 파이프라인 목록을 볼 수 있는 웹 UI제공 !
  • 실패 시 알람
  • 실패 시 재실행 시도
  • 동시 실행 워커 수
  • 설정 및 변수값 분리

Apache Airflow 실습하며 배워보기

설치하고 실행하기

pip install pip --upgrade apache-airflow==2.2.0
export AIRFLOW_HOME={지정가능}
# Users/Username/workspace/MLOPS

airflow db init
위와 같은 기본파일이 생성된다.
  • Airflow에서 사용할 어드민 계정 생성
airflow users create \
--username JODONG2 \
--password 1234 \
--firstname DONG2\
--lastname JO\
--role Admin\
--email {}@gmail.com

Airflow Webserver 실행

airflow webserver --port 8080

http://localhost:8080으로 접속하면 웹 UI 등장. 위에서 생성한 어드민 계정으로 로그인 해보자.

웹UI대시보드 화면이 보인다. 하지만 스케줄러 실행중이지 않다는 경고가 보임

The scheduler does not appear to be running.

The DAGs list may not update, and new tasks will not be scheduled.

-> 별도의 터미널 창을 띄워 다음처럼 Airflow Scheduler를 실행한다.

airflow scheduler
다시 웹 UI를 새로고침하면, Scheduler 관련 경고가 없어짐

-> 다른 경고로 바뀜 :)
->없어지는거 맞음 나의 실수였다. ㅎㅎ..

사라지고 새롭게 발생한 경고 !
The scheduler does not appear to be running. Last heartbeat was received 2 minutes ago.

The DAGs list may not update, and new tasks will not be scheduled.
해결한 방법
제대로 연결이 되지 않았었고, folder를 따로 만들어
AIRFLOW_HOME을 다시 설정하였고 
airflow db init부터 다시 진행하니 경고 메세지가 사라졌다.

정리

  • Airflow 설치
    -- pip install apache-airflow
  • Airflow기본 디렉토리 설정
    -- 환경변수 AIRFLOW_HOME에 사용할 기본 디렉토리 경로 설정
    -- export AIRFLOW_HOME=/abc/def/
  • Airflow DB 초기화
    -- Airflow에서 사용할 DB를 초기화
    -- airflow db init
  • Airflow 어드민 계정 생성
    -- airflow user create
  • Airflow 웹서버 실행
    -- airflow webserver
  • Airflow 스케줄러 실행
    -- airflow scheduler

DAG 작성하기

Batch Scheduling을 위한 DAG 생성

  • Airflow에서는 스케줄링할 작업을 DAG이라고 부름
  • DAG은 Directed Acyclic Graph의 약자로, Airflow에 한정된 개념이 아닌 소프트웨어 자료구조에서 일반적으로 다루는 개념
  • DAG은 이름 그대로, 순환하지 않는 방향이 존재하는 그래프를 의미

Airflow는 Crontab처럼 단순히 하나의 파일을 실행하는 것이 아닌, 여러 작업의 조합도 가능함

  • DAG 1개 : 1개의 파이프라인
  • Task : DAG내에서 실행할 작업

하나의 DAG에 여러 Task의 조합으로 구성된다.

A,B,C,D,E,F,G는 Task

예) tutorial_etl_dag 이라는 DAG은 3가지 Task로 구성

  • extract
  • transform
  • load
    tutorial_etl_dag이라는 DAG을 실행하면 이 3가지 Task를 순차적으로 실행

Task가 꼭 순차적으로 진행하지 않게 할 수도 있음
tutorial DAG

  • print_data Task 이후 sleep, templated Task 동시 실행

정리

  • Airflow는 DAG이라는 단위로 스케줄링 관리
  • 각 DAG은 Task로 구성
  • DAG 내 Task는 순차적으로 실행되거나, 동시에 (병렬로)실행할 수 있음

DAG작성하기 - hello_world

먼저 DAG을 담을 디렉토리 생성(이름은 무조건 dags)
AIRFLOW_HOME에 dags 생성했다.
AIRFLOW_HOME/dags/{}.py 파일 생성

#hello_world.py
from datetime import timedelta 

from airflow import DAG 
from airflow.utils.dates import days_ago
from airflow.operators.bash import BashOperator 
from airflow.operators.python import PythonOperator

def print_world() -> None:
    print("world") 

#with 구문으로 DAG 정의 시작
with DAG(
    dag_id="hello_world", # DAG 식별자용 아이디
    description = "JODONG2 first DAG", 
    start_date=days_ago(2), # DAG 정의 기준 2일전부터 실행합니다.
    schedule_interval= "0 6 * * *", # 매일 06:00에 실행합니다
    tags=["my_dags"],
)as dag:
    #테스크를 정의합니다.
    #bash command로 hello를 출력합니다
    t1 = BashOperator(
        task_id ="print_hello",
        bash_command="echo Hello",
        owner ="DONG2", # 작업의 오너, 보통 작업을 담당하는 사람을 넣음
        retries=3,
        retry_delay=timedelta(minutes=5),
    )

    #테스크를 정의합니다.
    #Python Operaotr로 world를 출력합니다.
    t2 = PythonOperator(
        task_id = "print_world",
        python_callable=print_world,
        depends_on_past= True,
        owner="DONG2",
        retries = 3,
        retry_delay = timedelta(minutes=5),
    )

    #테스트 순서를 정합니다.
    #t1실행후 t2실행
    t1 >> t2

이제 파일을 저장하고, 웹 UI를 확인해보면 새로 생성한 DAG이 보임

airflow.cfg의 dags_folder 경로를 잘 확인해보자 !

조금 기다리면 실행된 결과를 볼 수 있음

  • 세로 한 줄이 하나의 실행을 의미
    -- 맨위의 원이 하나의 DAG 실행을 의미하며, 하나의 실행을 DAG Run이라고 부름
    -- DAG 스케줄링 시작 날짜를 2일전으로 설정해서, 두개의 DAG Run이 생성됨
    -- 내일 오전 6시(UTC 기준)가 지나면, 하나의 DAG Run 이 또 생길 것

  • 아래 사각형은 하나의 Task를 의미
    -- 2개의 Task를 정의했으므로, 2개의 사각형을 볼 수 있음

마우스를 올리면 간단한 정보를 볼 수 있음

사각형(Task)를 눌러서 Log를 눌러보면 의도한대로 echo Hello, print('World') 실행된 것을 확인할 수 있음

만약 특정 DAG Run의 기록을 지우고 다시 실행시키고 싶으면 동그라미(DAG)을 누르고 Clear를 실행

DAG작성하기 - 정리

  • AIRFLOW_HOME으로 지정된 디렉토리에 dags 디렉토리를 생성하고 이 안에 DAG파일을 작성
  • DAG은 파이썬 파일로 작성. 보통 하나의 .py파일에 하나의 DAG을 저장
  • DAG 파일은 크게 다음으로 구성
    -- DAG 정의 부분
    -- Task 정의 부분
    -- Task 간 순서 정의 부분
  • DAG 파일을 저장하면, Airflow 웹 UI에서 확인할 수 있음
  • Airflow 웹 UI에서 해당 DAG을 ON으로 변경하면 DAG이 스케줄링되어 실행
  • DAG 세부 페이지에서 실행된 DAG Run의 결과를 볼 수 있음

유용한 Operator 간단 소개

Airflow에서는 다양한 Operator를 제공. 이 중 자주 사용하는 Operator를 소개 !

airflow.operators!

PythonOperator

  • 파이썬 함수를 실행
    -- 함수뿐 아니라, Callable한 객체를 파라미터로 넘겨 실행할 수 있음

  • 실행할 파이썬 로직을 함수로 생성한 후, Python Operator로 실행

from airflow.operators.python import PythonOperator
~
t2 = PythonOperator(
        task_id = "print_world",
        python_callable= print_world, #위에서 정의한 함수
        depends_on_past= True,
        owner="DONG2",
        retries = 3,
        retry_delay = timedelta(minutes=5),
    )
~

BashOperator

  • Bash 커맨드를 실행
  • 실행해야 할 프로세스가 파이썬이 아닌 경우에도 BashOperator로 실행 가능
    -- ex. shell 스크립트, scala파일 등
from airflow.operators.bash import BashOperator 
~
t1 = BashOperator(
        task_id ="print_hello",
        bash_command="echo Hello",
        owner ="DONG2", # 작업의 오너, 보통 작업을 담당하는 사람을 넣음
        retries=3,
        retry_delay=timedelta(minutes=5),
    )
~

DummyOperator

  • 아무것도 실행하지 않음
  • DAG내에서 Task를 구성할 때, 여러개의 Task의 SUCCESS를 기다려야 하는 복잡한 Task 구성에서 사용
from airflow.operators.dummy import DummyOperaotr
"""
Bases: airflow.models.BaseOperator

Operator that does literally nothing. It can be used to group tasks in a DAG.

The task is evaluated by the scheduler but never processed by the executor.
"""
B,C,D 작업이 모두 SUCCESS하면 E 실행, E가 SUCCESS된 후, F가 실행, E는 사실상 아무 작업도 하지 않고, 작업을 모아두는 역할. A>>B B>>C B>>E G>>D B>>D C>>E D>>E E>>F

SimpleHttpOperator

  • 특정 호스트로 HTTP 요청을 보내고 Response를 반환
  • 파이썬 함수에서 requests모듈을 사용한 뒤 PythonOperator로 실행시켜도 무방
  • 다만 이런 기능이 Airflow Operaotr에 이미 존재하는 것을 알면 좋음
from airflow.prividers.http.operators.http import SimpleHttpOperator

이 외에도

  • BranchOperator
  • DockerOperator
  • KuberntesOperator
  • CustomOperator(직접 Operator 구현)
  • 등등

클라우드의 기능을 추상화한 Operator도 존재(AWS,GCP 등) - Provider Packages
클라우드의 기능을 추상화
!!Tip!!

  • 외부 Third Party와 연동해 사용하는 Operator의 경우(docker, aws, gcp 등) Airflow 설치시 다음처럼 extra package를 설치해야함
pip install "apache-airflow[aws]"

다루지 않은 내용
Airflow DAG을 더 풍부하게 작성할 수 있는 방법으로 다음 내용

  • Variable : Airflow Console에서 변수(Variable)를 저장해 Airflow DAG에서 활용
  • Connection & Hooks : 연결하기 위한 설정(MySQL, GCP등)
  • Sensor : 외부 이벤트를 기다리며 특정 조건이 만족하면 실행
  • Marker
  • XComs : Task끼리 결과를 주고받고 싶은 경우 사용

Apache Airflow 아키텍처와 활용방안

기본 아키텍처

DAG Directory
DAG 파일들을 저장

  • 기본경로는 $AIRFLOW_HOME/dags
  • DAG_FOLDER라고도 부르며, 이 폴더 내부에서 폴더 구조를 어떻게 두어도 상관없음
  • Scheduler에 의해 .py 파일은 모두 탐색되고 DAG이 파싱

Scheduler
Scheduler는 각종 메타 정보의 기록을 담당

  • DAG Directory 내 .py 파일에서 DAG을 파싱하여 DB에 저장
  • DAG들의 스케줄링 관리 및 담당
  • 실행 진행 상황과 결과를 DB에 저장
  • Executor를 통해 실제로 스케줄링된 DAG을 실행
  • Airflow에서 가장 중요한 컴포넌트

Executor

  • Executor는 스케줄링된 DAG을 실행하는 객체로, 크게 2종류로 나뉨
    -- Local Executor
    -- Remote Executor
    Local Executor

  • Local Executor는 DAG Run을 프로세스 단위로 실행하며 다음처럼 나뉨

  • 하나의 DAG Run을 하나의 프로세스로 띄워서 실행

  • 최대로 생성할 프로세스 수를 정해야 함

  • Airflow를 간단하게 운영할 때 적합

  • Sequential Executor
    -- 하나의 프로세스에서 모든 DAG Run들을 처리
    -- Airflow 기본 executor로, 별도 설정이 없으면 이 Executor를 사용
    -- Airflow를 잠시 운영할 때 적합

Remote Excutor
DAG Run을 외부 프로세스로 실행

  • Celery Executor
    -- DAG Run을 Celery Worker Process로 실행
    -- 보통 Redis를 중간에 두고 같이 사용
    -- Local Executor를 사용하다가 Airflow운영 규모가 좀 더 커지면 Celery Executor로 전환

  • Kubernetes Executor
    -- 쿠버네티스 상에서 Airflow를 운영할 때 사용
    -- DAG Run 하나가 하나의 Pod(쿠버네티스의 컨테이너 같은 개념)
    -- Airflow 운영 규모가 큰 팀에서 사용

라인 블로그에 잘 정리되어 있음 !!
라인 블로그

Workers
DAG을 실제로 실행

  • Scheduler에 의해 생기고 실행
  • Executor에 따라 워커의 형태가 다름
    -- Celery 혹은 Local Executor인 경우, Worker는 프로세스
    -- Kubernetes Executor인 경우, Worker는 pod
  • DAG Run을 실행하는 과정에서 생긴 로그를 저장

Metadata Database
메타 정보를 저장

  • Scheduler에 의해 Metadata가 쌓임
  • 보통 MySQL이나 Postgres를 사용
  • 파싱한 DAG 정보, DAG Run 상태와 실행 내용, Task 정보 등을 저장
  • User와 Role(RBAC)에 대한 정보 저장
  • Scheduler와 더불어 핵심 컴포넌트
    -- 트러블 슈팅 시, 디버깅을 위해 직접 DB에 연결해 데이터를 확인하기도 함
  • 실제 운영환경에서는 GCP Cloud SQL이나, AWS Aurora DB등 외부 DB 인스턴스를 사용

Webserver
WEB UI를 담당

  • Metadata DB와 통신하며 유저에게 필요한 메타 데이터를 웹 브라우저에 보여주고 시각화
  • 보통 Airflow 사용자들은 이 웹서버를 이용하여 DAG을 ON/OFF하며, 현상황을 파악
  • REST API도 제공하므로, 꼭 WEB UI를 통해서 통신하지 않아도 괜찮음
  • 웹서버가 당장 작동하지 않아도, Airflow에 큰 장애가 발생하지 않음 (반면 Scheduler의 작동여부는 매우 중요)

Airflow 실제 활용 사례

Airflow를 구축하는 방법으로 보통 3가지 방법을 사용

1. Managed Airflow(GCP Composer, AWS MWAA)

2. VM + Docker compose

3. Kubernetes + Helm

1. Managed Airflow
Managed Airflow은 클라우드 서비스 형태로 Airflow를 사용하는 방법

보통 별도의 데이터 엔지니어가 없고, 분석가로 이루어진 데이터 팀의 초기에 활용하기 좋음

Managed Airflow의 장단점

장점

  • 설치와 구축을 클릭 몇번으로 클라우드 서비스가 다 진행
  • 유저는 DAG 파일을 스토리지(파일 업로드)형태로 관리

단점

  • 비용
  • 자유도가 적음. 클라우드에서 기능을 제공하지 않으면 불가능한 제약이 많음

2. VM + Docker Compose
참고

VM + Docker compose는 직접 VM위에서 Docker compose로 Airflow를 배포하는 방법
Airflow 구축에 필요한 컴포넌트(Scheduler, Webserver, Database 등)를 Docker container 형태로 배포
예시)

쏘카 참고

VM + Docker compose 방법의 장단점

장점

  • Managed Service 보다는 살짝 복잡하지만, 어려운 난이도는 아님
    -- (Docker와 Docker compose에 익숙한 사람이라면 금방 익힐 수 있음)
    -- 하나의 VM만을 사용하기 때문에 단순

단점

  • 각 도커 컨테이너 별로 환경이 다르므로, 관리 포인트가 늘어남
  • 예를들어, 특정 컨테이너가 갑자기 죽을수도 있고, 특정 컨테이너에 라이브러리를 설치했다면, 나머지 컨테이너에도 하나씩 설치해야함.

Kubernetes + Helm
Kubernetes+ Helm은 Kubernetes 환경에서 Helm 차트로 Airflow를 배포하는 방법

  • Kubernetes는 여러개의 vm을 동적으로 운영하는 일종의 분산환경으로, 리소스 사용이 매우 유연한게 대표적인 특징(필요에 따라 VM수를 알아서 늘려주고 줄여줌)
  • 이런 특징 덕분에, 특정 시간에 배치 프로세스를 실행시키는 Airflow와 궁합이 매우 잘맞음
  • Airflow DAG 수가 몇 백개로 늘어나도 노드 오토 스케일링으로 모든 프로세스를 잘 처리할 수 있음
  • 하지만 쿠버네티스 자체가 난이도가 있는만큼 구축과 운영이 어려움
  • 보통 데이터팀에 엔지니어링 팀이 존재하고, 쿠버네티스 환경인 경우에 적극 사용

쏘카 참고

MLOps 관점의 Airflow

Airflow는 데이터 엔지니어링에서 많이 사용하지만, MLOps에서도 활용할 수 있음
"주기적인 실행"이 필요한 경우

  • Batch Training: 1주일 단위로 모델 학습
  • Batch Serving(Batch Inference): 30분 단위로 인퍼런스
  • 인퍼런스 결과를 기반으로 일자별, 주차별 모델 퍼포먼스 Report 생성
  • MySQL에 저장된 메타데이터를 데이터 웨어하우스로 1시간 단위로 옮기기
  • S3, GCS 등 Object Storage
  • Feature Store를 만들기 위해 Batch ETL 실행
ETL : (Extract, Transform, Load)

Airflow 관련 추천 글 !!!!

버킷플레이스 - Airflow 도입기

라인 엔지니어링 - Airflow on Kubernetes

쏘카 데이터 그룹 - Airflow와 함께한 데이터 환경 구축기

Airflow Executors Explained

Special Mission

  1. Airflow Local에 환경 설정하기
  2. 학습을 1달 단위, 예측을 1일 단위로 하는 Airflow DAG 생성하기
  3. Airflow 관련 추천 글 읽기
profile
-

0개의 댓글