Airflow Study18- Task Group

박성현·2024년 6월 9일
0

Airflow

목록 보기
25/28
post-thumbnail

Task가 갯수가 많을때 , Group화 해서 관리 하기 위함 .
Task들의 모음 , Task Group안에 Task Group 중첩 사용가능

Decorator & 클래스 2가지 방법이 존재

개인적으로 클래스 이용하는게 편함

task순서와 동일하게 그룹간에도 아래와 같이 순서 지정 가능
group_1() >> group_2

1. Decorator 이용

from airflow.decorators import task_group

	@task_group(group_id='first_group')
    def group_1():
        ''' task_group 데커레이터를 이용한 첫 번째 그룹입니다. '''

        @task(task_id='inner_function1')
        def inner_func1(**kwargs):
            print('첫 번째 TaskGroup 내 첫 번째 task입니다.')

        inner_function2 = PythonOperator(
            task_id='inner_function2',
            python_callable=inner_func,
            op_kwargs={'msg':'첫 번째 TaskGroup내 두 번쨰 task입니다.'}
        )

        inner_func1() >> inner_function2

2. 클래스 이용

from airflow.decorators import task_group

with TaskGroup(group_id='second_group', tooltip='두 번째 그룹입니다') as group_2:
       @task(task_id='inner_function1')
       def inner_func1(**kwargs):
           print('두 번째 TaskGroup 내 첫 번째 task입니다.')

       inner_function2 = PythonOperator(
           task_id='inner_function2',
           python_callable=inner_func,
           op_kwargs={'msg': '두 번째 TaskGroup내 두 번째 task입니다.'}
       )
       inner_func1() >> inner_function2
    

설명

아래 UI처럼 접었다 폈다 가능 , tooltip도 확인 가능

그룹간 Trigger_rule 지정가능

with DAG(dag_id='task_group_trigger_rule', start_date=datetime(2023, 1, 1)) as dag:
    with TaskGroup(group_id='group1') as tg1:
        task1 = DummyOperator(task_id='task1')
        task2 = DummyOperator(task_id='task2')

    with TaskGroup(group_id='group2', trigger_rule='all_success') as tg2:
        task3 = DummyOperator(task_id='task3')
        task4 = DummyOperator(task_id='task4')

    tg1 >> tg2
profile
다소Good한 데이터 엔지니어

0개의 댓글