Airflow Task 여러개 사용하기

BAO.DE·2025년 3월 17일

Apache Airflow

목록 보기
9/20

Task

>> 
<<
일반적으로 task 간의 순서를 나타내기 위해서 방향성을 알리는 꺽쇠 사용
하지만 여러 task 존재시 직관성이 떨어지므로 [] 동일레벨의 task를 가질 시 list type으로 묶어서 사용

예시

from airflow import DAG
from airflow.operators.bash import EmptyOperator
import datetime
import pendulum 

with DAG (
    dag_id ="dags_conn_operator",
    schedule =None, ##cron
    start_date = pendulum.datetime(2025,1,1,tz="Asia/Seoul"),
    catchup = False, ## 누락된 구간 x 소급적용 x
)as dag: ## 실행할 bash 명령어
     t1 = EmptyOperator(
        task_id = "t1"
    )
     t2 = EmptyOperator(
        task_id = "t2"
    )
     t3 = EmptyOperator(
        task_id = "t3"
    )
     t3 = EmptyOperator(
        task_id = "t3"
    )
     t4 = EmptyOperator(
        task_id = "t4"
    )
     t5 = EmptyOperator(
        task_id = "t5"
    )
     t6 = EmptyOperator(
        task_id = "t6"
    )
     t7 = EmptyOperator(
        task_id = "t7"
    )
     t8 = EmptyOperator(
        task_id = "t8"
    )
    
     t1 >> [t2,t3] >> t4
     t5 >> t4
     [t4,t7] >> t6 >> t8 

TASK graph 확인

 1. t1 >> [t2,t3] >> t4 // t2,t3는 동일 레벨로 병렬실행
 t5 >> t4
 [t4,t7] >> t6 >> t8 // t4,t7 는 동일 레벨로 병렬실행

DAG 선언 방법

with - as

 import datetime

 from airflow import DAG
 from airflow.operators.empty import EmptyOperator

 with DAG(
     dag_id="my_dag_name",
     start_date=datetime.datetime(2021, 1, 1),
     schedule="@daily",
 ):
     EmptyOperator(task_id="task")
     
     일반적으로 많이 쓰이는 방법

생성자

 import datetime

 from airflow import DAG
 from airflow.operators.empty import EmptyOperator

 my_dag = DAG(
     dag_id="my_dag_name",
     start_date=datetime.datetime(2021, 1, 1),
     schedule="@daily",
 )
 EmptyOperator(task_id="task", dag=my_dag)
 
 선언한 Dag를 매개변수로 전달 

@데코레이터

import datetime

from airflow.decorators import dag
from airflow.operators.empty import EmptyOperator


@dag(start_date=datetime.datetime(2021, 1, 1), schedule="@daily")
def generate_dag():
   EmptyOperator(task_id="task")


generate_dag()

종속성 선언

first_task >> [second_task, third_task]
third_task << fourth_task

first_task.set_downstream([second_task, third_task])
third_task.set_upstream(fourth_task)

두개 동작방식은 동일하나 >> airflow 공식 홈페이지에서는 꺽쇠를 권장함 

0개의 댓글