그동안 크론탭을 활용한 주기적인 실행(schedule로 지정한)으로 dag를 실행했다.
하지만 다음과 같은 방법도 존재한다.
: dag간 실행 순서 정리 (task 실패, 순서 정리)
- Explicit trigger : TriggerDagOperator 활용, DAG A가 명시적으로 DAG B를 트리거
-> trigger_B = TriggerDagOperator(task_id='trigger_B', trigger_dag_id='트리거하려는DAG이름', conf={'path':'/opt/ml/conf' : DAG B에 넘기고 싶은 정보들), jinja template 활용 필요
💯 -> jinja template ?
: django에서 처음 사용된 python에서 사용되는 탬플릿 엔진으로 프레젠테이션 로직과 애플리케이션 로직을 분리해 동적으로 HTML 생성, Flask에서 활용. (변수는 이중 중괄호, 제어문은 {% %} 표시-> airflow에서 operator 별로 각 변수마다 사용 가능 ex) {{ ds }}, {{ conn }}
💯 Sensor ?
: 특정 조건 충족될 때까지 대기하는 operator, 외부 리소스 가용성이나 특정 조건 완료와 같은 상황 동기화에 유용. Airflow는 자체 내장 sensor존재.
(filesensor, httpsensor, sqlsensor, timesensor, externaltasksensor ... )
- Reactive trigger : ExternalTaskSensor 활용, DAG B가 DAG A의 태스크가 끝나길 대기 (이 경우 DAG A는 사실 모름)
-> 단, 두 태스크의 execution date 동일해야 하고 실수발생 가능성 높음. (서로 다른 frequency있으면 활용도 낮아짐)
- BranchPythonOperator : 상황에 따라 뒤에 실행될 태스크를 동적으로 결정해줌.
-> TriggerDagOperator 앞에 사용되는 경우 종종 있음.
- LatestOnlyOperator : time-sensitive한 태스크들이 과거 데이터 backfill 시 실행 중단 처리
🎈 Trigger Rules
- Upstream 태스크 성공실패 상황에 따라 뒷단 태스크 실행여부 결정하고 싶을 때.
-> Operator에 trigger_rule 파라미터로 결정 가능. (all_success, all_failed, all_done, one_failed, one_success, none_failed, none_failed_min_one_success)
ALL_DONE, one_failed처리 활용, 태스크 순서대로 처리 가능.< 터미널 환경에서 실습 >
docker exec -it schedulerid sh
airflow dags list | grep Source
(trigger)
airflow tasks test SourceDag trigger_task 2023-06-18 -> TargetDag 실행airflow - postgres 연결 방식
default니까 airflow를 넣는데도 계속에러가 나온다.. 왜 그럴까 계정변경하려고 airflow.cfg를 해도 이런다.
- 태스크 수가 많은 dag라면 태스크를 성격에 따라 관리해줄 필요가 있다.
현 추세 subdag -> task group- 다수의 파일 처리를 하는 dag일 경우
-> 파일 다운로드 태스크, 파일 체크 태스크, 데이터 처리 태스크로 나누어 구성
jinja template, yaml 기반 dag를 동적으로 만들기.
- jinja 기반 dag 자체 템플릿 디자인 후 yaml 통해 앞서 만든 template에 파라미터 제공
- 비슷한 dag 계속 만들 때 메뉴얼하게 개발되도록 하기 (효율성 높이기)
- dag 계속해서 생성 VS 한 DAG 내 태스크 늘리는 것 사이의 밸런스 필요.
-> DAGS폴더 내 yml 파일 수 만큼의 dag 코드 생성해줌. (generator 파이썬 파일 실행으로)
-> generator 실행은 root 파일에서 진행! (airflow/Programmers_Dag/python3 dags/dynamic_dags/generator.py)
-> 또는 web UI에서 실행 가능.