DAG Dependencies
Task Grouping
Dynamic DAGs
DAG 실행 방법
schedule로 지정해 주기적으로 실행
다른 DAG에 의해 트리거
Explicit Trigger: DAG가 다른 DAG를 명시적으로 트리거
Reactive Trigger: DAG B가 DAG A의 태스크 끝날 때까지 대기
DAG A는 해당 사실을 모름
ExternalTaskSensor
기타
BranchPythonOperator: 조건에 따라 뒤에 실행될 태스크를 동적으로 결정
LatestOnlyOperator: 과거 데이터 Backfill 시 Time-sensitive한 태스크들이 실행되는 것을 막음
from airflow.operators.latest_only import LatestOnlyOperator
from airflow.operators.empty import EmptyOperator
with DAG(
dag_id = "latest_only_example",
schedule = timedelta(hours = 48), # 매 48시간마다 실행되는 DAG로 설정
start_date = datetime(2023, 6, 14),
catchup = True) as dag:
t1 = EmptyOperator(task_id = "task1")
t2 = LatestOnlyOperator(task_id = "latest_only")
t3 = EmptyOperator(task_id = "task3")
t4 = EmptyOperator(task_id = "task4")
t1 >> t2 >> [t3, t4]
TriggerDagRunOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
trigger_B = TriggerDagRunOpertor(
task_id = "trigger_B",
trigger_dag_id = "트리거하려는 DAG 이름",
conf = {'path': '/opt/ml/conf'},
execution_date = "{{ds}}"
reset_dag_run = True,
wait_for_completion = True
)
conf: DAG B에 넘기려는 정보
execution_date: Jinja Template을 사용해 DAG A의 execution_date 전달
reset_dag_run: True일 경우, 해당 날짜가 이미 실행되었더라도 재실행
wait_for_completion: DAG B가 끝날 때까지 기다릴지 여부 설정(default: False)
Jinja Template
Python에서 널리 사용되는 템플릿 엔진
프레젠테이션 로직과 애플리케이션 로직을 분리해 동적으로 HTML 생성
{{variable_name}}: 변수 사용
{% %}: 제어문
Airflow에서 사용 시 작업 이름, 파라미터 또는 SQL 쿼리와 같은 작업 매개변수를 템플릿화된 문자열로 정의 가능
Sensor
특정 조건이 충족될 때까지 대기하는 Operator
외부 리소스의 가용성이나 특정 조건의 완료와 같은 상황 동기화에 유용
Airflow가 제공하는 Sensor
FileSensor: 지정한 위치에 파일이 생길 때까지 대기
HttpSensor: HTTP 요청을 수행하고 지정된 응답이 올 때까지 대기
SqlSensor: DB에서 특정 조건을 충족할 때까지 대기
TimeSensor: 특정 시간까지 워크플로우 일시 중지
ExternalTaskSensor: 다른 Airflow DAG의 특정 작업 완료까지 대기
주기적으로 poke하는 방식으로 동작
mode: worker를 하나 붙잡고 poke 중에 sleep할지 혹은 worker를 릴리스하고 다시 잡아서 poke할지 결정하는 파라미터
ExternalTaskSensor
DAG B의 ExternalTaskSensor 태스크가 DAG A의 특정 태스크가 끝났는지 확인
두 태스크의 Execution Date, frequency가 동일해야 함
두 DAG의 schedule interval이 다를 경우 execution_delta 사용
from airflow.sensors.external_task import ExternalTaskSensor
waiting_for_end_of_dag_a = ExternalTaskSensor(
task_id = "waiting_for_end_of_dag_a",
external_dag_id = "DAG_name",
external_task_id = "end",
timeout = 5 * 60,
mode = "reschedule",
execution_delta = timedelta(minutes = 5)
)
Trigger Rules
Upstream 태스크의 성공 여부에 따라 이후 태스크의 실행 여부 결정
Operator에 trigger_rule이란 파라미터로 설정
ALL_SUCCESS(default)
ALL_FAILED
ALL_DONE
ONE_SUCESS
ONE_FAILED
NONE_FAILED
NONE_FAILED_MIN_ONE_SUCESS
from airflow.utils.trigger_rule import TriggerRule
with DAG("trigger_rules", default_args = default_args, schedule = timedelta(1)) as dag:
t1 = BashOperator(task_id = "print_date", bash_command="date")
t2 = BashOperator(task_id = "sleep", bash_command = "sleep 5")
t3 = BashOperator(task_id = "exit", bash_command = "exit 1")
t4 = BashOperator(
task_id = "final_task",
bash_command = "echo DONE!",
trigger_rule = TriggerRule.ALL_DONE
)
[t1, t2, t3] >> t4
태스크 수가 너무 많은 DAG라면 태스크들의 성격에 따라 관리하기 위해 태스크 그룹핑이 필요
from airflow.utils.task_group import TaskGroup
start = EmptyOperator(task_id = "start")
with TaskGroup("Download", tooltip = "Tasks for downloading data") as section_1:
task_1 = EmptyOperator(task_id = "task_1")
task_2 = BashOperator(task_id = "task_2", bash_command = "echo 1")
task_3 = EmptyOperator(task_id = "task_3")
task_1 >> [task_2, task_3]
start >> section_1
Jinja를 기반으로 DAG 자체의 템플릿을 디자인하고 YAML을 통해 앞서 만든 템플릿에 파라미터 제공
DAG를 계속해서 매뉴얼하게 개발하는 것 방지
DAG를 계속 만드는 것과 DAG 내에서 태스크를 늘리는 것 사이의 밸런싱 필요
config API를 사용하기 위해 docker-compose.yaml에 아래와 같이 expose_config를 True로 설정
x-airflow-common:
&airflow-common
...
environment:
&airflow-common-env
...
AIRFLOW_WEBSERVER_EXPOSE_CONFIG: 'true'