[airflow] 동시성(Concurrency)에 관여하는 옵션

ehwnghks·2023년 5월 2일

airflow

목록 보기
1/1

데이터 엔지니어 과제 중 궁금했던 operator weight_rule 옵션에 관한 내용을 메모한다.
검색 중 airflow에서 동시성에 관여하는 여러 옵션을 소개하는 좋은 블로그 글도 가져왔다.

Baseoperator

모든 airflow operator는 operator 기본 클래스인 baseoperator를 상속한다.
따라서 baseoperator의 옵션을 다른 operator에서도 사용가능하다.
ex) PostgresOperator 에는 weight_rule에 대한 파라미터 설명이 없지만
baseoperator를 상속하기 때문에 해당 옵션을 사용할 수 있다.

그럼 Baseoperator에 weight_rule이 어떻게 들어가 있는지 확인해 보았다.

airflow/models/baseoperator.py

baseoperator의 weight_rule의 값은 DEFAULT_WEIGHT_RULE 값으로 들어가 있고,
abstractoperator의 DEFAULT_WEIGHT_RULE 값을 사용하는 듯하다.

airflow/models/abstractoperator.py

airflow/utils/weight_rule.py

abstractoperator의 DEFAULT_WEIGHT_RULE 값은 airflow/utils/weight_rule.py의 WeightRule 클래스의 DOWNSTREAM 값을 사용한다.

Weight Rule 옵션

  • downstream - DAG 내의 후순위의 task가 낮은 우선순위를 가진다.
  • upstream - DAG 내의 후순위의 task가 높은 우선순위를 가진다.
  • absolute - 모든 task가 동일한 우선순위를 가진다.

Weight Rule 옵션 변경 방법

$AIRFLOW_HOME/models/baseoperator.py 또는 각 Operator의 Class 내 매개변수 수정

# airflow/models/baseoperator.py 

class BaseOperator:
    def __init__(
        ...
        # weight_rule=WeightRule.DOWNSTREAM,  # type: str
        weight_rule=WeightRule.ABSOLUTE,  # type: str
        ...
    )

Airflow 동시성 제어 구조 개요

Airflow Cluster 내에서 동시성에 영향을 줄 수 있는 요소들은 크게 다음으로 나눌 수 있다.

  • Scheduler 단계에서 설정 가능한 Parallelism과 Pools
  • Celery Worker 사용시에 설정 가능한 Worker Concurrency
  • Scheduler~Metadata Database 사이의 Connection 설정

설정값 이해하기

최근 버전의 경우 위 그림에서 설정값들이 이름이 변경
max_active_runs >> max_active_runs_per_dag
task_concurrency >> max_active_tis_per_dag
concurrency >> max_active_tasks_per_dag

Airflow configuration

  1. core.parallelism(AIRFLOW__CORE__PARALLELISM)
    Parallelism(병렬성)은, Scheduler별로 Worker 수와 관계 없이 Airflow Cluster 내에서 동시에 수행될 수 있는(= Runnning state에 진입할 수 있는) Task instance의 전체 갯수
    기본 설정 값은 32이며, 최상위의 설정값이므로 후술할 Pool이나 Worker Concurrency가 아무리 높은 값을 가지고 있더라도 동시 실행이 가능한 최대 Task 수는 반드시 이 값을 초과할 수 없다.

    2.3.3 버전 이전 문서에서는 Scheduler 당이 아닌 "Scheduler의 수와 상관없이(= 전체가 같은 값을 공유하는 뉘앙스)"로 표현되었으나 표현오류로 수정되었음 (링크)

  2. core.max_active_tasks_per_dag (2.2.0 이전의 dag_concurrency와 같음)
    DAG 당 최대로 실행 가능한 Task 수 (1개의 DAG 내 모든 실행이 값을 공유)

  3. core.max_active_runs_per_dag (AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG)
    각 DAG 당 실행중인 최대 DAG Run의 수

  4. celery.worker_concurrency (AIRFLOW__CELERY__WORKER_CONCURRENCY)
    Celery Worker 사용 시, 각 Worker가 처리할 수 있는 최대 Task instance 수

    그러나 core.worker_autoscale 옵션이 활성화될 경우 이 옵션은 적용되지 않음

    적용방법

    • $AIRFLOW_HOME/airflow.cfg >> 해당 옵션 값 변경 (공식 문서)
      ...
      [core]
      ...
      paralellism = 32
    • Airflow Scheduler 재시작

Pools

Pool은 Airflow에서 동시 실행을 제어하기 위한 실행 대기열의 개념이다. Parallelism은 Airflow 환경 내의 전체 Task 실행의 최대 수를 제한할 수 있는 Scheduler의 옵션이며, Configuration에서만 설정할 수 있기 때문에 Scheduler 내의 Configuration에 접근해야만 이 값을 변경할 수 있다.
하지만 pool은 Webserver GUI에서 설정과 관리가 가능하며, 여러 개의 pool을 정의하고, Task를 사용자 임의의 기준으로 분류하여 각 pool의 Slot(concurrency)을 제한하여 pool에 따라 개별적인 동시 실행을 제어할 수 있다.
이 Slot의 개념은 다른 동시성 관련 설정값과 다른 특징이 하나 있는데, 바로 각 Task가 가지는 slot 값은 사용자가 유동적으로 조절할 수 있다. Task의 경중에 상관없이 Concurrency에서는 1개의 task로써 동시성을 제어받게 된다.
예를 들어 Task A는 부하 1, Task B는 부하 3을 가지고 있다고 가정해보자. 각 Task의 20개가 동시에 수행될 경우 Server와 Worker에게는 차원이 다른 부하가 될 것이다. 이를 다른 동시성 설정으로는 통제할 수 없지만, Pool을 사용할 경우 slot 값을 부여하여 작업의 무게(Weight)를 반영한 동시성 제어를 수행할 수 있게 한다.
Airflow는 default_pool 이라는 기본 pool을 가지고 있으며, 기본 설정값은 128이다. 이 기본 값은 airflow.cfgcore.default_pool_task_slot_count을 변경하여 조정할 수 있다.

적용방법

  • DAG 내 Task 정의 시 poolpool_slots 매개변수 삽입
task_name = BashOperator(
	task_id="test_task",
    pool="pool_name", # Pool 이름 입력. 기본값은 "default_pool"
    pool_slots=2, # 해당 Task가 사용할 Slot 갯수. 기본값은 1
    bash_command="sleep 10",
    dag=dag,
)

주의사항

  • default_pool은 삭제 불가능
  • 1개의 Task는 1개의 Pool에만 등록 가능
  • SubDAG을 사용할 경우 SubDAG 안의 Task들에만 Pool 적용 가능. SubDAGOperator 자체는 Pool에 등록되지 않음
  • 잘못된 Pool 이름 입력 시 Task가 작동하지 않으므로 오타로 인해 작동불능이 되지 않도록 주의할 것

💡 출처

  1. https://velog.io/@graphy-young/Apache-Airflow%EC%9D%98-%EB%8F%99%EC%8B%9C%EC%84%B1-%EC%84%A4%EC%A0%95-%EC%9D%B4%ED%95%B4%ED%95%98%EA%B8%B0-%EC%9D%B4%EB%A1%A0%ED%8E%B8

  2. https://airflow.apache.org/docs/apache-airflow/2.5.3/_modules/airflow/models/abstractoperator.html#AbstractOperator.weight_rule
    https://airflow.apache.org/docs/apache-airflow/2.5.3/_modules/airflow/models/baseoperator.html#BaseOperator
    https://airflow.apache.org/docs/apache-airflow/2.5.3/_api/airflow/models/baseoperator/index.html
    https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/priority-weight.html

  3. https://engineering.linecorp.com/ko/blog/data-engineering-with-airflow-k8s-2

  4. https://tech.socarcorp.kr/data/2021/06/01/data-engineering-with-airflow.html

profile
반갑습니다.

0개의 댓글