[Airflow] 2. Airflow 구성 자세히 살펴보기

Denver·2022년 9월 11일
0

Airflow

목록 보기
4/11
post-thumbnail

airflow 최초 설치 시 metadatabase 초기화 명령어?
airflow initdb

스케줄러란?

The scheduler component is responsible for creatging DagRuns
DagRuns? Dag 생성 담당

그래프뷰
DAG종속성 확인하기 가장 좋은 뷰

Landing Times View
여러 DAG 실행에서 작업을 완료하는 데 걸리는 시간을 모니터링하는 가장 좋은 방법은 무엇입니까?

간트 뷰
Dag 병목 현상 감지하는제 가자 유용

코드 뷰
수정 사항이 DAG에 적용되었는지 여부를 확인하는 데 사용할 수 있는 보기는 무엇입니까?

Executor 란?

@ airflow.cfg
executor = SequentialExecutor

@ docker-compose.yaml
environment:
AIRFLOW_CORE_EXECUTOR: CeleryExecutor

=> AIRFLOW_DORE_EXECUTOR 값이 executor 값을 오버라이드한다

SequentialExecutor

Airflow의 기본 Executor
1번에 1 task 실행. 동시에 여러개 실행 불가
t1 - t2 - t4 인 경우
ㅤ \ t3 /
t1 > t2 > t3 > t4 순서로 실행된다

LocalExecutor

task 병렬 실행 가능
로컬 실행기는 실행시키는 시스템(리소스)에 따라 다르기 때문에 잘 확장되지 않습니다.
executor=LocalExecutor
sqlal =

CeleryExecutor

여러 machine에서 task 실행 위해 salary cluster 사용함으로써 동시에 여러 task 실행 가능
여러 worker 노드에 task 인스턴스 실행을 분산할 수 있습니다.
Celery Executors를 사용하면 원하는 만큼 확장할 수 있고 작업을 다양한 기계에 배포할 수 있습니다.

airflow worker : task 실행 담당

celery queue : result backend + broker
result broker : 실행된 task 상태 저장
broker : 스케줄러가 실행할 task 보내고 worker가 가져간다

DAG 트리거 >
스케줄러가 task t1 브로커로 전송 > 워커중 1개가 task t1 가져감 > 완료되면 태스크 상태 result broker에 저장(DB아무거나 사용 가능) >
스케줄러가 task T2, T3 브로커로 전송 > 워커 2개가 task T2, T2 가져감 > 완료되면 태스크 상태 result broker에 저장 >

celery queue 설치해야함

Concurrency(동시성) : 동시에(병렬로) 실행할 수 있는 작업 및 DAG 실행 수
worker에게 각각 다른 리소스를 할당 할 수있고, 워커들에게 가는 큐 여러개 설정해서 맞는 워커에게 가도록

@Airflpw.cfg parallelism /
AIRFLOWCOREPARALELISM
: 스케줄러당 최대 task 인스턴스 수 정의. 리소스와 스케줄러 수에 따라 정의하기default 32 (ex. 스케줄러 2개면 최대 64개)

max_active_tasks_per_dag / AIRFLOWCOREMAX_ACTIVE_TASKS_PER_DAG
: 각 DAG에서 동시에 실행할 수 있는 최대 작업 인스턴스 수.
default 16

max_active_runs_per_dag / AIRFLOWCOREMAX_ACTIVE_RUNS_PER_DAG
: DAG당 최대 active DAG run 수.
default 16

SQLite는 한번에 readersms 제한 없으나 1 writer 제한있어서 local executor나 celery executer에 SQLite 사용 할 수 없음

각 task가 어느 큐로 갈지 설정하기

큐 생성하기
-q QueueName 옵션을 추가하면 된다

airflow-worker:
<<: *airflow-common
command: celery worker -q <<Queue_Name>>

Dag 란?

from airflow import DAG
from datetime import datetime

with DAG('user_processing', startdate=datetime(2022,8,1), \
schedule_interval='daily', catchup=False) as dag:
	None

Operator 란?

1 operator 1 task


Login, PW : airflow user

profile
까먹었을 미래의 나를 위해

0개의 댓글