
데이터 파이프라인, Airflow (2)
직접 설치하고 운영
클라우드 사용 (프로덕션 환경에서 선호됨)
개인 랩탑에 도커 설치 후 Airflow 설치
AWS EC2 등의 리눅스 서버에 직접 설치
from datetime import datetime, timedelta
default_args = {
'owner': 'keeyong',
'email': ['keeyonghan@hotmail.com'],
'retries': 1,
'retry_delay': timedelta(minutes=3),
}
default_args)retries : 태스크가 실패하면 재시도 할지 말지, 한다면 몇 번 할지
retry_delay : 재시도시 얼마나 기다릴지
그 외
on_failure_callback : 실패 시 호출할 함수
on_success_callback : 성공 시 호출할 함수
from airflow import DAG
dag = DAG(
"dag_v1", # DAG name
start_date=datetime(2020,8,7,hour=0,minute=00),
schedule="0 * * * *",
tags=["example"],
catchup=False,
# common settings
default_args=default_args
)
start_date : 시작 날짜
catchup : start_date부터 활성화 시점까지 실행 안된 날짜에 대해서 실행하려 함
full refresh 때는 false로
schedule="0 * * * *" : 매시 0분에 시작
schedule
None, @once, @hourly, @daily, @weekly, @monthly, @yearly 등으로 세팅 가능
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'keeyong',
'start_date': datetime(2023, 5, 27, hour=0, minute=00),
'email': ['keeyonghan@hotmail.com'],
'retries': 1,
'retry_delay': timedelta(minutes=3),
}
test_dag = DAG(
"dag_v1", # DAG name
schedule="0 9 * * *",
tags=['test'],
catchUp=False,
default_args=default_args
)
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=test_dag)
t2 = BashOperator(
task_id='sleep',
bash_command='sleep 5',
dag=test_dag)
t3 = BashOperator(
task_id='ls',
bash_command='ls /tmp',
dag=test_dag)
t1 >> [ t2, t3 ]
Airflow 서버에 로그인하고 다음 명령 실행