[TIL] Airflow의 다양한 고급 기능과 CI / CD 환경에 대해 학습 (3)

이원진·2023년 6월 29일
0

데브코스

목록 보기
53/54
post-thumbnail

학습내용


  1. DAG Dependencies

  2. Task Grouping

  3. Dynamic DAGs

1. DAG Dependencies


  • DAG 실행 방법

    • schedule로 지정해 주기적으로 실행

    • 다른 DAG에 의해 트리거

      • Explicit Trigger: DAG가 다른 DAG를 명시적으로 트리거

        • TriggerDagRunOperator

      • Reactive Trigger: DAG B가 DAG A의 태스크 끝날 때까지 대기

        • DAG A는 해당 사실을 모름

        • ExternalTaskSensor

    • 기타

      • BranchPythonOperator: 조건에 따라 뒤에 실행될 태스크를 동적으로 결정

        • TriggerDagOperator 앞에 사용하기도 함

      • LatestOnlyOperator: 과거 데이터 Backfill 시 Time-sensitive한 태스크들이 실행되는 것을 막음

        • 현재 시간이 지금 태스크의 execution_date보다 미래이고 다음 execution_date보다는 과거일 경우에만 뒤로 실행을 이어가고, 아닐 경우 중단

      from airflow.operators.latest_only import LatestOnlyOperator
      from airflow.operators.empty import EmptyOperator
    
      with DAG(
          dag_id = "latest_only_example",
          schedule = timedelta(hours = 48), # 매 48시간마다 실행되는 DAG로 설정
    
          start_date = datetime(2023, 6, 14),
          catchup = True) as dag:
    
          t1 = EmptyOperator(task_id = "task1")
          t2 = LatestOnlyOperator(task_id = "latest_only")
          t3 = EmptyOperator(task_id = "task3")
          t4 = EmptyOperator(task_id = "task4")
    
          t1 >> t2 >> [t3, t4]

  • TriggerDagRunOperator

    from airflow.operators.trigger_dagrun import TriggerDagRunOperator
    
    trigger_B = TriggerDagRunOpertor(
        task_id = "trigger_B",
        trigger_dag_id = "트리거하려는 DAG 이름",
        conf = {'path': '/opt/ml/conf'},
        execution_date = "{{ds}}"
        reset_dag_run = True,
        wait_for_completion = True
    )
    • conf: DAG B에 넘기려는 정보

    • execution_date: Jinja Template을 사용해 DAG A의 execution_date 전달

    • reset_dag_run: True일 경우, 해당 날짜가 이미 실행되었더라도 재실행

    • wait_for_completion: DAG B가 끝날 때까지 기다릴지 여부 설정(default: False)

  • Jinja Template

    • Python에서 널리 사용되는 템플릿 엔진

      • Django의 템플릿 엔진에서 영감

    • 프레젠테이션 로직과 애플리케이션 로직을 분리해 동적으로 HTML 생성

    • {{variable_name}}: 변수 사용

    • {% %}: 제어문

    • Airflow에서 사용 시 작업 이름, 파라미터 또는 SQL 쿼리와 같은 작업 매개변수를 템플릿화된 문자열로 정의 가능

      • 재사용 가능, 사용자 정의 워크플로우 생성 가능

  • Sensor

    • 특정 조건이 충족될 때까지 대기하는 Operator

    • 외부 리소스의 가용성이나 특정 조건의 완료와 같은 상황 동기화에 유용

    • Airflow가 제공하는 Sensor

      • FileSensor: 지정한 위치에 파일이 생길 때까지 대기

      • HttpSensor: HTTP 요청을 수행하고 지정된 응답이 올 때까지 대기

      • SqlSensor: DB에서 특정 조건을 충족할 때까지 대기

      • TimeSensor: 특정 시간까지 워크플로우 일시 중지

      • ExternalTaskSensor: 다른 Airflow DAG의 특정 작업 완료까지 대기

    • 주기적으로 poke하는 방식으로 동작

      • mode: worker를 하나 붙잡고 poke 중에 sleep할지 혹은 worker를 릴리스하고 다시 잡아서 poke할지 결정하는 파라미터

        • mode = [poke / reschedule]

  • ExternalTaskSensor

    • DAG B의 ExternalTaskSensor 태스크가 DAG A의 특정 태스크가 끝났는지 확인

      • 두 태스크의 Execution Date, frequency가 동일해야 함

      • 두 DAG의 schedule interval이 다를 경우 execution_delta 사용

    from airflow.sensors.external_task import ExternalTaskSensor
    
    waiting_for_end_of_dag_a = ExternalTaskSensor(
        task_id = "waiting_for_end_of_dag_a",
        external_dag_id = "DAG_name",
        external_task_id = "end",
        timeout = 5 * 60,
        mode = "reschedule",
        execution_delta = timedelta(minutes = 5)
    )

  • Trigger Rules

    • Upstream 태스크의 성공 여부에 따라 이후 태스크의 실행 여부 결정

    • Operator에 trigger_rule이란 파라미터로 설정

      • ALL_SUCCESS(default)

      • ALL_FAILED

      • ALL_DONE

      • ONE_SUCESS

      • ONE_FAILED

      • NONE_FAILED

      • NONE_FAILED_MIN_ONE_SUCESS

    from airflow.utils.trigger_rule import TriggerRule
    
    with DAG("trigger_rules", default_args = default_args, schedule = timedelta(1)) as dag:
        t1 = BashOperator(task_id = "print_date", bash_command="date")
        t2 = BashOperator(task_id = "sleep", bash_command = "sleep 5")
        t3 = BashOperator(task_id = "exit", bash_command = "exit 1")
    
        t4 = BashOperator(
            task_id = "final_task",
            bash_command = "echo DONE!",
            trigger_rule = TriggerRule.ALL_DONE
        )
    
        [t1, t2, t3] >> t4

2. Task Grouping


  • 태스크 수가 너무 많은 DAG라면 태스크들의 성격에 따라 관리하기 위해 태스크 그룹핑이 필요

    • 다수의 파일을 처리하는 DAG는 파일 다운로드 태스크, 파일 체크 태스크, 데이터 처리 태스크로 구성

    from airflow.utils.task_group import TaskGroup
    
    start = EmptyOperator(task_id = "start")
    
    with TaskGroup("Download", tooltip = "Tasks for downloading data") as section_1:
        task_1 = EmptyOperator(task_id = "task_1")
        task_2 = BashOperator(task_id = "task_2", bash_command = "echo 1")
        task_3 = EmptyOperator(task_id = "task_3")
    
        task_1 >> [task_2, task_3]
    
    start >> section_1

3. Dynamic DAGs


  • Jinja를 기반으로 DAG 자체의 템플릿을 디자인하고 YAML을 통해 앞서 만든 템플릿에 파라미터 제공

  • DAG를 계속해서 매뉴얼하게 개발하는 것 방지

  • DAG를 계속 만드는 것과 DAG 내에서 태스크를 늘리는 것 사이의 밸런싱 필요

    • 오너가 다르거나 태스크의 수가 너무 많아지는 경우 DAG를 복제하는 것이 더 좋음

메모


  • config API를 사용하기 위해 docker-compose.yaml에 아래와 같이 expose_config를 True로 설정

    x-airflow-common:
        &airflow-common
        ...
        environment:
            &airflow-common-env
            ...
            AIRFLOW_WEBSERVER_EXPOSE_CONFIG: 'true'

0개의 댓글