SubDAG
과 TaskGroup
XCOM
이용trigger_rule
이용하여 복잡한 branch를 조작예를 들어, 수십, 수백 개의 file이 있다면 task또한 수십, 수백 개가 있을 것이다.
이런 task를 "그룹화", 해당 file들이 same functionality 를 공유하도록 해서
DAG를 clear하게 만들어야 한다.
TaskGroup
사용(SubDAG
은 사용 X)
Task A, B가 있는데 Task B는 Task A의 데이터를 필요로 하는 경우?
External Tool 혹은
XCOM
이용
Condition, Value에 따라 작업들을 선택
해당 상태에 따라 다른 작업을 실행하고 싶을 때,
예를 들면) 이메일에 구독한 사람들에게는 이메일을 보내고,
그렇지 않은 사람들에게는 다른 작업을 수행한다.
trigger_rule
이용 (9가지 종류)
task1 >> [task2, task3] >> task4
의 순으로 진행되는데
이 [task2, task3]
을 하나의 task로 변경하기 위해서 SubDAG
를 사용해야 한다.
from airflow.operators.subdag import SubDagOperator
processing = SubDagOperator(
task_id = 'processing_tasks,
subdag =
)
그래서 function을 작성해야 한다.
dags > subdags > subdag_parallel_dag.py
from airflow import DAG
from airflow.operators.bash import BashOperator
# default_args는 with DAG의 parameter인 default_args와 같아야 한다.
def subdag_parallel_dag(parent_dag_id, child_dag_id, default_args):
with DAG(dag_id = f'{parent_dag_id}.{child_dag_id}', default_args = default_args) as dag:
task_2 = BashOperator(
task_id = 'task_2',
bash_command = 'sleep 3'
)
task_3 = BashOperator(
task_id = 'task_3',
bash_command = 'sleep 3'
)
return dag
dags > parallel_dag.py
# task_2, task_3에 해당하는 내용 subdag_parallel_dag.py로 옮김.
# subdag import
from subdags.subdag_parallel_dag import subdag_parallel_dag
processing = SubDagOperator(
task_id = 'processing_tasks',
subdag = subdag_parallel_dag('parallel_dag', 'processing_tasks', default_args)
)
SubDAGs의 tasks는 Zoom into Sub DAG로 확인할 수 있다.
But!!!
SubDAG을 사용하는 것은 권장되지 않는다.
Deadlock
이 발생할 수 있다.복잡
하다. new folder, new function, ...SequentialExecutor
를 사용한다.SubDAG
대신에
TaskGroups
를 사용해라!!
기존 DAG에
with TaskGroup('processing_tasks') as processing_tasks:
task_2 = BashOperator(
task_id = 'task_2',
bash_command = 'sleep 3'
)
task_3 = BashOperator(
task_id = 'task_3',
bash_command = 'sleep 3'
)
task_1 >> processing_tasks >> task_4
위와 같이 TaskGroup
를 이용하여 instanciate하면 끝
예를 들어 여기에 여러 작업이 있고 다른 작업에서 사용할 처리 프레임워크에 따라
해당 작업을 그룹화하려는 사용 사례가 있을 수 있다.
작업을 다른 하위 그룹으로 그룹화하는 방법
spark
flink
원래는 같은 task_id를 2번 사용해서는 안되지만
taskgroup에서는 가능하다.
taskgroup id로 먼저 prefix
하기 때문에 서로 다른 identity를 가진다.
ex) spark_tasks.task_3
ex) flink_tasks.task_3
task_1 : download할 파일 이름을 아는 것
task_2 : 첫번째 작업에서 얻은 이름과 일치하는 파일을 다운로드 하는 것
이라고 하면,
방법은 2가지가 있다.
external tool
을 데이터 교환 시 사용external tool
과의 연결
이 필요하다.external tool
이 위의 push, pull 작업에 적합한지 여부를 확인해야 한다
.XCOM
를 데이터 교환 시 사용push
, pull
을 이용하기는 함.실제로 XCOMS와 상호 작용할 때 airflow의 metaDB에 데이터를 저장하고 있다.
SQLite : 2GB
Postgres : 1GB
Mysql : 64KB
이처럼 size 제한이 있기 때문에
airflow를 processing framework인
spark
나flink
와 같은
용도로 사용하면 안된다.(DB 별 메모리 제한이 있기 때문에 한정되어 있다.)
downloading_data >> [training_model a, b, c 총 3가지] >> choose_best_model
면 training_model 중에서 가장 좋은 모델을 선택할 수 있다.
accuracy
가 평가 지표라고 하면,
push
하는 방법?return
을 사용해서 값을 push하는 방법 (Key : return_value)def _training_model():
accuracy = ...
return accuracy
xcom_push
를 사용하면 된다.def _training_model(ti):
accuracy = ...
ti.xcom_push(key = 'model_accuracy', value = accuracy)
default 옵션으로 xcom_push
가 적용된다.(ex) BashOperator)do_xcom_push = False
기능을 넣어주면 XCOM이 생성되지 않는다.pull
하는 방법?-> xcom_pull
을 하고
해당 key
, task_ids
를 가진 multiple XCOM을 pull 가능
앞서 말했듯이 XCOM을 사용할때는 너무 큰 데이터를 넣지 않도록 주의! Memory Overflow가 발생할 수 있다.(Database 당 허용되는 메모리 또한 고려해야 한다.)
trigger_rule
BranchPythonOperator
예를 들어 값이 정상적으로 나온 경우 -> 1번을 수행하고,
값이 정상적으로 나오지 않고 비어있는 경우 -> 2번을 수행하는 식이다.
BranchPythonOperator
를 이용할 수 있다.
trigger_rule
trigger rules
를 적용
어떻게 해야 parent task(이전 task)의 영향을 받아 이후 task가 trigger
되는 지에 대한 룰을 정한다.
(룰은 총 9가지가 있다.)
all_success
[task_a, task_b] >> task_c
인 경우
task_a와 task_b 모두 성공해야 task_c가 trigger
된다.
all_failed
[task_a, task_b] >> task_c
인 경우
task_a와 task_b 모두 실패해야 task_c가 trigger
된다.
all_done
[task_a, task_b] >> task_c
인 경우
task_a와 task_b가 실패든 성공이든 모든 작업이 완료되면 task_c가 trigger
된다.
one_success
[task_a, task_b] >> task_c
인 경우
task_a와 task_b 중 하나라도 성공하면 task_c가 trigger
된다.
(나머지 하나의 상태가 done이 아니어도 task_c가 수행)
one_failed
[task_a, task_b] >> task_c
인 경우
task_a와 task_b 중 하나라도 실패하면 task_c가 trigger
된다.
(나머지 하나의 상태가 done이 아니어도 task_c가 수행)
none_failed
[task_a, task_b] >> task_c
인 경우
task_a와 task_b 둘다 성공 또는 스킵되는 경우 task_c가 trigger
된다.
none_failed_or_skipped
[task_a, task_b] >> task_c
인 경우
task_a와 task_b가 모두 실패하지 않고 하나라도 성공하면 task_c가 trigger
된다.
none_skipped
[task_a, task_b] >> task_c
인 경우
task_a와 task_b가 모두 성공 또는 실패인 경우
dummy
의존성은 보여주기 위한 것이고 마음대로 trigger 가능
위와 같은
trigger_rule
을 사용함으로써 복잡한 branch rule을 적용할 수 있다.
Reference)