[프로그래머스] 데브코스 데이터엔지니어링 TIL Day 53

주재민·2024년 1월 3일
0
post-thumbnail

📖 학습주제

Airflow의 다양한 고급 기능과 CI/CD 환경 설정에 대해 학습 (3)


Dag Dependencies

Dag를 실행하는 방법

  • 주기적 실행 : schedule로 지정
  • 다른 Dag에 의해 트리거
    - Explicit Trigger: Dag A가 분명하게 Dag B를 트리거 (TriggerDagRunOperator)
    - Reactive Trigger: Dag B가 Dag A가 끝나기를 대기 (ExternalTaskSensor)
  • 알아두면 좋은 상황에 따라 다른 태스크 실행 방식들
    - 조건에 따라 다른 태스크로 분기 (BranchPythonOperator)
    - 과거 데이터 Backfill시에는 불필요한 태스크 처리 (LatestOnlyOperator)
    - 앞단 태스크들의 실행상황 (어떤 경우에는 앞단이 실패해도 동작해야하는 경우가 있을 수 있음)

Explicit trigger

  • TriggerDagRunOperator
  • DAG A가 명시적으로 DAG B를 트리거

Reactive trigger

  • ExternalTaskSensor
  • DAG B가 DAG A의 태스크가 끝나기를 대기
    - 이 경우 DAG A는 이 사실을 모름

TriggerDagRunOperator

  • DAG A의 태스크를 TriggerDagRunOperator로 구현
trigger_B = TriggerDagRunOperator(
 task_id="trigger_B",
 trigger_dag_id="트리거하려는DAG이름"
)

Jinja Template

  • Jinja 템플릿은 Python에서 널리 사용되는 템플릿 엔진
    - Django 템플릿 엔진에서 영감을 받아 개발
    - Jinja를 사용하면 프레젠테이션 로직과 애플리케이션 로직을 분리하여 동적으로 HTML 생성
    - Flask에서 사용됨
  • 변수는 이중 중괄호 {{ }}로 감싸서 사용
<h1>안녕하세요, {{ name }}님!</h1>
  • 제어문은 퍼센트 기호 {% %}로 표시
<ul>
{% for item in items %}
 <li>{{ item }}</li>
{% endfor %}
</ul>

Jinja Template + Airflow

  • Airflow에서 Jinja 템플릿을 사용하면 작업 이름, 파라미터 또는 SQL 쿼리와 같은 작업 매개변수를 템플릿화된 문자열로 정의 가능
    - 이를 통해 재사용가능하고 사용자 정의 가능한 워크플로우 생성
  • 모든 오퍼레이터, 파라미터에 쓸 수 있는게 아님 (정해져 있음)
    e.g. 1) execution_date을 코드 내에서 쉽게 사용: {{ ds }}
# BashOperator를 사용하여 템플릿 작업 정의
task1 = BashOperator(
 task_id='task1',
 bash_command='echo "{{ ds }}"',
 dag=dag
)

e.g. 2) 파라미터 등으로 넘어온 변수를 쉽게 사용 가능

# 동적 매개변수가 있는 다른 템플릿 작업 정의
task2 = BashOperator(
 task_id='task2',
 bash_command='echo "안녕하세요, {{ params.name }}!"',
 params={'name': 'John'}, # 사용자 정의 가능한 매개변수
 dag=dag
)

가능한 시스템 변수

https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html

TriggerDagRunOperator에서 Jinja Template 이용

from airflow.operators.trigger_dagrun import TriggerDagRunOperator

trigger_B = TriggerDagRunOperator(
 task_id="trigger_B",
 trigger_dag_id="트리거하려는DAG이름",
 conf={ 'path': '/opt/ml/conf' }, # DAG B에 넘기고 싶은 정보. DAG B에서는 Jinja
                                    템플릿(dag_run.conf["path"])으로 접근 가능.

                                  # DAG B PythonOperator(**context)에서라면
                                    kwargs['dag_run'].conf.get('path')

 execution_date="{{ ds }}", # Jinja 템플릿을 통해 DAG A의 execution_date을 패스
 reset_dag_run=True, # True일 경우 해당 날짜가 이미 실행되었더라는 다시 재실행
 wait_for_completion=True # DAG B가 끝날 때까지 기다릴지 여부를 결정. 디폴트값은 False
)

Sensor

  • Sensor는 특정 조건이 충족될 때까지 대기하는 Operator
  • Sensor는 외부 리소스의 가용성이나 특정 조건의 완료와 같은 상황 동기화에 유용
  • Airflow는 몇 가지 내장 Sensor를 제공
    - FileSensor : 지정된 위치에 파일이 생길 때까지 대기
    - HttpSensor : HTTP 요청을 수행하고 지정된 응답이 대기
    - SqlSensor : SQL 데이터베이스에서 특정 조건을 충족할 때까지 대기
    - TimeSensor : 특정 시간에 도달할 때까지 워크플로우를 일시 중지
    - ExternalTaskSensor : 다른 Airflow DAG의 특정 작업 완료를 대기
  • 기본적으로 주기적으로 poke를 하는 것
    - worker를 하나 붙잡고 poke간에 sleep를 할지 아니면 worker를 릴리스하고 다시 잡아서 poke(주기적으로 체크)를 할지 결정해주는 파라미터가 존재 : mode (mode의 값은 reschedule 혹은 poke가 됨)

ExternalTaskSensor

  • DAG B의 ExternalTaskSensor 태스크가 DAG A의 특정 태스크가 끝났는지 체크함
    - 먼저 동일한 schedule_interval을 사용
    - 이 경우 두 태스크들의 Execution Date이 동일해야함. 아니면 매칭이 안됨
from airflow.sensors.external_task import ExternalTaskSensor

waiting_for_end_of_dag_a = ExternalTaskSensor(
 task_id='waiting_for_end_of_dag_a',
 external_dag_id='DAG이름',
 external_task_id='end',
 timeout=5*60,
 mode='reschedule'
)
  • DAG A와 DAG B가 서로 다른 schedule interval을 갖는 경우
    -> 예를 들어 DAG A가 DAG B보다 5분 먼저 실행된다면?
    - execution_delta를 사용
    - execution_date_fn을 사용하면 조금더 복잡하게 컨트롤 가능
    - 만일 두개의 DAG가 서로 다른 frequency를 갖고 있다면 이 경우 ExternalTaskSensor는 사용불가
from airflow.sensors.external_task import ExternalTaskSensor

waiting_for_end_of_dag_a = ExternalTaskSensor(
 task_id='waiting_for_end_of_dag_a',
 external_dag_id='DAG이름',
 external_task_id='end',
 timeout=5*60,
 mode='reschedule',
 execution_delta=timedelta(minutes=5)
)

BranchPythonOperator

  • 상황에 따라 뒤에 실행되어야할 태스크를 동적으로 결정해주는 오퍼레이터
    - 미리 정해준 Operator들 중에 선택하는 형태로 돌아감
  • TriggerDagOperator 앞에 이 오퍼레이터를 사용하는 경우도 있음

LatestOnlyOperator

  • Time-sensitive한 태스크들이 과거 데이터의 backfill시 실행되는 것을 막기 위함
  • 현재 시간이 지금 태스크가 처리하는 execution_date보다 미래이고 다음 execution_date보다는 과거인 경우에만 뒤로 실행을 이어가고 아니면 여기서 중단됨

Trigger Rules

  • Upstream 태스크의 성공실패 상황에 따라 뒷단 태스크의 실행여부를 결정하고 싶은 경우
    -> 보통 앞단이 하나라도 실패하면 뒷 단의 태스크는 실행불가
  • Operator에 trigger_rule이란 파라미터로 결정 가능
    - trigger_rule은 태스크에 주어지는 파라미터로 다음과 같은 값이 가능

Trigger Rule의 가능값 (airflow.utils.trigger_rule.TriggerRule)

  • ALL_SUCCESS (default) : 앞의 태스크들이 모두 성공해야 실행
  • ALL_FAILED : 앞의 태스크들이 모두 실패해야 실행
  • ALL_DONE : 앞의 태스크들이 모두 실행한 후 실행 (성공실패 여부와 관계없이)
  • ONE_FAILED : 앞의 태스크들이 하나라도 실패할 경우 실행
  • ONE_SUCCESS : 앞의 태스크들이 하나라도 성공할 경우 실행
  • NONE_FAILED : 앞의 태스크들 중 실패한 것이 없을 경우(성공 혹은 스킵) 실행
  • NONE_FAILED_MIN_ONE_SUCCESS : 앞의 태스크들이 실패하지 않은 상황에서 하나라도 성공했을 때 실행

Task Grouping

Task Grouping의 필요성

  • 태스크 수가 많은 DAG라면 태스크들을 성격에 따라 관리하고 싶은 니즈 존재
    - SubDAG이 사용되다가 Airflow 2.0에서 나온 Task Grouping으로 넘어가는 추세
    - SubDAG를 비슷한 일을 하는 태스크들을 SubDAG라는 Child Dag로 만들어서 관리
  • 다수의 파일 처리를 하는 DAG라면 파일 다운로드 태스크들과 파일 체크 태스크와 데이터 처리 태스크들로 구성

Dynamic Dags

Dynamic Dags

  • 비슷한 DAG를 계속해서 매뉴얼하게 개발하는 것을 방지
  • DAG를 계속해서 만드는 것과 한 DAG안에서 태스크를 늘리는 것 사이의 밸런스 필요
    - 오너가 다르거나 태스크의 수가 너무 커지는 경우 DAG를 복제해나가는 것이 더 좋음

0개의 댓글