04_Advanced Concepts in Airflow

Copes·2021년 12월 6일
1

Airflow

목록 보기
4/5
post-thumbnail
  • SubDAGTaskGroup
  • Task끼리 data를 주고받을 때
    • External Tool 이용(ex DB, AWS S3 등)
    • XCOM 이용
  • trigger_rule 이용하여 복잡한 branch를 조작

예를 들어, 수십, 수백 개의 file이 있다면 task또한 수십, 수백 개가 있을 것이다.

이런 task를 "그룹화", 해당 file들이 same functionality 를 공유하도록 해서
DAG를 clear하게 만들어야 한다.

TaskGroup 사용(SubDAG은 사용 X)

Task A, B가 있는데 Task B는 Task A의 데이터를 필요로 하는 경우?

External Tool 혹은 XCOM 이용

Condition, Value에 따라 작업들을 선택

해당 상태에 따라 다른 작업을 실행하고 싶을 때,
예를 들면) 이메일에 구독한 사람들에게는 이메일을 보내고,
그렇지 않은 사람들에게는 다른 작업을 수행한다.

trigger_rule 이용 (9가지 종류)


SubDAG과 TaskGroup

SubDAG

task1 >> [task2, task3] >> task4
의 순으로 진행되는데
[task2, task3]을 하나의 task로 변경하기 위해서 SubDAG를 사용해야 한다.

from airflow.operators.subdag import SubDagOperator

processing = SubDagOperator(
	task_id = 'processing_tasks,
	subdag = 
)
  • "subdag의 리턴으로 받을 어떤 function을 작성하지 않았는데?"

    그래서 function을 작성해야 한다.

dags > subdags > subdag_parallel_dag.py

from airflow import DAG
from airflow.operators.bash import BashOperator

# default_args는 with DAG의 parameter인 default_args와 같아야 한다.
def subdag_parallel_dag(parent_dag_id, child_dag_id, default_args):
    with DAG(dag_id = f'{parent_dag_id}.{child_dag_id}', default_args = default_args) as dag:
        task_2 = BashOperator(
        task_id = 'task_2',
        bash_command = 'sleep 3'
        )

        task_3 = BashOperator(
            task_id = 'task_3',
            bash_command = 'sleep 3'
        )

        return dag

dags > parallel_dag.py

# task_2, task_3에 해당하는 내용 subdag_parallel_dag.py로 옮김.

# subdag import
from subdags.subdag_parallel_dag import subdag_parallel_dag

processing = SubDagOperator(
        task_id = 'processing_tasks',
        subdag = subdag_parallel_dag('parallel_dag', 'processing_tasks', default_args)
    )

SubDAGs의 tasks는 Zoom into Sub DAG로 확인할 수 있다.

But!!!
SubDAG을 사용하는 것은 권장되지 않는다.

  1. Deadlock이 발생할 수 있다.
  2. subDAG은 복잡하다. new folder, new function, ...
    그래서 data pipeline의 복잡도를 높인다.
  3. LocalExecutor로 되어 있어도 기본적으로 SequentialExecutor를 사용한다.
    -> SubDAG을 사용하지 마라!

TaskGroup

SubDAG 대신에
TaskGroups를 사용해라!!

기존 DAG에

with TaskGroup('processing_tasks') as processing_tasks:
        task_2 = BashOperator(
            task_id = 'task_2',
            bash_command = 'sleep 3'
        )

        task_3 = BashOperator(
            task_id = 'task_3',
            bash_command = 'sleep 3'
        )

task_1 >> processing_tasks >> task_4

위와 같이 TaskGroup를 이용하여 instanciate하면 끝

less complexity

예를 들어 여기에 여러 작업이 있고 다른 작업에서 사용할 처리 프레임워크에 따라
해당 작업을 그룹화하려는 사용 사례가 있을 수 있다.

작업을 다른 하위 그룹으로 그룹화하는 방법

  • task3 : spark
  • task3 : flink

원래는 같은 task_id를 2번 사용해서는 안되지만
taskgroup에서는 가능하다.
taskgroup id로 먼저 prefix하기 때문에 서로 다른 identity를 가진다.

ex) spark_tasks.task_3
ex) flink_tasks.task_3


XCOM

  • task들 간에 data를 주고받도록 하는 방법?
  • 동시에 airflow memory가 explode하지 않도록 하는 방법?

task_1 : download할 파일 이름을 아는 것
task_2 : 첫번째 작업에서 얻은 이름과 일치하는 파일을 다운로드 하는 것
이라고 하면,

방법은 2가지가 있다.

1. external tool을 데이터 교환 시 사용

  • DB나 AWS S3를 이용(push, pull을 이용할 수 있다.)

but, 데이터 파이프라인의 복잡도가 올라갈 것이다. 이유는

  • external tool과의 연결이 필요하다.
  • 해당 external tool이 위의 push, pull 작업에 적합한지 여부를 확인해야 한다.

2. XCOM를 데이터 교환 시 사용

  • push, pull을 이용하기는 함.
    • 상호 통신 가능
    • 적은 양의 데이터는 교환할 수 있다.

실제로 XCOMS와 상호 작용할 때 airflow의 metaDB에 데이터를 저장하고 있다.

  • DB마다 제한 데이터 크기가 다르다
SQLite : 2GB
Postgres : 1GB
Mysql : 64KB

이처럼 size 제한이 있기 때문에

airflow를 processing framework인 sparkflink와 같은
용도로 사용하면 안된다.(DB 별 메모리 제한이 있기 때문에 한정되어 있다.)

downloading_data >> [training_model a, b, c 총 3가지] >> choose_best_model
training_model 중에서 가장 좋은 모델을 선택할 수 있다.
accuracy가 평가 지표라고 하면,

  • training a, b, c간 accuracy를 공유하면서 최적의 결과값을 이끌어낼 수 있다.

XCOM에 value를 push하는 방법?

  1. return을 사용해서 값을 push하는 방법 (Key : return_value)
def _training_model():
    accuracy = ...
    return accuracy
  1. speicific key를 XCOM에 push하기 위해서는 xcom_push를 사용하면 된다.
    (Key 이름을 설정할 수 있다.)
def _training_model(ti):
	accuracy = ...
	ti.xcom_push(key = 'model_accuracy', value = accuracy)
  1. 어떤 Operator에는 default 옵션으로 xcom_push가 적용된다.(ex) BashOperator)
    해당 Operator 내에 do_xcom_push = False 기능을 넣어주면 XCOM이 생성되지 않는다.

XCOM에 value를 pull하는 방법?

-> xcom_pull을 하고
해당 key, task_ids를 가진 multiple XCOM을 pull 가능

앞서 말했듯이 XCOM을 사용할때는 너무 큰 데이터를 넣지 않도록 주의! Memory Overflow가 발생할 수 있다.(Database 당 허용되는 메모리 또한 고려해야 한다.)


trigger_rule

BranchPythonOperator

  • 해당 task의 결과에 따라 어떤 task를 진행할지가 달라지도록 해야하는 경우가 있다.

예를 들어 값이 정상적으로 나온 경우 -> 1번을 수행하고,
값이 정상적으로 나오지 않고 비어있는 경우 -> 2번을 수행하는 식이다.

BranchPythonOperator를 이용할 수 있다.

  • 여러 개 중 하나를 시행할 수도 있지만, 여러 개를 모두 실행할 수도 있다.

trigger_rule

  • 그러면 작업이 수행되고, 이전 작업의 결과에 따라 의도대로 뒤의 작업에 영향을 주고 싶다면?

trigger rules를 적용
어떻게 해야 parent task(이전 task)의 영향을 받아 이후 task가 trigger되는 지에 대한 룰을 정한다.
(룰은 총 9가지가 있다.)

  1. all_success
    [task_a, task_b] >> task_c인 경우
    task_a와 task_b 모두 성공해야 task_c가 trigger된다.

  2. all_failed
    [task_a, task_b] >> task_c인 경우
    task_a와 task_b 모두 실패해야 task_c가 trigger된다.

  3. all_done
    [task_a, task_b] >> task_c인 경우
    task_a와 task_b가 실패든 성공이든 모든 작업이 완료되면 task_c가 trigger된다.

  4. one_success
    [task_a, task_b] >> task_c인 경우
    task_a와 task_b 중 하나라도 성공하면 task_c가 trigger된다.
    (나머지 하나의 상태가 done이 아니어도 task_c가 수행)

  5. one_failed
    [task_a, task_b] >> task_c인 경우
    task_a와 task_b 중 하나라도 실패하면 task_c가 trigger된다.
    (나머지 하나의 상태가 done이 아니어도 task_c가 수행)

  6. none_failed
    [task_a, task_b] >> task_c인 경우
    task_a와 task_b 둘다 성공 또는 스킵되는 경우 task_c가 trigger된다.

  7. none_failed_or_skipped
    [task_a, task_b] >> task_c인 경우
    task_a와 task_b가 모두 실패하지 않고 하나라도 성공하면 task_c가 trigger된다.

  8. none_skipped
    [task_a, task_b] >> task_c인 경우
    task_a와 task_b가 모두 성공 또는 실패인 경우

  9. dummy
    의존성은 보여주기 위한 것이고 마음대로 trigger 가능

위와 같은 trigger_rule을 사용함으로써 복잡한 branch rule을 적용할 수 있다.



Reference)

0개의 댓글