[데이터 엔지니어링 데브코스] TIL 49일차 - Airflow 고급 기능 배우기(3)

박단이·2024년 1월 5일
0

데브코스 TIL

목록 보기
49/56

오늘 공부한 내용🤓

Jinja Templates

  • Python에서 널리 사용되는 템플릿 엔진
    • Django 템플릿 엔진에서 영감을 받아 개발
    • Jinja를 사용하면 프레젠테이션 로직과 애플리케이션 로직을 분리하여 동적 HTML 생성
    • Flask에서 사용
  • 변수는 이중 중괄호 {{ }}
    <h1> 제 이름은 {{name}} 입니다. </h1>
  • 제어문은 퍼센트 기호 {% %}
<ul>
{% for item in items %}
  <li> {{ item }} </li>
{% endfor %}
</ul>
  • Airflow에서 Jinja 템플릿을 task를 정의할 때 사용하여 작업 이름, 파라미터 또는 SQL 쿼리와 같은 작업 파라미터를 넘겨줄 수 있다.
    => 재사용이 가능하고 사용자 정의 가능한 워크플로우 생성
  • 모든 operator의 모든 parameter에 Jinja 템플릿을 사용할 수 없다.
    공식 문서에서 각 operator의 인자 설명에 (templated)라고 붙어있는 인자만 가능
# jinja 사용한 예시
task = BashOperator(
	task_id="task1",
    dag = dag,
    bash_command = 'echo "Hi, {{ params.name }}!"',
    params = {'name' : 'John'}
)
  • Airflow에서 사용가능한 Jinja 변수들
    • {{ ds }} : execution_date를 YYYY-MM-DD 형태로 반환
    • {{ ds_nodash }} : execution_date를 YYYYMMDD 헝태
    • {{ ts }} : execution_date를 timestamp 형태로 반환
    • {{ dag }} : dag 이름 ( . 을 통해 그 안의 정보에 접근할 수 있다.)
    • {{ task }} : task 이름 ( . 을 통해 그 안의 정보에 접근할 수 있다.)
    • {{ dag_run }} : 넘어온 'conf'의 값을 불러오는 변수
    • {{ var.value }} : variable 값
      {{ var.value.get("변수key", "변수가없을떄default") }}
    • {{ var.json}} : variable의 json 형태일 때
      {{ var.json.my_dict_var.key1 }}
    • {{ conn }} : connection의 정보
      {{ conn.my_conn_id.host }}

Sensor

  • 특정 조건이 충족될 때까지 대기하는 Operator
  • 외부 리소스의 가용성이나 특정 조건의 완료와 같은 상황, 동기화에 유용
  • Sensor 종류
    • FileSensor : 지정된 위치에 파일이 생길 떄까지 대기
    • HttpSensor : HTTP 요청을 수행하고 지정된 응답을 대기
    • SqlSensor : SQL DB에서 특정 조건을 충족할 때까지 대기
    • TimeSensor : 특정 시간에 도달할 때까지 워크플로우 일시 중지
    • ExternalTaskSensor : 다른 Airflow DAG의 특정 작업 완료 대기
  • Sensor 구동 원리 : 주기적으로 poke
    • mode에 따라서 worker를 release할 지 sleep할 지 결정
    • mode=reschedule : worker를 잡아서 조건 완료를 확인하고 실패라면 release 했다가 일정 주기후 다시 worker를 잡는 방식
    • mode=poke : worker 하나를 계속 잡고 조건 완료를 확인하고 실패라면 sleep 했다가 다시 확인하는 방식
    • 정확한 주기를 원한다면 poke, 느슨하게 확인해도 된다면 reschedule

Dag Dependencies

[ Dag 실행 방법 ]

  1. 주기적 실행 : Schedule로 지정
  2. 다른 DAG에 의해 트리거
    • Explicit Trigger : Dag A가 분명하게 Dag B를 트리거(TriggerDagOperator)
    • Reactive Trigger : Dag B가 Dag A(Dag A는 그 사실을 모름)가 끝나기를 대기(ExternalTaskSensor)
  3. Task 사이에 순서로 정해주기
    • 조건에 따라 다른 태스크로 분기(BranchPythonOperator)
    • 과거 데이터 backfill를 진행할 때, backfill을 하면 안되는 task와 해야하는 task가 하나의 DAG에 섞여있다면, 하지 않아야 할 task 지정(LatestOnlyOperator)
    • 앞단이 실패해도 뒤에 task가 실행되어야 할 때 (TriggerRules)

TriggerDagOperator

  • DAG A가 끝나면 DAG B를 trigger 하는 operator
  • airflow.cfg의 설정이 필요
    dag_run_conf_overrides_params=true
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

trigger_B = TriggerDagRunOperator(
	task_id = "trigger_B",
    trigger_dag_id = '트리거하려는 DAG 이름',
    conf = {'path' : '/opt/ml/conf'},  # DAG B에 넘겨주고 싶은 정보
    execution_date = "{{ ds }}",  # DAG A의 execution_date를 그대로 넘겨줘도 되고, 특정 날짜로 계산해도 되고, 새로운 값을 넘겨줘도 된다.
    reset_dag_run = True,  # DAG B가 끝날 때까지 기다릴지 여부 (default : False)
    wait_for_completion = True   # True일 경우, DAG B가 이미 주어진 execution_date에 대해 실행되었더라도 다시 재실행
)

ExternalTaskSensor

  • DAG B가 DAG A의 task가 끝나길 기다렸다가 끝나면 실행
    DAG A는 기다리고 있는 상황을 모르기 때문에 (명시하고 있지 않기 때문에) 수정하는 경우 예기치 못한 문제가 생길 수 있다.
  • 동일한 schedule_interval, execution_date 사용해야 한다.
  • schedule_interval이 안 맞으면 execution_delta, execution_date_fun을 사용하여 맞춰줘야 한다.
    -execution_date이 맞지 않으면 매칭이 되지 않는다.
  • 조건이 까다롭기 때문에 정말 필요한 경우가 아니라면 사용하지 않는 것을 추천
from airflow.sensors.external_task import ExternalTaskSensor

waiting_for_end_of_dag_a = ExternalTaskSensor(
	task_id = "task_id",
    external_dag_id = "DAG A 이름",
    external_task_id = "기다리는 task 이름",
    timeout = 5*60,  # poke할 주기(단위 : 초)
    mode='rescheule'/'poke'
)

BranchPythonOperator

  • 상황에 따라 뒤에 실행되어야 하는 task를 동적으로 결정해주는 Operator
  • 미리 정해준 Operator들 중에 선택하는 형태
from airflow.operators.python import BranchPythonOperator

def choose_task():
	if 조건:
    	return ['task2']
    else:
    	return ['task3']

branching = BranchPythonOperator(
	task_id = "task1",
    python_collable = choose_task
)

operator2 = ~~Operator(task_id = 'task2')
operator3 = ~~Operator(task_id = 'task3')

branching >> [operator2, operator3]

LatestOnlyOperator

  • Time-sensitive한 task들이 과거 데이터의 backfill시 실행되는 것을 막기 위함
  • 현재 시간이 지금 task가 처리하는 execution_date보다 미래이고, 다음 execution_date보다 과거인 경우에만 실행한다. 그 외의 경우에는 DAG 실행을 중지한다.
from airflow.operators.latest_only import LatestOnlyOperator

with DAG(
	..., 
    start_date=datetime(~), 
    catchup=True
) as day:
	t1 = ...
	t2 = LatestOnlyOperator(task_id="latest_only")
    t3 = ...
    
    t1 >> t2 >> t3
    # backfill의 경우 t2에서 DAG가 정지한다.

Trigger Rules

  • 앞선 task들의 성공/실패 상황에 따라 뒷단 task의 실행 여부 결정
  • Operator에 trigger_rule 파라미터로 결정
  • airflow.utils.trigger_rule.TriggerRule 안에 있는 변수 사용
    • ALL_SUCCESS : 앞선 task가 모두 성공했을 때 실행(default)
    • ALL_Failed : 앞선 task가 모두 실패했을 때 실행
    • ALL_DONE : 앞선 task가 성공/실패 여부와는 상관없이 모두 완료했을 때 실행
    • ONE_SUCCESS : 앞선 task 하나라도 성공하면 즉시 실행
    • ONE_FAILED : 앞선 task 하나라도 실패하면 즉시 실행
    • NONE_FAILED : 모두 실패하지 않았을 때 (성공 + 스킵 상황일 때)
    • NONE_FAILED_MIN_ONE_SUCCESS : 모두 실패하지 않았고 적어도 1개 이상의 task가 성공했을 때

Task Grouping

  • task 수가 많은 DAG라면 task 성경들에 따라 관리하고자 함
  • 비슷한 종류의 task들을 묶는 것이 일반적
  • TaskGroup 중첩 가능
  • TaskGroup 안에서도 순서 정의 가능
from airflow.utils.task_group import TaskGroup

start = ~operator(~)
with TaskGroup("그룹이름", tooltip="airflow web에서 확인할 내용") as section_1:
	task1 = 
    task2 =
    task3 = 
    
    task1 >> [task2, task3]
    
start >> section_1

느낀 점😊

3강 생각보다 내용이 많았고 이 내용이 꽤나 진국이었다. airflow 배우면서 궁금했던 내용들의 대부분이 오늘을 위해서 남겨둔거였을까...? 정말 신기하고 재미있었다. 4강까지 가는것이 목표였는데 조금 아쉽긴하다...
빨리 배운 내용을 가지고 직접 DAG 작성하고 싶다.

profile
데이터 엔지니어를 꿈꾸는 주니어 입니다!

0개의 댓글