해당 페이지에서는 Airflow를 통한 데이터를 조회하고 삽입하는 부분에 대해서 서술한다.
Jupyter Notebook을 통해 선행으로 예제가 작성되었으며 이로인한 task는
하나로만 정의하고자 한다.
task를 분할하지 않는 이유는, airflow에서 DataFrame타입을 제대로 return할 수 없기 때문이다.
DataFrame으로 return 할 시, Serialize 관련 예외가 발생하는데, 이때 지원이 제대로 안되는 듯 하다.
동작을 위해서는DataFrame -> to_json()
을 통한 json 리턴을 해야하는데, 이때 내부의DataFrame.merge
를 사용하기 위해선
또다른 Desialize가 필요하므로,하나의 task로 간주
하고 처리하고자 한다.
task는 airflow의 기본 실행 단위이다. DAG 안에 task들을 나열한 다음, upstream
, downstream
으로 종속성을 정렬
@task
데코레이터를 사용하여 사용자가 작성한 함수 코드를 task로 인식하게 만드는 기능BaseOperator
클래스를 상속받아 구현여러 방식이 있지만 <<, >>
으로 표현하는게 읽기 쉽고 권장함.
task 끼리 정보를 전달을 하지 않는 것을 권장하지만, task끼리 정보를 전달하려면 XCom의 기능을 사용해야함. ( 해당 예제에서는 사용 X )
task가 실행되는 매커니즘. 현재는 Local Executor
로 설정
Airflow에서 제공하는 기본 Executor. SQLite와 함께 사용할 수 있으며, 한번에 하나의 task만 실행할 수 있어 병렬성을 제공하지 않음
task를 병렬로 실행하는 것이 가능하며, 옵션값을 통해 최대 task를 병렬처리 가능. RDBMS와 사용 가능( PostgreSQL )
Task를 메시지 브로커에 전달하고, Celery Worker가 Task를 가져가서 실행하는 방식(Redis, RabbitMQ).
Worker 수를 스케일아웃 할 수 있다는 장점이 있지만, 메시지 브로커를 따로 관리해야 하고 워크 프로세스에 대한 모니터링이 필요하다는 단점이 존재한다.
Task를 스케줄러가 실행가능 상태로 변경하면 메시지 브로커에 전달하는게 아니라 Kubernetes API를 사용하여 Airflow 워커를 pod 형태로 실행한다. 매 Task마다 pod가 생성되므로 가볍고, Worker에 대한 유지 보수가 필요없다는 장점이 존재한다.. 또한 Kubernetes를 활용하여 지속적으로 자원을 점유하지 않기 때문에 효율적으로 자원을 사용 가능하다. 하지만 짧은 Task에도 pod을 생성하는 overhead가 있으며, celery executor에 비해 자료가 적고 구성이 복잡하다는 단점이 있다.
DAG가 실행될 때 task instance를 생성.
Status | Descryption |
---|---|
none | task가 대기열에 올라가지 않음 (종속성이 아직 충족되지 않음) |
scheduled | 스케줄러가 task의 종속성이 충족되고 실행되어야 한다고 결정한 상태 |
queued | task가 Executor에 할당되었으며 실행되기를 기다리고 있는 상태 |
running | task가 실행 중 |
success | task 성공 |
failed | task를 실행하는 동안 오류가 발생하여 실패 |
skipped | 조건식, LatestOnly 등으로 인해 작업을 건너뜀 |
upstream_failed | upstream task가 실패했고 trigger rule이 필요한 상태 |
up_for_retry | task가 실패했지만 재시도 횟수가 남아 있어서 재 스케쥴을 할 예정인 상태 |
up_for_reschedule | 해당 task가 reschedule 모드인 sensor라고 나타내는 상태 |
sensing | 해당 task가 Smart Sensor 라고 나타냄 |
removed | DAG가 실행된 이후에 task가 삭제됨 |
DAG의 입력 파라미터에서 정의하고, cron 표현법 또는 datetime.timedelta 객체를 이용하여 표현 가능.
표는 네이버-파파고 번역
Preset | Run once a year at midnight of January 1 | Cron |
---|---|---|
None | 예약하지 않고 "외부적으로 트리거된" DAG 전용으로 사용 | |
@once | 한 번만 예약 | |
@hourly | 정각에 한 시간에 한 번씩 예약 | 0 * * * * |
@daily | 하루에 한 번 자정에 예약 | 0 0 * * * |
@weekly | 주 1회 일요일 아침 자정에 예약 | 0 0 * * 0 |
@monthly | 월 1일 자정에 한 달에 한 번 예약 | 0 0 1 * * |
@yearly | 년에 한 번 1월 1일 자정에 예약 | 0 0 1 1 * |
Scheduler는 DAG의 전체 lifetime을 확인하면서 실행되지 않은 DAG를 실행시긴다. 이렇게 정해진 시간에 실행되지 못한 DAG를 늦게라도 실행하는 것을 catchup
이라고 한다.
만약 과거의 작업은 중요하지 않고, 현재 시점의 이후 DAG만 실행되어야 한다면, 설정을 변경할 수 있다.
task 기준으로 최대로 실행될 수 있는 개수
dag 기준으로 동시에 실행될 수 있는 개수
설정한 시간을 초과하여 실행될 경우, 실패로 처리
이전 Dag run에서 현재 task에 대해 실패시 이후 처리
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
from datetime import timedelta
default_args = {
'owner': 'airflow'
, 'catchup': False # 정해진 시간에 실행되지 못한 DAG를 늦게라도 실행 처리
, 'execution_timeout': timedelta(hours=6) # 설정한 시간을 초과하여 실행될 경우, 실패로 처리
, 'depends_on_past': False # 이전 Dag run에서 현재 task에 대해 실패시 이후 처리
, 'schedule_interval': '@hourly' # cron 가능, 시간마다 수행
, 'concurrency': 5 # 최대 task 실행 개수
, 'max_active_runs': 2 # 동시 dag 실행 개수
}
dag = DAG(
'dag_id'
, description = "Sample"
, default_args = default_args
, start_date = days_ago(3)
, tags = ['hourly']
)
import reservation.DatabaseProperties as data
# 두개의 테이블 테이터를 가지고와서 merge 이후 list로 변경한 데이터 Insert
def work_sample():
branch_frame = data.getMergeFrame(data.getSqlData('branch'), data.getSqlData('advertise'), 'branch_id')
branch_group_frame = data.getGroupByFrame(branch_frame)
data.setSqlData('branch', branch_group_frame.values.tolist())
task_reservation = PythonOperator(
task_id = 'task_id'
, python_callable = work_sample
, dag = dag
)
task_reservation