[Airflow] Task Group

식빵·2025년 6월 23일
0

Airflow

목록 보기
7/9
post-thumbnail

이 게시물은 airflow 2.10.5 버전을 사용해서 작성됐습니다.
3.x 버전과 조금 다를 수 있으니 유의하시기 바랍니다.

Task Group 소개

Airflow 에서는 여러 Task 를 하나의 Group 에 넣거나,
GroupGroup 를 연결하여 하나의 workflow 를 만들 수 있습니다.

그림출처: https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/dags.html#taskgroups

하는 방법은 단순하기 때문에 예시코드, 실행결과만 보고 글을 마치도록 하겠습니다.



Task Group 사용법

예시 코드

from airflow import DAG
from airflow.decorators import task, task_group
import pendulum


with DAG(
    dag_id='dags_task_group_sample1',
    start_date=pendulum.datetime(2025, 6, 1, tz='Asia/Seoul'),
    schedule='@daily',
    catchup=False
) as dag:
    
    # 목표
    ## [그룹 1 : [태스트1 -> 태스트2]] -> [그룹 2 : [태스트1 -> 태스트2]]
    
    @task_group(group_id='group_1')
    def group_1():
        
        @task(task_id='group_1_task_1')
        def group_1_task_1():
            print('group_1_task_1 executed')
        
        @task(task_id='group_1_task_1')
        def group_1_task_2():
            print('group_1_task_2 executed')
        
        group_1_task_1() >> group_1_task_2()
    
    
    @task_group(group_id='group_2')
    def group_2():
        
        @task(task_id='group_2_task_1')
        def group_2_task_1():
            print('group_2_task_1 executed')
        
        @task(task_id='group_2_task_2')
        def group_2_task_2():
            print('group_2_task_2 executed')
        
        group_2_task_1() >> group_2_task_2()
    
    group_1() >> group_2()

생각보다 대단한 내용이 없죠?
airflow ui 에서 DAG 를 Unpaused 하고 실행된 결과를 한번 관찰해보죠.



참고로 Group 안에 또다른 Group 을 계속해서 넣을 수도 있습니다.

from airflow import DAG
from airflow.decorators import task, task_group
import pendulum

with DAG(
    dag_id='dags_task_group_sample2',
    start_date=pendulum.datetime(2025, 6, 1, tz='Asia/Seoul'),
    schedule='@daily',
    catchup=False
) as dag:
    
    # 목표
    ## [그룹 1 : [태스트1 -> [INNER 그룹 : 태스트3] -> 태스트2]] -> [그룹 2 : [태스트1 -> 태스트2]]
    
    @task_group(group_id='group_1')
    def group_1():
        
        @task(task_id='group_1_task_1')
        def group_1_task_1():
            print('group_1_task_1 executed')
        
        @task(task_id='group_1_task_1')
        def group_1_task_2():
            print('group_1_task_2 executed')
            

        @task_group(group_id='group_in_group')
        def group_in_group():
            @task(task_id='task_in_group_in_group')
            def task_in_group_in_group():
                print('task_in_group_in_group executed')
            
            task_in_group_in_group()
        
        group_1_task_1() >> group_in_group() >> group_1_task_2()
    
# 이 아래부터는 이전코드와 동일. 생략.

실행해보면 아래와 같이 Graph 가 나옵니다.



참고한 링크

https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/dags.html#taskgroups

profile
백엔드 개발자로 일하고 있는 식빵(🍞)입니다.

0개의 댓글