우리는 airflow dag를 정의할 때 언제부터 시작할 것인지, 주기는 어떻게 할 것인지 미리 정해놓는다.
start_date
: dag를 언제부터 시작할 것인지 지정
schedule_interval
: start date 부터 해당 dag 실행 주기를 지정
까먹을까봐 아래에 예시 코드를 적어 놓았다.
default_args = {
'start_date': datetime(2022, 1, 1)
}
with DAG('user_processing',
schedule_interval='@daily',
default_args=default_args,
catchup=False) as dag:
본론부터 말하겠다.
Execution Date
는 airflow의 left boundary이다.
airflow 에서는 batch size별로 작업을 수행하기 때문에 항상 우리가 쌓아놓은 데이터의 기간이 과거(이전까지는 작업이 완료) - 현재(새로운 데이터가 쌓임) 이렇게 존재한다.
그렇기 때문에 우리는 일정한 시간에 스케쥴만 잡아 놓으면 옮겨지지 못한 데이터가 있는 시간대 즉 left_boundary만 신경써주면 된다는 이야기가 된다.
예시를 들자면 아래와 같다.
2022-06-08 ~ 2022-06-09 기간에 우리는 데이터가 쌓이게 된다. @daily로 작업을 하라고 정해놓았기 때문에 06-09 00:00 utc에 배치 작업이 시작될 것이다.
그러면 2022-06-08이 excution_date
가 된다.
airflow를 디버깅 하기 위해서 유용하게 사용하는 몇가지 기능을 리뷰하려고 한다.
우리는 dag를 실행했고 1일 주기로 데이터 베이스에 데이터를 마이그레이션 하려고 한다.
그런데 데이터베이스에 문제가 생겼고, airflow pipeline 도 일시 중단시켰다. 3일 뒤 데이터 베이스가 정상적으로 가동되기 시작했고, pipeline도 다시 시작했지만, 3일의 공백 기간이 발생했다. 이러한 문제는 어떻게 해결할까?
Catchup
은 이러한 상황을 위한 강력한 기능이다. 일시정지된 날짜부터 현재까지 다시 dag들을 모두 순차적으로 실행시켜서 원래의 일상으로 복귀시킨다.
이제 좀 있어보이는 말로 위의 Catchup을 정의하고자 한다.
Catchup
이란 non_triggered dag run들을 latest execution date 부터 순차적으로 present date 까지 모두 실행시켜주는 기능이다.
주의할 점
위의 기능은 강력하기도 하지만, 짧은 주기의 dag의 경우 과도한 오버헤드가 발생할 수도 있으니 매우 주의해서 사용해야 한다.
(10분짜리를 1달 안돌렸다고 생각해보자... 끔찍하다.)
우리가 의도치 않게 start date를 확장하려고 할 때도 있다.
그러한 경우 아래와 같은 명령어를 통해 해결할 수 있다.
airflow dags backfill \
--start-date START_DATE \
--end-date END_DATE \![](https://velog.velcdn.com/images/hyunwoozz/post/10abdcae-7588-4659-823e-b1559975a555/image.png)
dag_id
참조
https://airflow.apache.org/docs/apache-airflow/stable/dag-run.html#backfill