Task 의존성 표시 방법

JB·2022년 1월 19일
0

Airflow

목록 보기
5/5
post-thumbnail
post-custom-banner

지난 시간에는 다양한 Operator를 이용하여 DAG를 직접 만들어보았다.
그러나, 마지막 부분에 설명을 정확히 하지 않고 넘어간 부분이, 바로 Task들의 의존성(디펜던시) 표시 방법이다.

앞서 만들어보았던 DAG는 복잡한 디펜던시를 가지고 있지 않았기 때문에, >> 연산자만을 이용하여 간단하게 워크플로우를 정의했다.
하지만, 복잡한 의존성 관계를 가진 워크플로우의 경우 이러한 Task 디펜던시에 대한 이해가 조금 더 필요하다.

그리고 실제 운영중인 DAG에 문제가 생기거나, 특정 Task를 다시 시작해야할 경우에도 이러한 지식들은 필요할 수도 있기 때문에 알아두는 것이 좋다.

Task Dependecy

Bitwise shift operator

가장 기본적인 방법은 >> <<를 이용한 방법이다.

first_task >> [second_task , third_task] 
# [] 를 이용하여 여러 태스크를 한번에 나타낼 수 있다.
third_task << fourth_task

위와 같이 직관적으로 디펜던시를 나타낼 수 있다.

set_upstream, set_downstream

또는 set_upstream, set_downstream 메소드를 이용하여 디펜던시를 정의할 수도 있다.

first_task.set_downstream([second_task, third_task])
third_task.set_upstream(fourth_task)

cross_downstream

서로가 서로에게 dependecy를 가지는 task의 경우 cross_downstream을 이용하면, 한번에 디펜던시를 편하게 정의할 수 있다.
또는 동적으로 디펜던시를 정의해야할 때 이용할 수도 있다.

from airflow.models.baseopeartor import cross_downstream

# [first_task, second_task] >> third_task
# [first_task, second_task] >> fourth_task
cross_downstream(from_tasks=[first_task, second_task], to_tasks=[third_task, fourth_task])

chain

연속적인 디펜던시를 정의하고 싶을 때는 chain을 사용하면 편리하다

from airflow.models.baseoperator import chain

# Replaces first >> second >> third >> fourth
chain(first, second, third, fourth)

# You can also do it dynamically
chain(*[DummyOperator(task_id='op' + i) for i in range(1, 6)])
profile
평범한 월급쟁이 개발자
post-custom-banner

0개의 댓글