SubDAG과 TaskGroupXCOM 이용trigger_rule 이용하여 복잡한 branch를 조작예를 들어, 수십, 수백 개의 file이 있다면 task또한 수십, 수백 개가 있을 것이다.
이런 task를 "그룹화", 해당 file들이 same functionality 를 공유하도록 해서
DAG를 clear하게 만들어야 한다.
TaskGroup사용(SubDAG은 사용 X)
Task A, B가 있는데 Task B는 Task A의 데이터를 필요로 하는 경우?
External Tool 혹은
XCOM이용
Condition, Value에 따라 작업들을 선택
해당 상태에 따라 다른 작업을 실행하고 싶을 때,
예를 들면) 이메일에 구독한 사람들에게는 이메일을 보내고,
그렇지 않은 사람들에게는 다른 작업을 수행한다.
trigger_rule이용 (9가지 종류)
task1 >> [task2, task3] >> task4
의 순으로 진행되는데
이 [task2, task3]을 하나의 task로 변경하기 위해서 SubDAG를 사용해야 한다.
from airflow.operators.subdag import SubDagOperator
processing = SubDagOperator(
task_id = 'processing_tasks,
subdag =
)
그래서 function을 작성해야 한다.
dags > subdags > subdag_parallel_dag.py
from airflow import DAG
from airflow.operators.bash import BashOperator
# default_args는 with DAG의 parameter인 default_args와 같아야 한다.
def subdag_parallel_dag(parent_dag_id, child_dag_id, default_args):
with DAG(dag_id = f'{parent_dag_id}.{child_dag_id}', default_args = default_args) as dag:
task_2 = BashOperator(
task_id = 'task_2',
bash_command = 'sleep 3'
)
task_3 = BashOperator(
task_id = 'task_3',
bash_command = 'sleep 3'
)
return dag
dags > parallel_dag.py
# task_2, task_3에 해당하는 내용 subdag_parallel_dag.py로 옮김.
# subdag import
from subdags.subdag_parallel_dag import subdag_parallel_dag
processing = SubDagOperator(
task_id = 'processing_tasks',
subdag = subdag_parallel_dag('parallel_dag', 'processing_tasks', default_args)
)
SubDAGs의 tasks는 Zoom into Sub DAG로 확인할 수 있다.
But!!!
SubDAG을 사용하는 것은 권장되지 않는다.
Deadlock이 발생할 수 있다.복잡하다. new folder, new function, ...SequentialExecutor를 사용한다.SubDAG 대신에
TaskGroups를 사용해라!!
기존 DAG에
with TaskGroup('processing_tasks') as processing_tasks:
task_2 = BashOperator(
task_id = 'task_2',
bash_command = 'sleep 3'
)
task_3 = BashOperator(
task_id = 'task_3',
bash_command = 'sleep 3'
)
task_1 >> processing_tasks >> task_4
위와 같이 TaskGroup를 이용하여 instanciate하면 끝
예를 들어 여기에 여러 작업이 있고 다른 작업에서 사용할 처리 프레임워크에 따라
해당 작업을 그룹화하려는 사용 사례가 있을 수 있다.
작업을 다른 하위 그룹으로 그룹화하는 방법
sparkflink원래는 같은 task_id를 2번 사용해서는 안되지만
taskgroup에서는 가능하다.
taskgroup id로 먼저 prefix하기 때문에 서로 다른 identity를 가진다.
ex) spark_tasks.task_3
ex) flink_tasks.task_3
task_1 : download할 파일 이름을 아는 것
task_2 : 첫번째 작업에서 얻은 이름과 일치하는 파일을 다운로드 하는 것
이라고 하면,
방법은 2가지가 있다.
external tool을 데이터 교환 시 사용external tool과의 연결이 필요하다.external tool이 위의 push, pull 작업에 적합한지 여부를 확인해야 한다.XCOM를 데이터 교환 시 사용push, pull을 이용하기는 함.실제로 XCOMS와 상호 작용할 때 airflow의 metaDB에 데이터를 저장하고 있다.
SQLite : 2GB
Postgres : 1GB
Mysql : 64KB
이처럼 size 제한이 있기 때문에
airflow를 processing framework인
spark나flink와 같은
용도로 사용하면 안된다.(DB 별 메모리 제한이 있기 때문에 한정되어 있다.)
downloading_data >> [training_model a, b, c 총 3가지] >> choose_best_model
면 training_model 중에서 가장 좋은 모델을 선택할 수 있다.
accuracy가 평가 지표라고 하면,
push하는 방법?return을 사용해서 값을 push하는 방법 (Key : return_value)def _training_model():
accuracy = ...
return accuracy
xcom_push를 사용하면 된다.def _training_model(ti):
accuracy = ...
ti.xcom_push(key = 'model_accuracy', value = accuracy)
default 옵션으로 xcom_push가 적용된다.(ex) BashOperator)do_xcom_push = False 기능을 넣어주면 XCOM이 생성되지 않는다.pull하는 방법?-> xcom_pull을 하고
해당 key, task_ids를 가진 multiple XCOM을 pull 가능
앞서 말했듯이 XCOM을 사용할때는 너무 큰 데이터를 넣지 않도록 주의! Memory Overflow가 발생할 수 있다.(Database 당 허용되는 메모리 또한 고려해야 한다.)
trigger_ruleBranchPythonOperator예를 들어 값이 정상적으로 나온 경우 -> 1번을 수행하고,
값이 정상적으로 나오지 않고 비어있는 경우 -> 2번을 수행하는 식이다.
BranchPythonOperator를 이용할 수 있다.
trigger_ruletrigger rules를 적용
어떻게 해야 parent task(이전 task)의 영향을 받아 이후 task가 trigger되는 지에 대한 룰을 정한다.
(룰은 총 9가지가 있다.)
all_success
[task_a, task_b] >> task_c인 경우
task_a와 task_b 모두 성공해야 task_c가 trigger된다.
all_failed
[task_a, task_b] >> task_c인 경우
task_a와 task_b 모두 실패해야 task_c가 trigger된다.
all_done
[task_a, task_b] >> task_c인 경우
task_a와 task_b가 실패든 성공이든 모든 작업이 완료되면 task_c가 trigger된다.
one_success
[task_a, task_b] >> task_c인 경우
task_a와 task_b 중 하나라도 성공하면 task_c가 trigger된다.
(나머지 하나의 상태가 done이 아니어도 task_c가 수행)
one_failed
[task_a, task_b] >> task_c인 경우
task_a와 task_b 중 하나라도 실패하면 task_c가 trigger된다.
(나머지 하나의 상태가 done이 아니어도 task_c가 수행)
none_failed
[task_a, task_b] >> task_c인 경우
task_a와 task_b 둘다 성공 또는 스킵되는 경우 task_c가 trigger된다.
none_failed_or_skipped
[task_a, task_b] >> task_c인 경우
task_a와 task_b가 모두 실패하지 않고 하나라도 성공하면 task_c가 trigger된다.
none_skipped
[task_a, task_b] >> task_c인 경우
task_a와 task_b가 모두 성공 또는 실패인 경우
dummy
의존성은 보여주기 위한 것이고 마음대로 trigger 가능
위와 같은
trigger_rule을 사용함으로써 복잡한 branch rule을 적용할 수 있다.
Reference)