[airflow] DAG scheduling 및 debuging!!

오현우·2022년 6월 9일
0

airflow

목록 보기
9/20

airflow dag 스케쥴링

우리는 airflow dag를 정의할 때 언제부터 시작할 것인지, 주기는 어떻게 할 것인지 미리 정해놓는다.

start_date: dag를 언제부터 시작할 것인지 지정
schedule_interval: start date 부터 해당 dag 실행 주기를 지정

까먹을까봐 아래에 예시 코드를 적어 놓았다.

default_args = {
    'start_date': datetime(2022, 1, 1)
}

with DAG('user_processing',
        schedule_interval='@daily',
        default_args=default_args,
        catchup=False) as dag:

Execution Date in airflow

본론부터 말하겠다.

Execution Date 는 airflow의 left boundary이다.

airflow 에서는 batch size별로 작업을 수행하기 때문에 항상 우리가 쌓아놓은 데이터의 기간이 과거(이전까지는 작업이 완료) - 현재(새로운 데이터가 쌓임) 이렇게 존재한다.
그렇기 때문에 우리는 일정한 시간에 스케쥴만 잡아 놓으면 옮겨지지 못한 데이터가 있는 시간대 즉 left_boundary만 신경써주면 된다는 이야기가 된다.

예시를 들자면 아래와 같다.
2022-06-08 ~ 2022-06-09 기간에 우리는 데이터가 쌓이게 된다. @daily로 작업을 하라고 정해놓았기 때문에 06-09 00:00 utc에 배치 작업이 시작될 것이다.
그러면 2022-06-08이 excution_date 가 된다.

Catch up!!! and Backfilling!!

airflow를 디버깅 하기 위해서 유용하게 사용하는 몇가지 기능을 리뷰하려고 한다.

Catchup

우리는 dag를 실행했고 1일 주기로 데이터 베이스에 데이터를 마이그레이션 하려고 한다.

그런데 데이터베이스에 문제가 생겼고, airflow pipeline 도 일시 중단시켰다. 3일 뒤 데이터 베이스가 정상적으로 가동되기 시작했고, pipeline도 다시 시작했지만, 3일의 공백 기간이 발생했다. 이러한 문제는 어떻게 해결할까?

Catchup은 이러한 상황을 위한 강력한 기능이다. 일시정지된 날짜부터 현재까지 다시 dag들을 모두 순차적으로 실행시켜서 원래의 일상으로 복귀시킨다.

이제 좀 있어보이는 말로 위의 Catchup을 정의하고자 한다.

Catchup이란 non_triggered dag run들을 latest execution date 부터 순차적으로 present date 까지 모두 실행시켜주는 기능이다.

주의할 점
위의 기능은 강력하기도 하지만, 짧은 주기의 dag의 경우 과도한 오버헤드가 발생할 수도 있으니 매우 주의해서 사용해야 한다.
(10분짜리를 1달 안돌렸다고 생각해보자... 끔찍하다.)

backfilling

우리가 의도치 않게 start date를 확장하려고 할 때도 있다.

그러한 경우 아래와 같은 명령어를 통해 해결할 수 있다.

airflow dags backfill \
    --start-date START_DATE \
    --end-date END_DATE \![](https://velog.velcdn.com/images/hyunwoozz/post/10abdcae-7588-4659-823e-b1559975a555/image.png)

    dag_id

참조
https://airflow.apache.org/docs/apache-airflow/stable/dag-run.html#backfill

profile
핵심은 같게, 생각은 다르게

0개의 댓글