이번 글에서는 DAG 클래스 입력 파라미터를 잘못 사용하는 경우와 올바른 사용법을 제시합니다. (airflow 2.1.1 버전 기준)
Airflow Scheduler는 일정한 주기마다 계속해서 dag파일들이 저장된 폴더 내의 파이썬 파일들을 스캔하면서 파일별로 정의된 DAG 클래스의 스케쥴 일정에 맞춰서 Dagrun이라는 인스턴스를 생성하는 방식으로 작동한다.
이 때, DAG 클래스의 입력 파라미터 값들로 start_date(시작 시간)과 schedule_interval(실행 주기)를 입력하게 되는데 여기서 보통 많이들 착각하시는게, 시작 시간부터 스케쥴링이 시작된다고 생각하는 것입니다. 파라미터 이름만 오히려 착각한 내용이 맞다고 생각할 수밖에 없는데, airflow는 아래와 같이 작동되도록 설계되어 있습니다.
Dag는 start_date + schedule_interval 첫번째 사이클이 지나고 나서부터 작동한다.
가령, 시작시간이 목요일이고, 스케쥴이 화요일마다 실행되도록 되어 있으면 다음주 화요일까지는 아직 일주일이 안되었으므로 그 다음주 화요일에 처음 dagrun이 됩니다
이는 아마도, 어제 날짜의 raw 데이터를 가공 후 적재하는 작업을 한다고 할 때, 어제 날짜의 데이터는 어제가 다 지나고 오늘이 되어서야 raw 데이터 전부를 얻을 수 있을테니, start_date를 어제로 잡으면, 오늘에서야 그 작업을 완료할 수 있다는 점을 미리 반영해 둔 것이 아닐까라고 생각됩니다. 하지만 좀 이상하게 설계된 것 같은... (2.2.0 버전부터는 Timetable 개념이 추가되었습니다. 이에 대한 리뷰는 다음 글에서 다뤄보겠습니다)
참고로 schedule_interval 값으로는 datetime.timedelta(days=1) 이런식으로 @daily를 표현할 수 있습니다. 또한 weekly 부분에서 0이 일요일이고 6이 토요일인 것도 자주 까먹게 되는 내용이니 기억해두면 좋습니다.
dagrun들이 작업 특징에 따라서 종속적이거나 독립적으로 작동되도록 세팅할 필요성이 있다. 주로 아래 2개가 사용되는데, 둘 간의 차이가 있습니다.
depends_on_past : dagrun별 같은 위치의 Task 단위 종속성
wait_for_downstream : dagrun과 무관한 완전한 Task 단위 종속성
예를 들면, 어제 (5개의 task가 있는)dagrun이 생성되어 프로세스가 도는 와중에 3번째 task에서 fail이 나면 기본적으로 4,5번 task는 수행을 하지 않게 됩니다. 이 상태에서 오늘 새로운 dagrun이 생성된다고 하면, depends_on_past의 값이 True일 때, 오늘은 2번째까지 잘 돌고, 3번째부터 중지됩니다. 오늘 3번째 작업의 수행여부가 어제의 3번째 task 성공에 의존(depends)하는 것입니다. 반면, wait_for_downstream의 값이 True이면, 오늘의 모든 Task는 어제 5번째 task에 downstream에 있게 되고, 어제 3번째부터 중단되어 있으니 오늘 모든 task는 수행이 되지 않고 대기상태(wait)가 됩니다. 둘다 False값이라면 의존성은 없이 오늘 dagrun은 수행됩니다.
가끔 dag파일을 작성하고 첫 dagrun이 되었을때, start_date부터 오늘날 사이에 작업들이 작동해서 난처한 경우가 있을 수 있는데, 이 경우 Dag 클래스의 파라미터 중 하나인 catchup 값을 false로 넣어주면 됩니다.
# Dag 클래스 정의
default_args = {
"owner": "xxxxx",
"depends_on_past": False, # dagrun끼리 독립적
"email": ["xxxx@gmail.com"],
"email_on_failure": True,
"email_on_retry": False,
"retries": 1, # task 실패시 한번 더 실행
"retry_delay": timedelta(minutes=5) # 실패 후 재실행 시 5분 후에 재실행
}
# please edit dag_id, schedule_interval, start_date, tags
dag = DAG(
dag_id="xxxxx",
default_args=default_args,
schedule_interval='00 7 * * *', # schedule cron 7 am daily
start_date=datetime(2021,11,11),
access_control = {"user_name": {"can_read", "can_edit"}}, # rbac 기능
tags=["xxxxx"],
catchup=False,
)
안녕하세요 포스팅 내용 중 궁금한게 있어 질문드립니다.
하나의 Dag 안에 3개의 task가 있을 경우, 실패한 task만 재실행하는 건지, dags 전체 재실행하는지 궁금합니다.