DevCourse TIL Day5 Week12 - Airflow (dag dependencies)

김태준·2023년 6월 25일
0

Data Enginnering DevCourse

목록 보기
55/93
post-thumbnail

그동안 크론탭을 활용한 주기적인 실행(schedule로 지정한)으로 dag를 실행했다.
하지만 다음과 같은 방법도 존재한다.

  • 타 dag에 의해 trigger. (explicit trigger(A가 B를 분명히 트리거), Reactive trigger(B가 A끝나기를 대기))
  • 이외에 알아두면 좋은 것 (BranchPythonOperator - 조건에 따라 타 태스크로 분기, LatestOnlyOperator-backfill 시 불필요 태스크 처리, 앞단의 태스크 실행 상황들)

✅ Dag Dependencies

: dag간 실행 순서 정리 (task 실패, 순서 정리)

  • Explicit trigger : TriggerDagOperator 활용, DAG A가 명시적으로 DAG B를 트리거
    -> trigger_B = TriggerDagOperator(task_id='trigger_B', trigger_dag_id='트리거하려는DAG이름', conf={'path':'/opt/ml/conf' : DAG B에 넘기고 싶은 정보들), jinja template 활용 필요
    💯 -> jinja template ?
    : django에서 처음 사용된 python에서 사용되는 탬플릿 엔진으로 프레젠테이션 로직과 애플리케이션 로직을 분리해 동적으로 HTML 생성, Flask에서 활용. (변수는 이중 중괄호, 제어문은 {% %} 표시

-> airflow에서 operator 별로 각 변수마다 사용 가능 ex) {{ ds }}, {{ conn }}

💯 Sensor ?
: 특정 조건 충족될 때까지 대기하는 operator, 외부 리소스 가용성이나 특정 조건 완료와 같은 상황 동기화에 유용. Airflow는 자체 내장 sensor존재.
(filesensor, httpsensor, sqlsensor, timesensor, externaltasksensor ... )

  • Reactive trigger : ExternalTaskSensor 활용, DAG B가 DAG A의 태스크가 끝나길 대기 (이 경우 DAG A는 사실 모름)
    -> 단, 두 태스크의 execution date 동일해야 하고 실수발생 가능성 높음. (서로 다른 frequency있으면 활용도 낮아짐)
  • BranchPythonOperator : 상황에 따라 뒤에 실행될 태스크를 동적으로 결정해줌.
    -> TriggerDagOperator 앞에 사용되는 경우 종종 있음.
  • LatestOnlyOperator : time-sensitive한 태스크들이 과거 데이터 backfill 시 실행 중단 처리

🎈 Trigger Rules

  • Upstream 태스크 성공실패 상황에 따라 뒷단 태스크 실행여부 결정하고 싶을 때.
    -> Operator에 trigger_rule 파라미터로 결정 가능. (all_success, all_failed, all_done, one_failed, one_success, none_failed, none_failed_min_one_success)

    ALL_DONE, one_failed처리 활용, 태스크 순서대로 처리 가능.

< 터미널 환경에서 실습 >
docker exec -it schedulerid sh
airflow dags list | grep Source
(trigger)
airflow tasks test SourceDag trigger_task 2023-06-18 -> TargetDag 실행

airflow - postgres 연결 방식

default니까 airflow를 넣는데도 계속에러가 나온다.. 왜 그럴까 계정변경하려고 airflow.cfg를 해도 이런다.

✅ task grouping

  • 태스크 수가 많은 dag라면 태스크를 성격에 따라 관리해줄 필요가 있다.
    현 추세 subdag -> task group
  • 다수의 파일 처리를 하는 dag일 경우
    -> 파일 다운로드 태스크, 파일 체크 태스크, 데이터 처리 태스크로 나누어 구성

✅ dynamic dags

jinja template, yaml 기반 dag를 동적으로 만들기.

  • jinja 기반 dag 자체 템플릿 디자인 후 yaml 통해 앞서 만든 template에 파라미터 제공
  • 비슷한 dag 계속 만들 때 메뉴얼하게 개발되도록 하기 (효율성 높이기)
  • dag 계속해서 생성 VS 한 DAG 내 태스크 늘리는 것 사이의 밸런스 필요.
    -> DAGS폴더 내 yml 파일 수 만큼의 dag 코드 생성해줌. (generator 파이썬 파일 실행으로)

    -> generator 실행은 root 파일에서 진행! (airflow/Programmers_Dag/python3 dags/dynamic_dags/generator.py)
    -> 또는 web UI에서 실행 가능.
profile
To be a DataScientist

0개의 댓글