[Airflow] 설정, 예제

JunMyung Lee·2022년 5월 16일
0

데이터

목록 보기
8/14

해당 페이지에서는 Airflow를 통한 데이터를 조회하고 삽입하는 부분에 대해서 서술한다.
Jupyter Notebook을 통해 선행으로 예제가 작성되었으며 이로인한 task는
하나로만 정의하고자 한다.

task를 분할하지 않는 이유는, airflow에서 DataFrame타입을 제대로 return할 수 없기 때문이다.
DataFrame으로 return 할 시, Serialize 관련 예외가 발생하는데, 이때 지원이 제대로 안되는 듯 하다.
동작을 위해서는 DataFrame -> to_json() 을 통한 json 리턴을 해야하는데, 이때 내부의 DataFrame.merge를 사용하기 위해선
또다른 Desialize가 필요하므로, 하나의 task로 간주하고 처리하고자 한다.

Airflow Task

task는 airflow의 기본 실행 단위이다. DAG 안에 task들을 나열한 다음, upstream, downstream으로 종속성을 정렬

  • Operator : DAG를 구축하는데 자주 사용하는 작업 템플릿 클래스
    • BashOperator : bash command를 실행
    • PythonOperator : Python 함수를 실행
    • EmailOperator : Email 발송
    • MySqlOperator : SQL 쿼리 수행
  • Sensor : 외부 이벤트가 발생할 때까지 대기하는 오퍼레이터의 하위 클래스
  • TaskFlow : @task 데코레이터를 사용하여 사용자가 작성한 함수 코드를 task로 인식하게 만드는 기능
    • 이 기능들은 전부 BaseOperator 클래스를 상속받아 구현

표현식

여러 방식이 있지만 <<, >>으로 표현하는게 읽기 쉽고 권장함.
task 끼리 정보를 전달을 하지 않는 것을 권장하지만, task끼리 정보를 전달하려면 XCom의 기능을 사용해야함. ( 해당 예제에서는 사용 X )

Airflow Executor

task가 실행되는 매커니즘. 현재는 Local Executor로 설정

Sequential Executor

Airflow에서 제공하는 기본 Executor. SQLite와 함께 사용할 수 있으며, 한번에 하나의 task만 실행할 수 있어 병렬성을 제공하지 않음

Local Executor

task를 병렬로 실행하는 것이 가능하며, 옵션값을 통해 최대 task를 병렬처리 가능. RDBMS와 사용 가능( PostgreSQL )

Celery Executor

Task를 메시지 브로커에 전달하고, Celery Worker가 Task를 가져가서 실행하는 방식(Redis, RabbitMQ).
Worker 수를 스케일아웃 할 수 있다는 장점이 있지만, 메시지 브로커를 따로 관리해야 하고 워크 프로세스에 대한 모니터링이 필요하다는 단점이 존재한다.

Kubernetes Executor

Task를 스케줄러가 실행가능 상태로 변경하면 메시지 브로커에 전달하는게 아니라 Kubernetes API를 사용하여 Airflow 워커를 pod 형태로 실행한다. 매 Task마다 pod가 생성되므로 가볍고, Worker에 대한 유지 보수가 필요없다는 장점이 존재한다.. 또한 Kubernetes를 활용하여 지속적으로 자원을 점유하지 않기 때문에 효율적으로 자원을 사용 가능하다. 하지만 짧은 Task에도 pod을 생성하는 overhead가 있으며, celery executor에 비해 자료가 적고 구성이 복잡하다는 단점이 있다.

Task instance 상태

DAG가 실행될 때 task instance를 생성.

StatusDescryption
nonetask가 대기열에 올라가지 않음 (종속성이 아직 충족되지 않음)
scheduled스케줄러가 task의 종속성이 충족되고 실행되어야 한다고 결정한 상태
queuedtask가 Executor에 할당되었으며 실행되기를 기다리고 있는 상태
runningtask가 실행 중
successtask 성공
failedtask를 실행하는 동안 오류가 발생하여 실패
skipped조건식, LatestOnly 등으로 인해 작업을 건너뜀
upstream_failedupstream task가 실패했고 trigger rule이 필요한 상태
up_for_retrytask가 실패했지만 재시도 횟수가 남아 있어서 재 스케쥴을 할 예정인 상태
up_for_reschedule해당 task가 reschedule 모드인 sensor라고 나타내는 상태
sensing해당 task가 Smart Sensor 라고 나타냄
removedDAG가 실행된 이후에 task가 삭제됨

Airflow DAG 설정

schedule_interval

DAG의 입력 파라미터에서 정의하고, cron 표현법 또는 datetime.timedelta 객체를 이용하여 표현 가능.
표는 네이버-파파고 번역

PresetRun once a year at midnight of January 1Cron
None예약하지 않고 "외부적으로 트리거된" DAG 전용으로 사용
@once한 번만 예약
@hourly정각에 한 시간에 한 번씩 예약0 * * * *
@daily하루에 한 번 자정에 예약0 0 * * *
@weekly주 1회 일요일 아침 자정에 예약0 0 * * 0
@monthly월 1일 자정에 한 달에 한 번 예약0 0 1 * *
@yearly년에 한 번 1월 1일 자정에 예약0 0 1 1 *

catchup

Scheduler는 DAG의 전체 lifetime을 확인하면서 실행되지 않은 DAG를 실행시긴다. 이렇게 정해진 시간에 실행되지 못한 DAG를 늦게라도 실행하는 것을 catchup이라고 한다.
만약 과거의 작업은 중요하지 않고, 현재 시점의 이후 DAG만 실행되어야 한다면, 설정을 변경할 수 있다.

concurrency

task 기준으로 최대로 실행될 수 있는 개수

max_active_runs

dag 기준으로 동시에 실행될 수 있는 개수

execution_timeout

설정한 시간을 초과하여 실행될 경우, 실패로 처리

depends_on_past

이전 Dag run에서 현재 task에 대해 실패시 이후 처리

Airflow Python 코드 설정

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
from datetime import timedelta

default_args = {
          'owner': 'airflow'
        , 'catchup': False                         # 정해진 시간에 실행되지 못한 DAG를 늦게라도 실행 처리
        , 'execution_timeout': timedelta(hours=6)  # 설정한 시간을 초과하여 실행될 경우, 실패로 처리
        , 'depends_on_past': False                 # 이전 Dag run에서 현재 task에 대해 실패시 이후 처리
        , 'schedule_interval':  '@hourly'          # cron 가능, 시간마다 수행
        , 'concurrency': 5                         # 최대 task 실행 개수
        , 'max_active_runs': 2                     # 동시 dag 실행 개수
    }

dag = DAG(
          'dag_id'
        , description = "Sample"
        , default_args = default_args
        , start_date = days_ago(3)
        , tags = ['hourly']
    )
import reservation.DatabaseProperties as data

# 두개의 테이블 테이터를 가지고와서 merge 이후 list로 변경한 데이터 Insert
def work_sample():
    branch_frame = data.getMergeFrame(data.getSqlData('branch'), data.getSqlData('advertise'), 'branch_id')
    branch_group_frame = data.getGroupByFrame(branch_frame)
    data.setSqlData('branch', branch_group_frame.values.tolist())

task_reservation = PythonOperator(
              task_id = 'task_id'
            , python_callable = work_sample
            , dag = dag
        )

task_reservation

0개의 댓글