[Airflow] 02. DAG 구성하기 (1)

Jay Park·2021년 11월 22일
0

DAG (1)

DAG(Directe Acyclic Graph)은 Airflow의 핵심개념으로 태스크들을 모아 의존성과 관계에 따라 어떻게 그들이 실행될지를 구성해 놓은 작업서이다.

Example DAGs

DAG 선언하기

DAG은 3가지 방식으로 선언할 수 있다.

  1. Context Manager 안에서 암묵적으로 선언
with DAG("my_dag_name") as dag:
    op = DummyOperator(task_id="task")
  1. 표준 생성자
my_dag = DAG("my_dag_name")
op = DummyOperator(task_id="task", dag=my_dag)
  1. @dag 꾸밈자 (decorator)
@dag(start_date=days_ago(2))
def generate_dag():
    op = DummyOperator(task_id="task")

dag = generate_dag()

DAG에서 태스크가 빠지면 앙꼬없는 찐빵이다. 태스크는 Operators, Sensors 또는 TaskFlow 같은 형태가 있다.

Task Dependencies

태스크와 오퍼레이터는 따로 떨어져 지내지 않는다. 이들은 다른 업스트림 태스크들에 의존하며, 다운스트림 태스크들은 이들에 의존하게 된다.

다음의 2가지 주된 방법으로 태스크들간의 의존성을 설정할 수 있다. 권장하는 방법은 >><< 연산자를 사용하는 것이다.

first_task >> [second_task, third_task]
third_task << fourth_task

명시적으로는 아래처럼 업스트림과 다운스트림을 지정할 수 있다.

first_task.set_downstream(second_task, third_task)
third_task.set_upstream(fourth_task)

cross_downstream

# Replaces
# [op1, op2] >> op3
# [op1, op2] >> op4
cross_downstream([op1, op2], [op3, op4])

chain

# Replaces op1 >> op2 >> op3 >> op4
chain(op1, op2, op3, op4)

# You can also do it dynamically
chain(*[DummyOperator(task_id='op' + i) for i in range(1, 6)])

pairwise

chain은 동일한 길이의 리스트로 pairwise 의존성을 구성할 수 있다.

# Replaces
# op1 >> op2 >> op4 >> op6
# op1 >> op3 >> op5 >> op6
chain(op1, [op2, op3], [op4, op5], op6)

DAG 실행하기

DAG가 실행되는 경우는 아래 두 가지 중 하나에 해당한다.

  • API에 의해서나 매뉴얼하게 트리거될 때 실행이 된다.
  • 정해진 일정이 되면 실행이 된다.

일반적으로 schedule_interval 인자에 Crontab 스케쥴값을 셋팅하여 DAG에 등록하게 된다.

[참고] Crontab 관련해서는 아래 사이트를 참조한다.

schedule_interval 으로 DAG의 스케쥴을 표현하기에 충분하지 않다면 Timetables 로 커스텀하게 스케쥴을 등록할 수 있다.

DAG를 실행될 때마다 DAG Run 이라고 불리우는 새로운 DAG 인스턴스가 만들어지게 된다. DAG Run은 동시에 실행이 가능하며 이때 각각의 DAG Run은 서로 다른 데이터 구간 (data interval)을 담당하게 된다.

DAG가 DAG Run으로 실체화 되듯이, DAG 내의 Task들은 Task Instance 로 실체화 된다.

참고자료

profile
Jaytiger

0개의 댓글