DAG(Directe Acyclic Graph)은 Airflow의 핵심개념으로 태스크들을 모아 의존성과 관계에 따라 어떻게 그들이 실행될지를 구성해 놓은 작업서이다.
DAG은 3가지 방식으로 선언할 수 있다.
with DAG("my_dag_name") as dag:
op = DummyOperator(task_id="task")
my_dag = DAG("my_dag_name")
op = DummyOperator(task_id="task", dag=my_dag)
@dag(start_date=days_ago(2))
def generate_dag():
op = DummyOperator(task_id="task")
dag = generate_dag()
DAG에서 태스크가 빠지면 앙꼬없는 찐빵이다. 태스크는 Operators
, Sensors
또는 TaskFlow
같은 형태가 있다.
태스크와 오퍼레이터는 따로 떨어져 지내지 않는다. 이들은 다른 업스트림 태스크들에 의존하며, 다운스트림 태스크들은 이들에 의존하게 된다.
다음의 2가지 주된 방법으로 태스크들간의 의존성을 설정할 수 있다. 권장하는 방법은 >>
와 <<
연산자를 사용하는 것이다.
first_task >> [second_task, third_task]
third_task << fourth_task
명시적으로는 아래처럼 업스트림과 다운스트림을 지정할 수 있다.
first_task.set_downstream(second_task, third_task)
third_task.set_upstream(fourth_task)
cross_downstream
# Replaces
# [op1, op2] >> op3
# [op1, op2] >> op4
cross_downstream([op1, op2], [op3, op4])
chain
# Replaces op1 >> op2 >> op3 >> op4
chain(op1, op2, op3, op4)
# You can also do it dynamically
chain(*[DummyOperator(task_id='op' + i) for i in range(1, 6)])
pairwise
chain
은 동일한 길이의 리스트로 pairwise
의존성을 구성할 수 있다.
# Replaces
# op1 >> op2 >> op4 >> op6
# op1 >> op3 >> op5 >> op6
chain(op1, [op2, op3], [op4, op5], op6)
DAG가 실행되는 경우는 아래 두 가지 중 하나에 해당한다.
일반적으로 schedule_interval
인자에 Crontab
스케쥴값을 셋팅하여 DAG에 등록하게 된다.
[참고] Crontab
관련해서는 아래 사이트를 참조한다.
schedule_interval
으로 DAG의 스케쥴을 표현하기에 충분하지 않다면 Timetables
로 커스텀하게 스케쥴을 등록할 수 있다.
DAG를 실행될 때마다 DAG Run
이라고 불리우는 새로운 DAG 인스턴스가 만들어지게 된다. DAG Run은 동시에 실행이 가능하며 이때 각각의 DAG Run은 서로 다른 데이터 구간 (data interval)을 담당하게 된다.
DAG가 DAG Run으로 실체화 되듯이, DAG 내의 Task들은 Task Instance 로 실체화 된다.