[AIRFLOW] Airflow의 Setup과 Teardown에 대해 알아보자!

NewNewDaddy·2025년 6월 11일
0

AIRFLOW

목록 보기
3/4
post-thumbnail

O. INTRO

  • 이번 블로그 글에서 다뤄볼 내용은 Airflow Dag 내의 Task 실행시 선행 작업과 후행 작업을 설정할 수 있는 setupteardown 기능입니다.
  • 예를 들어 아래와 같은 흐름의 파이프라인을 구성해보겠습니다.
    1) MySQL 데이터베이스에 연결
    2) 테이블 데이터 조회 후 집계 테이블로 변환하여 저장
    3) 데이터베이스 연결 종료
    여기서 실질적인 작업을 하는 Task는 2번이고 1,3번은 2번 Task 앞뒤에 따라서 실행되는 작업입니다.
  • 이런 경우, 1~3까지의 로직을 각각의 Task로 구성해도 상관없지만 setupteardown 기능을 활용한다면 2번 작업에 대한 선행, 후행 작업으로 선언하는 것이 이후에 이어질 파이프라인 작업을 선언할 때도 훨씬 직관적으며, 이후 Clear 같은 Task 단위의 재실행 작업시에도 깔끔하게 실행이 가능해집니다.

1. 기본 작업 구성

import pendulum
from airflow import DAG
from airflow.sdk import DAG, task

with DAG(
        dag_id="setup_n_teardown",
        schedule="@once",
        start_date=pendulum.datetime(2025, 6, 1, tz="Asia/Seoul"),
        catchup=False,
) as dag:
    @task(task_id='pre_task')
    def pre_task():
        print('INITIALIZE')

    @task(task_id='real_task')
    def real_task():
        MSG = """
        HELLO WORLD!
        
        THIS IS REAL PIPELINE TASK!
        """
        print(MSG)

    @task(task_id='post_task')
    def post_task():
        print('FINALIZE')

pre_task().as_setup() >> real_task() >> post_task().as_teardown()
  • pre_task : real_task 이전에 선행되는 작업
  • real_task : DAG의 실질적인 작업
  • post_task : real_task가 끝나고 후행되는 작업
  • 위와 같이 real_task에 대한 선행 작업은 as_setup(), 후행 작업은 as_teardown() 메소드를 통해 각각 설정이 가능합니다.
  • Airflow UI의 그래프를 확인해보면 일반적인 Task와는 다르게 setup이 설정된 Task는 , teardown이 설정된 작업은 모양의 화살표가 붙어있는 것을 확인할 수 있습니다.
  • 또한 위의 경우, real_task만을 Clear로 재실행하면 해당 Task의 선후행 작업이 모두 같이 실행되는 것을 볼 수 있습니다.

2. Task Group 작업 구성

  • 이번엔 Task Group으로 묶인 Task 내에서 setupteardown 선언시 어떻게 작업이 처리되는지 보도록 하겠습니다.
import pendulum
from airflow.sdk import DAG, task, task_group


with DAG(
        dag_id="setup_n_teardown_tg",
        schedule="@once",
        start_date=pendulum.datetime(2025, 6, 1, tz="Asia/Seoul"),
        catchup=False,
) as dag:

    @task_group(group_id='first_group')
    def first_group():
        @task(task_id='pre_task')
        def pre_task():
            print('INITIALIZE')

        @task(task_id='real_task')
        def real_task():
            MSG = """
            HELLO WORLD!
            THIS IS REAL PIPELINE TASK!
            """
            print(MSG)

        @task(task_id='post_task')
        def post_task():
            print('FINALIZE')
            
        pre_task().as_setup() >> real_task() >> post_task().as_teardown()

    @task(task_id='task_outer_tg')
    def task_outer_tg():
        print('outer tg')
        
    @task_group(group_id='second_group')
    def second_group():
        @task(task_id='pre_task')
        def pre_task():
            print('INITIALIZE')

        @task(task_id='real_task')
        def real_task():
            MSG = """
            HELLO WORLD!
            THIS IS REAL PIPELINE TASK!
            """
            print(MSG)

        @task(task_id='post_task')
        def post_task():
            print('FINALIZE')
        
        pre_task().as_setup() >> real_task() >> post_task().as_teardown()

    first_group() >> task_outer_tg() >> second_group()
  • first_groupsecond_group 내에 각각 선후행 작업이 있는 Task가 존재합니다. 이 두 Task Group 사이에는 task_outer_tg라는 Task가 연결되어 있습니다.
  • 아래 그래프에서 볼 수 있듯 Task Group 내에서 setupteardown이 설정된 경우, 이후에 따라오는 작업(task_outer_tg)은 이전 작업의 real_task와 곧바로 연결되는 것을 확인할 수 있습니다.
  • 즉, 그룹이 이후 다른 Task과 연결될 때, 그 Task는 그룹내에 있는 선후행 작업이 아닌 실제 작업과 연결되어 실행되고, 후행 작업은 그와 독립적으로 실행되는 것을 확인할 수 있습니다.

3. teardown 작업 옵션 설정

  • 1에서 다룬 예제와 같이 선후행 작업이 설정되어 있을 때,
    선행 작업, 본작업, 후행 작업 각각 Fail시 어떻게 진행되는지 보도록 하겠습니다.

1) 선행 작업 실패시

  • 선행 작업 실패 → 본작업 실패 + 전체 DAG 실패 → 후행 작업은 성공 or 실패
  • 선행 작업이 실패하면 본작업 역시 제대로 진행되지 못하여 전체 DAG가 FAILED 됩니다. 하지만 후행 작업은 성공될 수 있습니다.

2) 본작업 실패시

  • 선행 작업 성공 → 본작업 실패 + 전체 DAG 실패 → 후행 작업 성공 or 실패
  • 본작업이 실패하면 전체 DAG는 FAILED되지만 후행 작업은 성공될 수 있습니다.

3) 후행 작업 실패시

  • 선행 작업 성공 → 본작업 성공 + 전체 DAG 성공 → 후행 작업 성공

  • 후행 작업의 경우 성공, 실패 여부와 상관 없이 본작업이 성공하게 되면 전체 DAG는 SUCCESS로 표시됩니다.

  • 하지만, 아래 설정을 통해 후행 작업 실패시 전체 DAG가 FAILED 되도록 설정할 수도 있습니다.

    • as_teardown(on_failure_fail_dagrun=True)

4. 참고 자료

profile
데이터 엔지니어의 작업공간 / #PYTHON #CLOUD #SPARK #AWS #GCP #NCLOUD

0개의 댓글