1. Airflow 설치
2. Airflow 기본 프로그램 실행
직접 설치하고 운영하는 방법
클라우드를 사용하는 방법
Ubuntu 20.04 사용
AWS EC2 t3.small나 t3a.small 인스턴스를 선택
( 2 CPU, 2 GB Memory, 8GB SSD Disk )
AWS 계정 필요
t3.small 인스턴스의 경우 한달 비용이 $18.72
t3a.small 인스턴스의 경우 한달 비용이 $16.85
ubuntu : 리눅스 타입 중의 하나
ssh : 리눅스 혹은 유닉스 서버에 로그인해주는 프로그램 (터미널)
sudo : 보통 슈퍼유저로서 프로그램을 구동할 수 있도록 하는 프로그램
apt-get: 우분투 계열의 리눅스에서 프로그램 설치/삭제를 관리해주는 프로그램
apt-get updateapt-get installsu : substitue user의 약자로 현재 사용 중인 사용자 계정을 로그아웃하지 않고 다른 사용자의 권한을 얻을 때 사용.
vi : 텍스트 에디터.
아래 링크의 Readme를 참고하여 설치를 진행
https://github.com/keeyong/airflow-setup/blob/main/docs/Airflow%202%20Installation.md
airflow-setup Github repo 클론
git clone https://github.com/keeyong/airflow-setup.git2.5.1 이미지 관련 yml 파일 다운로드
cd airflow-setup
curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.5.1/docker-compose.yaml'
이미지 다운 및 컨테이너 실행
docker-compose -f docker-compose.yaml pull
docker-compose -f docker-compose.yaml up
http://localhost:8080으로 웹 UI 로그인
DAG 대표하는 객체를 먼저 생성
( DAG 이름, 실행 주기, 실행 날짜, 오너 등 )
다음으로 DAG를 구성하는 태스크를 생성
2-1. 태스크 별로 적합한 오퍼레이터를 선택
2-2. 태스크 ID를 부여하고 작업의 세부사항 지정
최종적으로 태스크들 간의 실행 순서를 결정
DAG를 설정할 때 가장 먼저 해야하는 작업이 있습니다.
이 모든 태스크들에 공통으로 적용이 되는 설정을 Dictionary로 만들어 두어야합니다.
( 보통 default_args라 지칭 )
from datetime import datetime, timedelta
default_args = {
# 오너가 누구인지
'owner': 'keeyong',
# 오너의 이메일은 무엇인지
'email': ['keeyonghan@hotmail.com'],
# 작업이 실패했을 때 재시도를 몇 번 할 것인지
'retries': 1,
# 재시도 간에 얼마나 기다릴지 (3분)
'retry_delay': timedelta(minutes=3),
}
여기에 지정되는 인자들은 모든 태스크들에 공통으로 적용되는 설정이 됩니다.
on_success_callback, on_failure_callback
태스크가 성공 혹은 실패했을 때, 내가 어떤 함수를 호출하고 싶은 경우 사용합니다.
from airflow import DAG
dag = DAG(
# DAG 이름
"dag_v1",
# DAG 시작 날짜
start_date=datetime(2020,8,7,hour=0,minute=00),
# 이 스케줄이 뜻하는 바는 모든 0분
# 즉, 매 시간마다의 0분에 실행
schedule="0 * * * *",
# 태그 선택
tags=["example"],
catchup=False,
# 위에서 지정한 default_args를 세팅
default_args=default_args
)
스케줄 범위
앞에서부터 분,시간,월,년,요일 순으로 사용합니다.
만약, 실행을 주기적으로 하지않겠다면
None혹은@once로 세팅 후 다른 DAG가 종료되었을 때, 해당 DAG를 트리거하는 방식으로 사용도 가능합니다.
Schedule의 cron expression
None, @once, @hourly, @daily, @weekly, @monthly, @yearly
e.g.) 매년 매일 매시 정각에 실행을 하고 싶은 경우
@hourly
catchup 이란?
예를 들어, start_date가 과거로 설정됐을 경우 Airflow는 설정된 과거부터 현재의 시간까지의 기간동안 실행이 밀렸다고 생각합니다.
따라서, 밀린 해당 태스크를 실행하기 위해 그 태스크들의 스케줄에 해당되는 시간이 오면 밀린 태스크를 실행하는 것이 catchup입니다.
-> Default가
catchup=True이므로,
원치 않는다면catchup=False로 바꿔서 사용해줘야합니다.
e.g.)
Full Refresh를 하는 DAG의 경우 catchup은 항상 False로 하는 것이 좋습니다.
그 이유는 과거의 태스크들을 모두 실행하는 것이나 지금 한번 DAG를 실행하는 것이나 똑같기 때문입니다.
결국, 이 catchup은 incremental update를 하는 Data Pipeline에서만 의미가 있습니다.
대부분의 경우에서는 False가 좋습니다.

# DAG import
from airflow import DAG
# Bash Operator import
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'keeyong',
'start_date': datetime(2023, 5, 27, hour=0, minute=00),
'email': ['keeyonghan@hotmail.com'],
'retries': 1,
'retry_delay': timedelta(minutes=3),
}
test_dag = DAG(
"dag_v1", # DAG name
schedule="0 9 * * *",
tags=['test'],
catchUp=False,
default_args=default_args
)
# BashOperator로 Task 생성
t1 = BashOperator(
task_id='print_date',
# 지금 시간 출력
bash_command='date',
dag=test_dag)
t2 = BashOperator(
task_id='sleep',
# 5초 동안 정지
bash_command='sleep 5',
dag=test_dag)
t3 = BashOperator(
task_id='ls',
# /tmp 내용 출력
bash_command='ls /tmp',
dag=test_dag)
# t1이 끝나면 t2,t3를 동시 실행
t1 >> [ t2, t3 ]
터미널에서 Docker로 Airflow 로그인
# docker의 컨테이너들을 확인해서
# 스케줄러에 접근하기 위한 ID를 찾음
docker ps
# 컨테이너 안으로 로그인.(airflow)
docker exec -it container_id sh
Airflow 로그인 후 명령어 실행
# DAG 리스트 출력
airflow dags list
# 선택한 DAG 내의 태스크 리스트 출력
airflow tasks list DAG이름
# DAG의 특정 Task를 실행하고 싶은 경우
airflow tasks test DAG이름 Task이름 날짜
# test와 run은 동일한 기능.
# test의 경우 metadata에 기록이 됨.
# run의 경우 기록이 되지 않음