pip install "apache-airflow==2.6.3" --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.6.3/constraints-3.9.txt"
export AIRFLOW_HOME=`pwd`
airflow db init
airflow users create --username <사용자이름> --firstname <이름> --lastname <성> --email <이메일> --role Admin --password <비밀번호>
airflow webserver --port 8080
airflow scheduler
localhost:8080에서 접속.pwd로 설정했던 AIRFLOW_HOME에서 DAG를 저장할 dags 폴더 만들기
mkdir dags
Airflow DAG 파일은 크게 3가지 파트로 나뉘는데
1) DAG 정의
2) TASK 정의
3) Task 순서 정의(연결)
위 세가지다.
실무에서 Airflow를 사용할 때 중요한 인자
catchup
과거에 지나간 일자의 DAG를 실행할지 결정하는 옵션
True: DAG에서 정의한 start_date부터 현재까지 미실행된 모든 스케줄에 대해 DAG를 실행함. 과거 데이터를 처리할 필요가 있을 때 유용하다.

False: DAG에서 정의한 start_date와 상관없이 앞으로 실행될 DAG를 실행

depends_on_past
특정 Task가 이전 DAG 실행 결과에 의존할지 여부를 결정함
이전 Task와 상관없이 작업을 수행하고 싶은지 고민
하루 단위의 작업들이 의존성이 있다면 True를 주고 순차적으로 실행


DAG 디렉토리 설정
AIRFLOW_HOME/dags 디렉토리를 생성. .py 파일로 저장. DAG 파일 구성
task1 >> task2 형태). DAG 파일 저장 및 확인
.py 파일로 저장 후, Airflow 웹 UI에서 DAG 목록을 확인 가능. localhost:8080. DAG 활성화 및 실행
실행 결과 확인
추가설명:
schedule_interval에 따라 주기적으로 실행 가능 (@daily, @hourly 등). PythonOperator
t2 = PythonOperator(
task_id="print_world",
python_callable=print_world,
depends_on_past = True,
owner = "hsk",
retries = 3,
retry_delay = timedelta(minutes =5),
)
BashOperator
t1 = BashOperator(
task_id = "print_hello",
bash_command = "echo Hello",
owner="hsk", # 이 작업의 오너, 보통 작업담당자 이름,
retries = 3, # Task 실패시 재시도할 횟수
retry_delay = timedelta(minutes=5) # 재시도 간격 : 5분
)DummyOperator

SimpleHttpOperator
파이썬에서 requests 모듈로도 가능하지만 이런 걸로도 가능하다.
클라우드 기능을 추상화한 Operator도 존재한다(AWS, GCP 등) - Provider Packages
pip install "apache-airflow[aws]"BranchPythonOperator
Operator 외에도 알아두면 좋은 내용
Airflow DAG를 더 풍부하게 작성할 수 있는 개념
Airflow Slack Provider 설치
Slack Webhook URL 따기
Webserver 통하여 [Admin]-[Connections]-[Add a new record] 경로로 Connection 생성

Add Copnnection 채우기

Task가 실패할 때 알림 보내주는 코드 작성
기본 아키텍쳐
Executor
스케줄링 기간이 도래한 DAG를 실행하는 객체를 의미
- Local Executor
DAG Run을 프로세스 단위로 실행한다.
- 하나의 DAG Run을 하나의 프로세스로 띄워서 실행하고
- 최대로 생성할 프로세스 수를 정해야 한다.
- Airflow를 간단하게 운영할 때 적합하다.
- Sequential Executor
- 하나의 프로세스에서 모든 DAG Run들을 처리
- Airflow 기본 Executor로, 별도 설정이 없으면 이 Executor를 사용한다.
- Airflow를 테스트로 잠시 운영할 때 적합하거나 잘 사용하지는 않음
- Remote Executor
DAG Run을 외부 프로세스로 실행
- Celery Executor
- DAG Run을 Celery Worker Process로 실행
- 보통 Redis를 중간에 두고 같이 사용
- Local Executor를 사용한다. Airflow 운영 규모가 좀 더 커지면 Celery Executor로 전환
- Kubernetes Executor
- 쿠버네티스 상에서 Airflow를 운영할 때 사용
- DAG Run 하나가 하나의 Pod(쿠버네티스의 컨테니어 같은 개념)
- Airflow 운영 규모가 큰 팀에서 사용
Workers
DAG의 작업을 수행
Metadata Database
메타 정보를 저장
Webserver
WEB UI를 담당
Airflow를 구축하는 방법으로 보통 3가지 방법을 사용
Airflow는 많이 띄운다...
어떤 회사는 데이터 엔지니어링 전용 Airflow를 띄우고
어떤 회사는 MLOps 전용 Airflow를 띄운다.
Airflow를 띄우는 것도 컴퓨터(서버)를 사용하기에 비용을 생각해야한다.
운영하는 DAG 갯수가 적은 경우에 Data Engineering + MLOps 통합 Airflow 1개를 운영하는 경우 또한 존재한다.
- 핵심 정리
Apache Airflow
- Batch Serving, 특정 시간 단위로 실행해야 할 때 사용하는 워크플로우 도구
- A Task 후, B Task, 그 후 여러 Task를 병렬 실행 등 다양한 기능 지원
- 데이터 엔지니어링, MLOps에서 많이 사용
MLOps 에서도 Airflow를 사용하는 경우