[6/5] TIL - Dag Dependencies(여러가지 Operator), Task Grouping, Dynamic Dags, DockerOperator

Sangwon Jwa·2024년 6월 5일

데브코스 TIL

목록 보기
44/54
post-thumbnail

📖 학습 주제


  1. Dag Dependencies
  2. Task Grouping
  3. Dynamic Dags
  4. DockerOperator

✏️ 주요 메모 사항 소개


Dag Dependencies

Airflow 사용자는 상황에 맞는 적절한 Operator를 사용해서 여러가지 방법으로 Dag를 실행할 수 있다.

  • 주기적 실행 : schedule로 지정
  • 다른 Dag에 의해 트리거

    • Explicit Trigger : Dag A가 명시적으로 Dag B를 트리거 (TriggerDagOperator)
    • Reactive Trigger : Dag B가 Dag A가 끝나기를 대기 (ExternalTaskSensor)
      • 이 경우 Dag A는 이 사실을 모름
  • 상황에 따른 태스크 실행 방식들

    • 조건에 따라 다른 태스크로 분기 (BranchPythonOperator)
    • 과거 데이터 Backfill 시에는 불필요한 태스크 처리 (LatestOnlyOperator)
    • 앞단 태스크들의 실행 상황
      • 어떤 경우에는 앞단이 실패해도 동작해야하는 경우가 있을 수 있음

Jinja Template

오퍼레이터 뒤에 여러가지 파라미터를 넣어서 'Dag B가 다 끝날때까지 기다릴건지', 'Execution Date은 어떻게 pass하는지', '추가적으로 넘겨줘야 하는 정보가 있는지' 와 같은 부분을 설정할 있는데, 이 때 Jinja Template을 사용한다.

Jinja 템플릿은 Python에서 널리 사용되는 템플릿 엔진으로, 프레젠테이션 로직과 애플리케이션 로직을 분리하여 동적으로 HTML을 생성한다. 규칙은 다음과 같다.

  • 변수는 이중 중괄호 {{ }}로 감싸서 사용
    • <h1>안녕하세요, {{name}}님!</h1>
  • 제어문은 퍼센트 기호 {% %}로 표시
    • {% for item in items %} ... {{item}} ... {% endfor %}

Airflow에서 Jinja 템플릿을 사용하면 작업 이름, 파라미터 또는 SQL 쿼리와 같은 작업 매개변수를 템플릿화된 문자열로 정의가 가능하다. 이를 통해 재사용이 가능하고 사용자 정의 가능한 Workflow를 생성할 수 있다.

# BashOpeator를 사용하여 템플릿 작업 정의
task1 = BashOperator(
	task_id = 'task1',
    
    # execution_date을 코드 내에서 쉽게 사용 {{ ds }}
    bash_command = 'echo "{{ ds }}"',
    dag = dag
)
# 파라미터로 넘어온 변수를 사용하기
# 동적 매개변수가 있는 다른 템플릿 작업 정의

task2 = BashOperator(
	task_id = 'task2',
    bash_command = 'echo "안녕하세요, {{ params.name }}!"',
    params = {'name':'John'}, # 사용자 정의 가능한 매개변수
    dag = dag
)

가능한 모든 시스템 변수는 다음 링크에서 찾아볼 수 있다. https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html

 

  • Airflow에서 사용 가능한 Jinja 변수들
    • {{ ds }}
    • {{ ds_nodash }}
    • {{ ts }}
    • {{ task }}
    • {{ dag_run }}
    • {{ var.value }}
      • {{ var.value.get('my.var', 'fallback') }}
    • {{ var.json }}
      • {{ var.json.my_dict_var.key1 }}
    • {{ conn }}
      • {{ conn.my_conn_id.login }}, {{ conn.my_conn_id.password }}

TriggerDagOperator

Dag A에서 명시적으로 Dag B를 트리거하려면 TriggerDagRunOperator로 구현하면 된다

from airflow.operators.trigger_dagrun import TriggerDagRunOperator

trigger_B = TriggerDagRunOperator(
	task_id = "trigger_b",
    trigger_dag_id = "트리거하려는DAG이름",
    
    # DAG B에 넘기고 싶은 정보 (DAG B에서는 Jinja 템플릿(dag_run.conf["path"])으로 접근 가능
    conf={ 'path':'/opt/ml/conf' },
    
    # JINJA 템플릿을 통해 DAG A의 execution_date을 패스
    execution_date="{{ ds }}",
    
    #True일 경우 해당 날짜에 이미 실행기록이 있더라도 다시 재실행
    reset_dag_run = True,
    
    #Dag B가 끝날 때 까지 기다릴지 결정 (default = False)
    wait_for_completion = True
)


Sensor (ExternalTaskSensor)

Sensor는 특정 조건이 충족될 때까지 대기하는 Operator를 말한다. 외부 리소스의 가용성이나 특정 조건의 완료와 같은 상황 동기화에 유용, Airflow는 몇가지 내장 Sensor를 제공한다.

  • FileSensor : 지정된 위치에 파일이 생길 때 까지 대기
  • HttpSensor : HTTP 요청을 수행하고 지정된 응답을 대기
  • SqlSensor : SQL 데이터베이스에서 특정 조건을 충족할 때까지 대기
  • TimeSensor : 특정 시간에 도달할 때까지 워크플로우를 일시 중지
  • ExternalTaskSensor : 다른 Airflow DAG의 특정 작업 완료를 대기

기본적으로 주기적으로 poke를 하는 것, worker를 하나 붙잡고 poke간에 sleep을 할지 아니면 worker를 릴리스하고 다시 잡아서 poke를 할지 결정해주는 파라미터가 존재(mode)

  • mode의 값은 reschedule 혹은 poke가 됨

DAG B의 ExternalTaskSensor 태스크가 DAG A의 특정 태스크가 끝났는지 체크한다. 다만 이 sensor를 사용하려면 제약사항이 있다.

  • 먼저 동일한 schedule_interval을 사용
  • 이 경우 두 태스크들의 Execution Date이 동일해야함. 아니면 매칭이 안됨

이러한 제약사항 때문에 조심히 사용해야 한다. 잘 쓰지 않는게 좋을 수도...

from airflow.sensors.external_task import ExternalTaskSensor

waiting_for_end_of_dag_a = ExternalTaskSensor(
	task_id = 'waiting_for_end_of_dag_a',
    external_dag_id = 'DAG이름',
    external_task_id = 'end',
    timeout = 5*60,
    mode = 'reschedule'
)

BranchPythonOperator

뒤에 Trigger해야 되는 Task가 여러개가 있는 경우 상황에 따라서 그 중 일부만 Trigger하는 Operator. 미리 정해준 Operator들 중에 선택하는 형태로 돌아간다. TriggerDagOperator 앞에 이 오퍼레이터를 사용하는 경우도 있다.

예시 코드 : https://github.com/learndataeng/learn-airflow/blob/main/dags/Learn_BranchPythonOperator.py

from airflow.operators.python import BranchPythonOperator

# 상황에 따라 뒤에 실행되어야 하는 태스크를 리턴
def skip_or_cont_trigger():
	if Variable.get("mode","dev") == "dev":
    	return []
    else:
    	return ["trigger_b"]
        
# "mode"라는 Variable의 값이 "dev"이면 trigger_b 태스크를 스킵
branching = BranchPythonOperator(
	task_id = "branching",
    python_callable = skip_or_cont_trigger,
)

LatestOnlyOperator

Time-sensitive한 태스크들이 과거 데이터의 backfill시 실행되는 것을 막기 위함, 현재 시간이 지금 태스크가 처리하는 execution_date보다 미래이고 다음 execution_date보다는 과거인 경우에만 뒤로 실행을 이어가고 아니면 여기서 중단

  • t1 >> t3 >> [t2,t4]

예시 코드 : https://github.com/learndataeng/learn-airflow/blob/main/dags/Learn_LatestOnlyOperator.py

from airflow.operators.latest_only import LatestOnlyOperator
from airflow.operators.empty import EmptyOperator

with DAG(
	dag_id='latest_only_example',
    schedule=timedelta(hours=48), # 매 48시간마다 실행되는 DAG로 설정
    start_date=datetime(2023, 6, 14),
    catchup=True) as dag:
 
    t1 = EmptyOperator(task_id='task1')
    t2 = LatestOnlyOperator(task_id = 'latest_only')
    t3 = EmptyOperator(task_id='task3')
    t4 = EmptyOperator(task_id='task4')
    
    t1 >> t2 >> [t3, t4]

Trigger Rules

Upstream 태스크의 성공, 실패 상황에 따라 뒷단 태스크의 실행여부를 결정하고 싶다면, Operator에 trigger_rule 이란 파라미터로 설정이 가능하다. 보통 앞단이 하나라도 실패하면 뒷 단의 태스크는 실행불가하게 설정한다. trigger_rule의 값으로는 다음과 같은 것들이 있다.

  • ALL_SUCCESS: (default) all parents have succeeded
  • ALL_FAILED: all parents are in a failed or upstream_failed state
  • ALL_DONE: all parents are done with their execution (성공실패 여부와 관계없이)
  • ONE_FAILED:
    • fires as soon as at least one parent has failed, it does not wait for all parents to be done
  • ONE_SUCCESS:
    • fires as soon as at least one parent succeeds, it does not wait for all parents to be done
  • NONE_FAILED:
    • all parents have not failed (or upstream_failed) i.e. all parents have succeeded or been skipped
  • NONE_FAILED_MIN_ONE_SUCCESS
    • one parent at least is done but none failed

예제 코드 : https://github.com/learndataeng/learn-airflow/blob/main/dags/Learn_TriggerRule.py

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.trigger_rule import TriggerRule
from datetime import datetime, timedelta

default_args = {
    'start_date': datetime(2023, 6, 15)
}

with DAG("Learn_TriggerRule", default_args=default_args, schedule=timedelta(1)) as dag:
   t1 = BashOperator(task_id="print_date", bash_command="date")
   t2 = BashOperator(task_id="sleep", bash_command="sleep 5")
   t3 = BashOperator(task_id="exit", bash_command="exit 1")
   t4 = BashOperator(
       task_id='final_task',
       bash_command='echo DONE!',
       trigger_rule=TriggerRule.ALL_DONE
   )
   [t1, t2, t3] >> t4

Task Grouping

태스크 수가 많은 DAG라면 태스크들을 성격에 따라 관리하고 싶은 니즈가 존재한다. Airflow에서는 이런 기능을 제공하기 위해 Task Grouping을 지원한다.

다음과 같이 파일을 다운로드 받는 태스크들과 그 파일들을 처리하는 태스크들이 있다고 가정한다면, 각각을 태스크 그룹으로 만들어서 활용할 수 있다.

예시 코드 : https://github.com/learndataeng/learn-airflow/blob/main/dags/Learn_TaskGroups.py

from airflow.models.dag import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.bash import BashOperator
from airflow.utils.task_group import TaskGroup
import pendulum

with DAG(dag_id="Learn_Task_Group", start_date=pendulum.today('UTC').add(days=-2), tags=["example"]) as dag:
    start = EmptyOperator(task_id="start")

    # Task Group #1
    with TaskGroup("Download", tooltip="Tasks for downloading data") as section_1:
        task_1 = EmptyOperator(task_id="task_1")
        task_2 = BashOperator(task_id="task_2", bash_command='echo 1')
        task_3 = EmptyOperator(task_id="task_3")

        task_1 >> [task_2, task_3]

    # Task Group #2
    with TaskGroup("Process", tooltip="Tasks for processing data") as section_2:
        task_1 = EmptyOperator(task_id="task_1")

        with TaskGroup("inner_section_2", tooltip="Tasks for inner_section2") as inner_section_2:
            task_2 = BashOperator(task_id="task_2", bash_command='echo 1')
            task_3 = EmptyOperator(task_id="task_3")
            task_4 = EmptyOperator(task_id="task_4")

            [task_2, task_3] >> task_4

    end = EmptyOperator(task_id='end')

    start >> section_1 >> section_2 >> end


Dynamic Dags

Dynamic Dag란 Jinja 템플릿과 YAML을 기반으로 DAG를 동적으로 만든 것을 말한다. Jinja를 기반으로 DAG 자체의 템플릿을 디자인하고, YAML을 통해 앞서 만든 템플릿에 파라미터를 제공하는 형식이다. 이를 통해 비슷한 DAG를 계속해서 매뉴얼하게 개발하는 것을 방지하여 재사용성을 높일 수 있다.

  • DAG를 계속해서 만드는 것과 한 DAG안에서 태스크를 늘리는 것 사이의 밸런스가 중요
    • Owner가 다르거나 태스크의 수가 너무 커지는 경우 DAG를 복제해나가는 것이 더 좋다

만약 yfinance와 같은 데이터소스에서 주식 정보들을 가져오는 DAG가 필요하다고 해보자. 주식 종목, 어느 주기로 DAG를 실행할지와 같은 파라미터만 다를 뿐, 전체적인 구조는 비슷할 것이기 때문에 위의 그림과 같이 yaml파일로 템플릿에 파라미터를 넘겨준 뒤 generator.py를 통해 동적으로 dag를 생성하게 만들 수 있다.

예시 코드 : https://github.com/learndataeng/learn-airflow/tree/main/dags/dynamic_dags

  • 템플릿 : templated_dag.jinja2
from airflow import DAG
from airflow.decorators import task
from datetime import datetime

with DAG(dag_id="get_price_{{ dag_id }}",
    start_date=datetime(2023, 6, 15),
    schedule='{{ schedule }}',
    catchup={{ catchup or True }}) as dag:

    @task
    def extract(symbol):
        return symbol

    @task
    def process(symbol):
        return symbol

    @task
    def store(symbol):
        return symbol

    store(process(extract("{{ symbol }}")))
  • yml 예시 : config_appl.yml
dag_id: 'APPL'
schedule: '@daily'
catchup: False
symbol: 'APPL'
  • generator.py
from jinja2 import Environment, FileSystemLoader
import yaml
import os

file_dir = os.path.dirname(os.path.abspath(__file__))
env = Environment(loader=FileSystemLoader(file_dir))
template = env.get_template('templated_dag.jinja2')

for f in os.listdir(file_dir):
    if f.endswith(".yml"):
        with open(f"{file_dir}/{f}", "r") as cf:
            config = yaml.safe_load(cf)
            with open(f"dags/get_price_{config['dag_id']}.py", "w") as f:
                f.write(template.render(config))
  • 만들어진 파일 : get_price_APPL.py
from airflow import DAG
from airflow.decorators import task
from datetime import datetime

with DAG(dag_id="get_price_APPL",
    start_date=datetime(2023, 6, 15),
    schedule='@daily',
    catchup=True) as dag:

    @task
    def extract(symbol):
        return symbol

    @task
    def process(symbol):
        return symbol

    @task
    def store(symbol):
        return symbol

    store(process(extract("APPL")))

0개의 댓글