ML 파이프라인은 하나의 워크플로우로 이루어지며, 데이터 추출, 데이터 유효성 검사, 데이터 전처리, 모델 학습, 모델 평가 및 검증 그리고 예측과 같은 단계를 작업(Task)으로 구현하고 실행할 수 있음
Airflow는 데이터 파이프라인을 구축, 스케일링 관리하는 오픈 소스 플랫폼
DAG를 사용해 작업을 정의하고, 각 작업은 하나 이상의 Task로 구성됨
Task는 노드로 표현되며, 노드는 서로 연결되어 작업 흐름을 나타냄
DAG를 사용하여 데이터 처리에 대한 작업 흐름을 정의하면 Airflow는 작업을 스케줄링하고 자동 실행
MLOps에서는 모델의 학습, 평가, 배포 등의 다양한 작업을 자동화하는 파이프라인을 구성할 때 DAG를 자주 사용함
💡Airflow의 아키텍처는 크게 세 가지 요소로 구성됨
Airflow는 다양한 플러그인과 연동할 수 있다. 예를 들어 BigQuery, Amazon S3, Spark, Hive, Presto, MySQL 등과 연동할 수 있으며, 사용자 정의 플러그인을 작성하여 다른 서비스와 연결이 가능함
Airflow 장점

파일 권한 오류
터미널에서는 mlops 폴더 아래에 pipeline 폴더가 있는 것이 확인되었지만 PyCharm 접속 시 pipeline 폴더가 확인되지 않음
PyCharm 인터프리터 설정 오류
인터프리터 create를 누르면 권한이 없다는 오류가 발생하였다.
아직도 PyCharm이 /home/ubuntu/.pycharm_helpers 경로(옛날 사용자 ubuntu 홈 디렉터리)에 있는 스크립트를 실행하려다 권한 문제가 발생하는 전형적인 상황이었다.

# Airflow needs a home. ~/airflow is the default, but you can put it
# somewhere else if you prefer (optional)
export AIRFLOW_HOME=~/airflow
# Install Airflow using the constraints file
AIRFLOW_VERSION=2.7.2
PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
# For example: 3.10
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
# For example: https://raw.githubusercontent.com/apache/airflow/constraints-2.5.2/constraints-3.7.txt
pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"
# The Standalone command will initialise the database, make a user,
# and start all components for you.
airflow standalone
mlops-study-ml-pipeline 프로젝트 내에서 Airflow DAG를 개발하면, Airflow Web UI에서 즉시 확인해 볼 수 있도록 심볼릭 링크(바로가기)를 생성하고자 함
Airflow에서 DAG 파일을 관리하는 경로는 Airflow Home 디렉터리의 airflow.cfg 파일에 정의되어 있음
# The folder where your airflow pipelines live, most likely a
# subfolder in a code repository. This path must be absolute.
#
# Variable: AIRFLOW__CORE__DAGS_FOLDER
#
dags_folder = /home/mlops/airflow/dags
# Hostname by providing a path to a callable, which will resolve the hostname.
# The format is "package.function".
#
# For example, default value "airflow.utils.net.getfqdn" means that result from patched
# version of socket.getfqdn() - see https://github.com/python/cpython/issues/49254.
#
# No argument should be required in the function specified.
# If using IP address as hostname is preferred, use value ``airflow.utils.net.get_host_ip_address``
#
# Variable: AIRFLOW__CORE__HOSTNAME_CALLABLE
#
hostname_callable = airflow.utils.net.getfqdn
# A callable to check if a python file has airflow dags defined or not
PyCharm IDE에서 패키지내 DAG를 정의하거나, 참조해야 하는 Python 모듈을 추가로 개발할 수 있다. 이렇게 개발된 코드는 Airflow Home의 dags 디렉터리 안에 바로 반영될 수 있도록 심볼릭 링크를 생성할 것이다.
⇒ PyCharm에서 mlops-study-ml-pipeline 프로젝트의 features, models 그리고 support 패키지를 Terminal 내의 ~/airflow/dags 하위에 심볼릭 링크 파일을 생성해 두면 PyCharm의 코드와 바로 연결됨
심볼릭 링크 생성 명령어
mkdir -p ~/airflow/dags
ln -snf ~/Study/mlops-study-ml-pipeline/features ~/airflow/dags/features
ln -snf ~/Study/mlops-study-ml-pipeline/features ~/airflow/dags/models
ln -snf ~/Study/mlops-study-ml-pipeline/support ~/airflow/dags/support
ls -al ~/airflow/dags

mkdir -p ~/airflow/dags
ln -snf ~/Study/mlops-study-ml-pipeline/features ~/airflow/dags/features
Broken DAG Error Solution
1. DAG 파일 캐시 삭제 - Airflow가 DAG 파일 파싱할 때 사용하는 pycache 삭제하여 변경 사항 반영find ~/airflow/dags -type d -name "__pycache__" -exec rm -r {} +
- Airflow DB Upgrade
- Airflow scheduler와 webserver 재시작
airflow db upgrade airflow webserver airflow scheduler오류 해결 이후 심볼릭 링크가 잘 반영되어 DAGs에 mlops가 조회되는 것을 확인할 수 있다.
Airflow는 데이터 파이프라인을 관리하기 위한 플랫폼으로, 개발자와 데이터 엔지니어 데이터 처리, 변환 그리고 데이터 전송 등의 작업을 보다 쉽게 관리하고 자동화할 수 있도록 도와줌.
Airflow는 DAG를 스케줄링하고 실행하기 위한 다양한 기능 제공
DAG의 실행 스케줄을 설정하고, DAG의 실행 결과를 모니터링하며, 실패한 Task를 다시 실행하도록 스케줄을 조정할 수 있다.

from airflow import DAG
with DAG(dag_id='my_first_airflow_dag',
default_args={
"owner": "mlops.study", # 소유권
"depends_on_past": False, # 과거 성공 여부와 상관없이 실행
# 생략
},
description='우리가 처음 만들어 보는 Airflow Dag입니다.',
schedule="0 8 * * *",
start_date=datetime(2023, 5, 1),
catchup=False,
tags=["mlops", study"],
) as dag:
# Context Manager---------------------------------
def __enter__(self):
DagContext.push_context_managed_dag(self)
return self
def __exit__(self, _type, _value, _tb):
DagContext.pop_context_managed_dag()
# /Context Manager--------------------------------
with 문으로 DAG를 선언하면 Airflow가 DAG의 시작과 끝을 자동으로 관리해주기 때문에, 파이프라인 작성이 더 간편하고 안전함
Context Manager의 매직 메서드(언더스코어 2개로 시작;끝, 파이썬 클래스 내에 정의, 구현)
Dag를 선언하기 위해 입력한 매개변수
dag_id
default_args
description
schedule

start_date
catchup
tags
Task에서 사용할 Operator의 클래스를 import하는데, BashOpertator를 선택함
Task는 DAG를 선언한 with 절 안에 작성해야 하며 DAG는 3개의 Task를 가진다.
모두 BashOperator를 사용하여 작업을 수행하며 각 Task를 연결하여 하나의 DAG를 정의한다.

Task1은 date라는 명령어를 수행하고, 그 결과를 출력하는 기능 정의
task1= BashOperator(
task_id="print_date",
bash_command="date",
)
Task2는 sleep 5 명령어를 실행하도록 정의하였으며, 명령어가 실행되면 5초간 대기하고 종료함
task2= BashOperator(
task_id="sleep",
depends_on_past=False,
bash_command="sleep 5",
retries=3,
)
Task3는 Jinja 템플릿을 사용해, templated_command에 작성된 셸 코드를 실행하고, 그 결과를 출력함
{% %} 블록에 코드 논리가 포함되어 있고, {{ ds }}와 같은 매개변수를 참조하며, 함수를 호출한다.
templated_command = dedent(
"""
{% for i in range(5) %}
echo "ds = {{ ds }}"
echo "macros.ds_add(ds, {{ i }}) = {{ macros.ds_add(ds, i) }}"
{% endfor %}
"""
)
task3= BashOperator(
task_id="templated",
bash_command="templated_command",
)
Airflow에서는 Jinja 템플릿으로 파이프라인에 사용할 수 있는 매개변수와 매크로 세트를 제공하며, 개발자가 자체 매개변수, 매크로 세트를 정의할 수 있도록 제공한다.
매개변수
매크로
Tasks 중 Task1이 실행되면 Task2와 Task3이 실행되는 의존성을 가지고 있다.
import pendulum
from textwrap import dedent
from datetime import datetime, timedelta
from airflow import DAG # 2.7.2ver 설치
from airflow.operators.bash import BashOperator
from support.callback_functions import success_callback, failuer_callback
# 로컬 타임존 생성
local_timezone = pendulum.timezone("Asia/Seoul")
with DAG(dag_id="my_first_airflow_dag",
default_args={
"owner": "mlops.study",
"depends_on_past": False,
"email": ["mlops.study@gmail.com"],
"email_on_failure": False,
"email_on_retry": False,
"retries": 1,
"retry_delay": timedelta(minutes=5),
"on_failure_callback": failure_callback, # 정의 함수
"on_success_callback": success_callback,
},
description='우리가 처음 만들어 보는 Airflow Dag 입니다.',
schedule="0 8 * * *",
start_date=datetime(2023, 5, 1, tzinfo=local_timezone),
catchup=False,
tags=["mlops", "study"],
) as dag:
task1 = BashOperator(
task_id="print_date",
bash_command="date",
)
task2 = BashOperator(
task_id="sleep",
depends_on_past=False,
bash_command = "sleep 5",
retries = 3,
)
templated_command = dedent(
"""
{% for i in range(5) %}
echo "ds = {{ ds }}"
echo "macros.ds_add(ds, {{ i }}) = {{ macros.ds_add(ds, i) }}"
{% endfor %}
"""
)
task3= BashOperator(
task_id="templated",
bash_command="templated_command",
)
task1 >> [task2, task3]
def success_callback(context):
print("success_callback!")
print(f"context: {context}")
def failure_callback(context):
print("failure_callback!")
print(f"context: {context}")Flask 호환성 문제
여러 확장 프로그램을 설치하다 보니 호환성 충돌 문제가 잦게 발생한다.
Flask 2.2.5 → airflow 2.7.2와 호환됨
Flasck-Session 0.4.0 이상이어야 호환됨
해결 후 airflow db upgrade로 migration 시켜주니 업데이트 되어있음
Grid 버튼
Graph 버튼
Graph로 Task가 표시된다.
방금 실행시켜 success 상태까지는 기다려야 한다.

Trigger
시간 설정은 08:00으로 해놨기 때문에 내일 아침이 되어야 볼 수 있다.
기다릴 수 없다면 ⇒ 수동 trigger

수동 trigger로 인하여 Last Run이 현재 시각이다.

Calendar 버튼
2023년 1월부터의 DAG 실행 결과 확인 가능
Task Duration 버튼
Task 실행된 시간 확인 가능
Airflow로 Workflow를 개발하다 보면, 다양한 오류를 만날 수 있고 그 결과를 확인해야 하는 일이 생긴다. 이럴 때 앞에 나온 설명대로 Task의 상태를 확인할 수 있고, 정상적으로 Task가 동작했는지 또는 오류가 발생하면 어떤 오류를 나타내는지 확인할 수 있다.
SQL 작업을 데이터 파이프라인으로 구축하는 과정을 살펴보면서 SQL 쿼리의 로직이 Airflow DAG의 Task로 어떻게 변환되는지 확인해 볼 것이다.
ML Pipeline의 Data Extraction 단계를 구현하기 전에 데이터 파이프라인을 미리 경험하고, 데이터 과학자의 SQL을 주기적을 동작시키기 위해 추가로 고려해야 할 사항들을 알아본다.
apache-airflow-providers-mysql 설치
Airflow에서 MySQL 또는 MariaDB를 사용하기 위해서 MySQL에서 제공하는 패키지를 설치해야 함



Airflow에서 Feature Store(MySQL)을 접속해서 데이터를 처리하려면 Connection을 추가해야 함

이제 Airflow에서 MariaDB를 사용해 데이터를 처리하는 데이터 파이프라인을 개발할 준비가 되었다.
import pendulum # 날짜 및 시간 관련 작업 python 라이브러리
from datetime import datetime
from airflow import DAG
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from support.callback_functions import success_callback, failure_callback
local_timezone = pendulum.timezone('Asia/Seoul')
conn_id = "feature_store"
task = SQLExecuteQueryOperator(
task_id="create_table_ineligible_loan_model_features_02",
conn_id=conn_id,
sql="drop table mlops.ineligible_loan_model_features"
)
with DAG(dag_id="data_extract_pipeline",
default_args={
"owner": "mlops.study",
"depends_on_past": False,
"email": ["mlops.study@gmail.com"],
"on_failure_callback": failure_callback,
"on_success_callback": success_callback,
},
description='데이터추출_파이프라인',
schedule=None, # 주기적으로 실행되지 않도록 None으로 설정
start_date=datetime(2023, 5, 1, tzinfo=local_timezone),
catchup=False,
tags=["mlops", "study"],
) as dag:
SQLExecuteQueryOperator는 데이터베이스에서 다양한 쿼리(DDL, DML)를 수행하도록 도와주는 오퍼레이터이다. 또한 칼럼 또는 테이블 수준에서의 데이터 품질을 체크하는 기능도 제공한다.
01_data_extract.sql 파일의 SQL을 하나씩 Task로 정의하여 DAG에 추가한다.
출처
Apache AirFliow2를 사용한 머신러닝 OPS[MLOP]
https://fueled.com/the-cache/posts/backend/devops/mlops-with-airflow2/
MLOps 구축 가이드북 - 김남기