Batch Serving은 예측 코드를 주기적으로 실행해서 예측 결과를 제공
Job Scheduler는 Apache Airflow를 주로 사용
학습과 예측을 별도 설정을 통해 수행
Jupyter Notebook, Lab 있는 코드를 스크립트로 바꿀 때 편함
-> 절차형으로 작성한 코드를 정리한 후, 주기적으로 실행!
스포티파이의 예측 알고리즘 - Discover Weekly
수요 예측 (재고 및 입고 최적화를 위해 매일 매장별 제품 수요 예측)
이미지 예측
자연어 예측
소프트웨어 프로그램을 자동으로 실행하는 방법. 예약된 시간에 자동으로 실행
Batch Processing : 일정 기간 동안 일괄적으로 작업을 수행
Batch Serving : 일정 기간동안 일괄적으로 머신러닝 예측 작업을 수행
Batch Processing이 더 큰 개념이며, Batch로 진행하는 작업에 Airflow를 사용할 수 있음
cron
은 그리스어에서 유래된 것으로 시간을 뜻함.
crontab -e
입력predict.py
입력(0 * * * 은 매 시 0분을 의미)http://www.cronmaker.com
https://crontab.guru/
Crontab은 간단히 사용할 수는 있지만, 실패 시 재실행, 실행 로그 확인, 알림 등의 기능은 제공하지 않음
=> 좀 더 정교한 스케줄링 및 워크플로우 도구가 필요함
Linux Crontab 문제를 해결하기 위해 다양한 스케줄링/워크플로우 전용 도구의 등장
Airflow 등장 후, 스케줄링 및 워크플로우 도구의 표준
파이썬을 사용해 스케줄링 및 파이프라인 작성
DAG는 Directed Acyclic Graph의 약자
DAG의 기능
DAG는 Apache Airflow에서 워크플로우를 정의하는 핵심 구조입니다.
DAG는 작업(Task)들 간의 실행 순서를 정의하며, 이러한 작업은 각 노드(Node)로 표현됩니다.
워크플로우 전체를 표현하고 스케줄링 및 실행을 자동으로 관리합니다.
스케줄링 및 파이프라인 목록을 볼 수 있는 웹 UI 제공
스케줄링 및 파이프라인 목록을 볼 수 있는 웹 UI 제공
특정 조건에 따라 작업을 분기할 수도 있음(Branch 사용)
1) DAGs (Directed Acyclic Graphs)
2) Operator
3) Scheduler
4) Executor
DAG Directory
DAG 파일들을 저장
$AIRFLOW_HOME/dags
.py
파일은 모두 탐색되고 DAG이 파싱Operator
Batch Scheduling을 위한 DAG 생성
ex1) tutorial_etl_dag라는 DAG은 3가지 Task로 구성
tutorial_etl_dag라는 DAG을 실행하면 이 3가지 Task(extract,transform
,load)을 순차적으로 실행
ex2) 병렬적으로 실행
Apache는 apache재단에서 만들었기 때문에 앞에 apache라고 붙고 '강인함'이라는 인디언부족에서 유래함.
python -m venv .venv
source .venv/bin/activate
# pip3 설치된 pip을 최신 버전으로 업그레이드
pip3 install pip --upgrade
# Airflow 설치할 버전을 환경 변수로 지정
AIRFLOW_VERSION=2.6.3
# 현재 설치된 Python 버전을 확인하고, 주 버전과 부 버전만 추출하여 환경 변수로 저장
# 파이썬 python 3.10.2 -> 3.10.2 -> 3.10 으로 추출하는 과정.
PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
# Apache Airflow 설치 시 사용할 제약 조건(constraint) 파일의 URL 설정
# 제약 조건 파일은 특정 버전의 Python과 Airflow에 맞는 종속성을 명시
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
# Apache Airflow를 지정한 버전으로 설치하며, 제약 조건 파일을 참조하여 올바른 종속성을 설치
pip3 install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"
export AIRFLOW_HOME=`pwd`
echo $AIRFLOW_HOME
airflow db init
DB를 초기화하면 기본 파일이 생성
airflow.cfg
: Airflow 설정 파일
airflow.cfg 파일 : Airflow 각종 설정이 존재하며, load_examples = True를 주면 예제 파일이 포함됨
airflow.db
: Airflow DB(여기선 SQLite 사용)
airflow users create \
--username admin \
--password '0123' \
--firstname lee \
--lastname jaegun \
--role Admin \
--email leejken530@naver.com
airflow webserver --port 8080
별도의 터미널 창을 띄워 다음처럼 Airflow Scheduler를 실행
(Webserver를 실행한 상황에서 Scheduler를 또 실행)
export 항상 해주고 실행
#!/bin/bash
# 1. 프로젝트 디렉토리로 이동
cd boostcamp
# 2. 가상 환경 활성화
source .venv/bin/activate
# 3. Airflow 작업 디렉토리 설정
export AIRFLOW_HOME=$(pwd)
# 4. Airflow 스케줄러 실행
airflow scheduler
[2024-01-29 12:01:09 +0900][66714] [ERROR] Connection in use: ('::', 8793)
8793 port에 연결할 수 없음 = port의 process id를 찾아서 Kill
kill $(lsof -t -i:8793
pwd
새 터미널을 띄우고 AIRFLOW_HOME에서 DAG을 저장할 디렉토리 생성(이름은 무조건 dags)
mkdir dags
dags 폴더안데 hello_world.py
생성
Airflow DAG 파일은 크게 3가지 파트로 나뉨
from datetime import timedelta
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
# PythonOperator에서 호출할 함수 정의
def print_world() -> None:
print("world")
# with 구문으로 DAG 정의 시작
with DAG(
dag_id="hello_world", # DAG의 식별자용 아이디
description="My First DAG", # DAG 설명
start_date=days_ago(2), # DAG 정의 기준 2일 전부터 시작
schedule_interval="0 6 * * *", # 매일 06:00에 실행
tags=["my_dags"], # 태그 설정
) as dag:
# 테스크 1: bash 커맨드로 echo Hello 실행
t1 = BashOperator(
task_id="print_hello", # 테스크 ID
bash_command="echo Hello", # 실행할 bash 명령어
owner="heumsi", # 테스크 오너 (담당자)
retries=3, # 실패 시 3번 재시도
retry_delay=timedelta(minutes=5), # 재시도 간격은 5분
)
# 테스크 2: Python 함수 print_world 실행
t2 = PythonOperator(
task_id="print_world", # 테스크 ID
python_callable=print_world, # 실행할 Python 함수
depends_on_past=True, # 이전 테스크의 성공 여부에 따라 실행
owner="heumsi", # 테스크 오너
retries=3, # 실패 시 3번 재시도
retry_delay=timedelta(minutes=5), # 재시도 간격은 5분
)
# 테스크 순서 정의: t1 실행 후 t2 실행
t1 >> t2
catchup
True : DAG에서 정의한 start_date부터 현재까지 미실행된 모든 스케줄에 대해 DAG을 실행-> 과거 데이터를 처리할 필요 있을 때 유용
False : DAG에서 정의한 start_date와 상관없이 앞으로 실행될 DAG을 실행
depends_on_past
True : 이전 DAG이 성공으로 완료되어야 이후의 DAG이 실행
False : 이전 DAG의 성공 여부 상관없이 스케줄이 되면 DAG이 실행
-> Executor에 따라 여러가지 작업을 한번에 실행할 수도 있음
hello_world.py을 저장한 후, 웹 UI 확인해 보면 DAG 생성됨
예제 DAG 파일도 보임
Filter DAGs by tag -> my_dags를 검색
토글을 클릭해서 DAG 상태를 ON으로 만들면 우측에 연두색 불이 들어오기 시작(찰나의 시간)
이제 DAG의 이름을 클릭
Airflow에서는 다양한 Operator를 제공. 이 중 자주 사용하는 Operator를 소개
Airflow Operator는 Airflow로 워크플로우를 구성할 때 "한 개의 실제 실행 단위를 정의하는 핵심 요소"
작업 B,C,D 모두 Success 하면 F실행.
-> 이처럼 E는 사실상 아무런 작업을 하지않고, 작업을 모아두는 역할을 함.
특정 호스트로 HTTP 요청을 보내고 Response를 반환
Airflow Slack Provider 설치
pip3 install 'apache-airflow-providers-slack[http]'==8.6.0
(1) Slack API Key 발급 : Slack API Apps에서 Create APP 클릭
From scratch -> App Name 지정하고 설치할 Slack Workspace 지정
(2) 생성된 URL 저장 (https://hooks.slack.com/services/T*****/B*****/*********)
(3) Webserver 통해 [Admin] - [Connections] - [Add a new record] 경로로 Connection 생성
(4) Task가 실패할 때 알림 코드 작성
dags/utils 폴더 생성 후, __init__.py
, slack_notifier.py
생성
utils 폴더 안에서 작성한 파일을 import해서 사용할 수 있음
(5) DAG 코드 작성 : DAG 코드
전체적인 흐름
Scheduler는 각종 메타 정보의 기록을 담당
Executor는 스케줄링 기간이 된 DAG을
실행하는 객체. 크게 2종류로 나뉨
Scheduler - Local Executor
DAG Run을 프로세스 단위로 실행
Scheduler - Remote Executor
DAG Run을 외부 프로세스로 실행
DAG의 작업을 수행
메타 정보를 저장
WEB UI를 담당
Airflow를 구축하는 방법으로 보통 3가지 방법을 사용
-> 보통 별도의 데이터 엔지니어가 없고, 분석가로 이루어진 데이터 팀의 초기에 활용하기 좋음
장점
단점
장점
단점
관련 추천 글
1. 버킷플레이스 - Airflow 도입기