[Airflow] 5-3. 데이터 파이프라인 구성하기 : Executor

Denver·2022년 9월 11일
0

Airflow

목록 보기
6/11
post-thumbnail

Executor 란?

@ airflow.cfg
executor = SequentialExecutor

@ docker-compose.yaml
environment:
AIRFLOW_CORE_EXECUTOR: CeleryExecutor

=> AIRFLOW_DORE_EXECUTOR 값이 executor 값을 오버라이드한다

SequentialExecutor

Airflow의 기본 Executor
1번에 1 task 실행. 동시에 여러개 실행 불가
t1 - t2 - t4 인 경우
ㅤ \ t3 /
t1 > t2 > t3 > t4 순서로 실행된다

LocalExecutor

task 병렬 실행 가능
로컬 실행기는 실행시키는 시스템(리소스)에 따라 다르기 때문에 잘 확장되지 않습니다.
executor=LocalExecutor
sqlal =

CeleryExecutor

여러 machine에서 task 실행 위해 salary cluster 사용함으로써 동시에 여러 task 실행 가능
여러 worker 노드에 task 인스턴스 실행을 분산할 수 있습니다.
Celery Executors를 사용하면 원하는 만큼 확장할 수 있고 작업을 다양한 기계에 배포할 수 있습니다.

airflow worker : task 실행 담당

celery queue : result backend + broker
result broker : 실행된 task 상태 저장
broker : 스케줄러가 실행할 task 보내고 worker가 가져간다

DAG 트리거 >
스케줄러가 task t1 브로커로 전송 > 워커중 1개가 task t1 가져감 > 완료되면 태스크 상태 result broker에 저장(DB아무거나 사용 가능) >
스케줄러가 task T2, T3 브로커로 전송 > 워커 2개가 task T2, T2 가져감 > 완료되면 태스크 상태 result broker에 저장 >

celery queue 설치해야함

Concurrency(동시성) : 동시에(병렬로) 실행할 수 있는 작업 및 DAG 실행 수
worker에게 각각 다른 리소스를 할당 할 수있고, 워커들에게 가는 큐 여러개 설정해서 맞는 워커에게 가도록

@Airflpw.cfg parallelism /
AIRFLOWCOREPARALELISM
: 스케줄러당 최대 task 인스턴스 수 정의. 리소스와 스케줄러 수에 따라 정의하기default 32 (ex. 스케줄러 2개면 최대 64개)

max_active_tasks_per_dag / AIRFLOWCOREMAX_ACTIVE_TASKS_PER_DAG
: 각 DAG에서 동시에 실행할 수 있는 최대 작업 인스턴스 수.
default 16

max_active_runs_per_dag / AIRFLOWCOREMAX_ACTIVE_RUNS_PER_DAG
: DAG당 최대 active DAG run 수.
default 16

SQLite는 한번에 readersms 제한 없으나 1 writer 제한있어서 local executor나 celery executer에 SQLite 사용 할 수 없음

각 task가 어느 큐로 갈지 설정하기

큐 생성하기
-q QueueName 옵션을 추가하면 된다

airflow-worker:
<<: *airflow-common
command: celery worker -q <<Queue_Name>>

새 woker 가 추가되었고 "Worker Name" 을 클릭해보면

Queue 를 보면 test_queue 이름으료 큐가 생성되었다

Task가 어느 Queue로 갈지 설정하기

task_a = BashOperator(
task_id = 'task_a',
queue = 'test_queue',
bash_command = 'sleep 10'
)

ip:555 Flower 에서 Worker, Queue, task 확인 가능

Task UUID를 클릭하면 상세 정보를 볼 수 있다.

profile
까먹었을 미래의 나를 위해

0개의 댓글