📖 학습주제
Airflow의 다양한 고급 기능과 CI/CD 환경 설정에 대해 학습 (3)
Dag Dependencies
Dag를 실행하는 방법
- 주기적 실행 : schedule로 지정
- 다른 Dag에 의해 트리거
- Explicit Trigger: Dag A가 분명하게 Dag B를 트리거 (TriggerDagRunOperator)
- Reactive Trigger: Dag B가 Dag A가 끝나기를 대기 (ExternalTaskSensor)
- 알아두면 좋은 상황에 따라 다른 태스크 실행 방식들
- 조건에 따라 다른 태스크로 분기 (BranchPythonOperator)
- 과거 데이터 Backfill시에는 불필요한 태스크 처리 (LatestOnlyOperator)
- 앞단 태스크들의 실행상황 (어떤 경우에는 앞단이 실패해도 동작해야하는 경우가 있을 수 있음)
Explicit trigger

- TriggerDagRunOperator
- DAG A가 명시적으로 DAG B를 트리거
Reactive trigger

- ExternalTaskSensor
- DAG B가 DAG A의 태스크가 끝나기를 대기
- 이 경우 DAG A는 이 사실을 모름
TriggerDagRunOperator
- DAG A의 태스크를 TriggerDagRunOperator로 구현
trigger_B = TriggerDagRunOperator(
task_id="trigger_B",
trigger_dag_id="트리거하려는DAG이름"
)
Jinja Template
- Jinja 템플릿은 Python에서 널리 사용되는 템플릿 엔진
- Django 템플릿 엔진에서 영감을 받아 개발
- Jinja를 사용하면 프레젠테이션 로직과 애플리케이션 로직을 분리하여 동적으로 HTML 생성
- Flask에서 사용됨
- 변수는 이중 중괄호 {{ }}로 감싸서 사용
<h1>안녕하세요, {{ name }}님!</h1>
<ul>
{% for item in items %}
<li>{{ item }}</li>
{% endfor %}
</ul>
Jinja Template + Airflow
- Airflow에서 Jinja 템플릿을 사용하면 작업 이름, 파라미터 또는 SQL 쿼리와 같은 작업 매개변수를 템플릿화된 문자열로 정의 가능
- 이를 통해 재사용가능하고 사용자 정의 가능한 워크플로우 생성
- 모든 오퍼레이터, 파라미터에 쓸 수 있는게 아님 (정해져 있음)
e.g. 1) execution_date을 코드 내에서 쉽게 사용: {{ ds }}
# BashOperator를 사용하여 템플릿 작업 정의
task1 = BashOperator(
task_id='task1',
bash_command='echo "{{ ds }}"',
dag=dag
)
e.g. 2) 파라미터 등으로 넘어온 변수를 쉽게 사용 가능
# 동적 매개변수가 있는 다른 템플릿 작업 정의
task2 = BashOperator(
task_id='task2',
bash_command='echo "안녕하세요, {{ params.name }}!"',
params={'name': 'John'}, # 사용자 정의 가능한 매개변수
dag=dag
)
가능한 시스템 변수
https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html
TriggerDagRunOperator에서 Jinja Template 이용
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
trigger_B = TriggerDagRunOperator(
task_id="trigger_B",
trigger_dag_id="트리거하려는DAG이름",
conf={ 'path': '/opt/ml/conf' }, # DAG B에 넘기고 싶은 정보. DAG B에서는 Jinja
템플릿(dag_run.conf["path"])으로 접근 가능.
# DAG B PythonOperator(**context)에서라면
kwargs['dag_run'].conf.get('path')
execution_date="{{ ds }}", # Jinja 템플릿을 통해 DAG A의 execution_date을 패스
reset_dag_run=True, # True일 경우 해당 날짜가 이미 실행되었더라는 다시 재실행
wait_for_completion=True # DAG B가 끝날 때까지 기다릴지 여부를 결정. 디폴트값은 False
)
Sensor
- Sensor는 특정 조건이 충족될 때까지 대기하는 Operator
- Sensor는 외부 리소스의 가용성이나 특정 조건의 완료와 같은 상황 동기화에 유용
- Airflow는 몇 가지 내장 Sensor를 제공
- FileSensor : 지정된 위치에 파일이 생길 때까지 대기
- HttpSensor : HTTP 요청을 수행하고 지정된 응답이 대기
- SqlSensor : SQL 데이터베이스에서 특정 조건을 충족할 때까지 대기
- TimeSensor : 특정 시간에 도달할 때까지 워크플로우를 일시 중지
- ExternalTaskSensor : 다른 Airflow DAG의 특정 작업 완료를 대기
- 기본적으로 주기적으로 poke를 하는 것
- worker를 하나 붙잡고 poke간에 sleep를 할지 아니면 worker를 릴리스하고 다시 잡아서 poke(주기적으로 체크)를 할지 결정해주는 파라미터가 존재 : mode (mode의 값은 reschedule 혹은 poke가 됨)
ExternalTaskSensor
- DAG B의 ExternalTaskSensor 태스크가 DAG A의 특정 태스크가 끝났는지 체크함
- 먼저 동일한 schedule_interval을 사용
- 이 경우 두 태스크들의 Execution Date이 동일해야함. 아니면 매칭이 안됨
from airflow.sensors.external_task import ExternalTaskSensor
waiting_for_end_of_dag_a = ExternalTaskSensor(
task_id='waiting_for_end_of_dag_a',
external_dag_id='DAG이름',
external_task_id='end',
timeout=5*60,
mode='reschedule'
)
- DAG A와 DAG B가 서로 다른 schedule interval을 갖는 경우
-> 예를 들어 DAG A가 DAG B보다 5분 먼저 실행된다면?
- execution_delta를 사용
- execution_date_fn을 사용하면 조금더 복잡하게 컨트롤 가능
- 만일 두개의 DAG가 서로 다른 frequency를 갖고 있다면 이 경우 ExternalTaskSensor는 사용불가
from airflow.sensors.external_task import ExternalTaskSensor
waiting_for_end_of_dag_a = ExternalTaskSensor(
task_id='waiting_for_end_of_dag_a',
external_dag_id='DAG이름',
external_task_id='end',
timeout=5*60,
mode='reschedule',
execution_delta=timedelta(minutes=5)
)
BranchPythonOperator
- 상황에 따라 뒤에 실행되어야할 태스크를 동적으로 결정해주는 오퍼레이터
- 미리 정해준 Operator들 중에 선택하는 형태로 돌아감
- TriggerDagOperator 앞에 이 오퍼레이터를 사용하는 경우도 있음
LatestOnlyOperator
- Time-sensitive한 태스크들이 과거 데이터의 backfill시 실행되는 것을 막기 위함
- 현재 시간이 지금 태스크가 처리하는 execution_date보다 미래이고 다음 execution_date보다는 과거인 경우에만 뒤로 실행을 이어가고 아니면 여기서 중단됨
Trigger Rules
- Upstream 태스크의 성공실패 상황에 따라 뒷단 태스크의 실행여부를 결정하고 싶은 경우
-> 보통 앞단이 하나라도 실패하면 뒷 단의 태스크는 실행불가
- Operator에 trigger_rule이란 파라미터로 결정 가능
- trigger_rule은 태스크에 주어지는 파라미터로 다음과 같은 값이 가능
Trigger Rule의 가능값 (airflow.utils.trigger_rule.TriggerRule)
- ALL_SUCCESS (default) : 앞의 태스크들이 모두 성공해야 실행
- ALL_FAILED : 앞의 태스크들이 모두 실패해야 실행
- ALL_DONE : 앞의 태스크들이 모두 실행한 후 실행 (성공실패 여부와 관계없이)
- ONE_FAILED : 앞의 태스크들이 하나라도 실패할 경우 실행
- ONE_SUCCESS : 앞의 태스크들이 하나라도 성공할 경우 실행
- NONE_FAILED : 앞의 태스크들 중 실패한 것이 없을 경우(성공 혹은 스킵) 실행
- NONE_FAILED_MIN_ONE_SUCCESS : 앞의 태스크들이 실패하지 않은 상황에서 하나라도 성공했을 때 실행
Task Grouping
Task Grouping의 필요성
- 태스크 수가 많은 DAG라면 태스크들을 성격에 따라 관리하고 싶은 니즈 존재
- SubDAG이 사용되다가 Airflow 2.0에서 나온 Task Grouping으로 넘어가는 추세
- SubDAG를 비슷한 일을 하는 태스크들을 SubDAG라는 Child Dag로 만들어서 관리
- 다수의 파일 처리를 하는 DAG라면 파일 다운로드 태스크들과 파일 체크 태스크와 데이터 처리 태스크들로 구성

Dynamic Dags
Dynamic Dags
- 비슷한 DAG를 계속해서 매뉴얼하게 개발하는 것을 방지
- DAG를 계속해서 만드는 것과 한 DAG안에서 태스크를 늘리는 것 사이의 밸런스 필요
- 오너가 다르거나 태스크의 수가 너무 커지는 경우 DAG를 복제해나가는 것이 더 좋음