지난 시간에는 다양한 Operator를 이용하여 DAG를 직접 만들어보았다.
그러나, 마지막 부분에 설명을 정확히 하지 않고 넘어간 부분이, 바로 Task들의 의존성(디펜던시) 표시 방법이다.
앞서 만들어보았던 DAG는 복잡한 디펜던시를 가지고 있지 않았기 때문에, >>
연산자만을 이용하여 간단하게 워크플로우를 정의했다.
하지만, 복잡한 의존성 관계를 가진 워크플로우의 경우 이러한 Task 디펜던시에 대한 이해가 조금 더 필요하다.
그리고 실제 운영중인 DAG에 문제가 생기거나, 특정 Task를 다시 시작해야할 경우에도 이러한 지식들은 필요할 수도 있기 때문에 알아두는 것이 좋다.
가장 기본적인 방법은 >>
<<
를 이용한 방법이다.
first_task >> [second_task , third_task]
# [] 를 이용하여 여러 태스크를 한번에 나타낼 수 있다.
third_task << fourth_task
위와 같이 직관적으로 디펜던시를 나타낼 수 있다.
또는 set_upstream
, set_downstream
메소드를 이용하여 디펜던시를 정의할 수도 있다.
first_task.set_downstream([second_task, third_task])
third_task.set_upstream(fourth_task)
서로가 서로에게 dependecy를 가지는 task의 경우 cross_downstream
을 이용하면, 한번에 디펜던시를 편하게 정의할 수 있다.
또는 동적으로 디펜던시를 정의해야할 때 이용할 수도 있다.
from airflow.models.baseopeartor import cross_downstream
# [first_task, second_task] >> third_task
# [first_task, second_task] >> fourth_task
cross_downstream(from_tasks=[first_task, second_task], to_tasks=[third_task, fourth_task])
연속적인 디펜던시를 정의하고 싶을 때는 chain
을 사용하면 편리하다
from airflow.models.baseoperator import chain
# Replaces first >> second >> third >> fourth
chain(first, second, third, fourth)
# You can also do it dynamically
chain(*[DummyOperator(task_id='op' + i) for i in range(1, 6)])