데이터 엔지니어 과제 중 궁금했던 operator weight_rule 옵션에 관한 내용을 메모한다.
검색 중 airflow에서 동시성에 관여하는 여러 옵션을 소개하는 좋은 블로그 글도 가져왔다.
모든 airflow operator는 operator 기본 클래스인 baseoperator를 상속한다.
따라서 baseoperator의 옵션을 다른 operator에서도 사용가능하다.
ex) PostgresOperator 에는 weight_rule에 대한 파라미터 설명이 없지만
baseoperator를 상속하기 때문에 해당 옵션을 사용할 수 있다.
그럼 Baseoperator에 weight_rule이 어떻게 들어가 있는지 확인해 보았다.


baseoperator의 weight_rule의 값은 DEFAULT_WEIGHT_RULE 값으로 들어가 있고,
abstractoperator의 DEFAULT_WEIGHT_RULE 값을 사용하는 듯하다.


abstractoperator의 DEFAULT_WEIGHT_RULE 값은 airflow/utils/weight_rule.py의 WeightRule 클래스의 DOWNSTREAM 값을 사용한다.
$AIRFLOW_HOME/models/baseoperator.py 또는 각 Operator의 Class 내 매개변수 수정
# airflow/models/baseoperator.py
class BaseOperator:
def __init__(
...
# weight_rule=WeightRule.DOWNSTREAM, # type: str
weight_rule=WeightRule.ABSOLUTE, # type: str
...
)

Airflow Cluster 내에서 동시성에 영향을 줄 수 있는 요소들은 크게 다음으로 나눌 수 있다.

최근 버전의 경우 위 그림에서 설정값들이 이름이 변경
max_active_runs >> max_active_runs_per_dag
task_concurrency >> max_active_tis_per_dag
concurrency >> max_active_tasks_per_dag
core.parallelism(AIRFLOW__CORE__PARALLELISM)
Parallelism(병렬성)은, Scheduler별로 Worker 수와 관계 없이 Airflow Cluster 내에서 동시에 수행될 수 있는(= Runnning state에 진입할 수 있는) Task instance의 전체 갯수
기본 설정 값은 32이며, 최상위의 설정값이므로 후술할 Pool이나 Worker Concurrency가 아무리 높은 값을 가지고 있더라도 동시 실행이 가능한 최대 Task 수는 반드시 이 값을 초과할 수 없다.
2.3.3 버전 이전 문서에서는 Scheduler 당이 아닌 "Scheduler의 수와 상관없이(= 전체가 같은 값을 공유하는 뉘앙스)"로 표현되었으나 표현오류로 수정되었음 (링크)
core.max_active_tasks_per_dag (2.2.0 이전의 dag_concurrency와 같음)
DAG 당 최대로 실행 가능한 Task 수 (1개의 DAG 내 모든 실행이 값을 공유)
core.max_active_runs_per_dag (AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG)
각 DAG 당 실행중인 최대 DAG Run의 수
celery.worker_concurrency (AIRFLOW__CELERY__WORKER_CONCURRENCY)
Celery Worker 사용 시, 각 Worker가 처리할 수 있는 최대 Task instance 수
그러나
core.worker_autoscale옵션이 활성화될 경우 이 옵션은 적용되지 않음
...
[core]
...
paralellism = 32Pool은 Airflow에서 동시 실행을 제어하기 위한 실행 대기열의 개념이다. Parallelism은 Airflow 환경 내의 전체 Task 실행의 최대 수를 제한할 수 있는 Scheduler의 옵션이며, Configuration에서만 설정할 수 있기 때문에 Scheduler 내의 Configuration에 접근해야만 이 값을 변경할 수 있다.
하지만 pool은 Webserver GUI에서 설정과 관리가 가능하며, 여러 개의 pool을 정의하고, Task를 사용자 임의의 기준으로 분류하여 각 pool의 Slot(concurrency)을 제한하여 pool에 따라 개별적인 동시 실행을 제어할 수 있다.
이 Slot의 개념은 다른 동시성 관련 설정값과 다른 특징이 하나 있는데, 바로 각 Task가 가지는 slot 값은 사용자가 유동적으로 조절할 수 있다. Task의 경중에 상관없이 Concurrency에서는 1개의 task로써 동시성을 제어받게 된다.
예를 들어 Task A는 부하 1, Task B는 부하 3을 가지고 있다고 가정해보자. 각 Task의 20개가 동시에 수행될 경우 Server와 Worker에게는 차원이 다른 부하가 될 것이다. 이를 다른 동시성 설정으로는 통제할 수 없지만, Pool을 사용할 경우 slot 값을 부여하여 작업의 무게(Weight)를 반영한 동시성 제어를 수행할 수 있게 한다.
Airflow는 default_pool 이라는 기본 pool을 가지고 있으며, 기본 설정값은 128이다. 이 기본 값은 airflow.cfg의 core.default_pool_task_slot_count을 변경하여 조정할 수 있다.
pool 및 pool_slots 매개변수 삽입task_name = BashOperator(
task_id="test_task",
pool="pool_name", # Pool 이름 입력. 기본값은 "default_pool"
pool_slots=2, # 해당 Task가 사용할 Slot 갯수. 기본값은 1
bash_command="sleep 10",
dag=dag,
)
https://airflow.apache.org/docs/apache-airflow/2.5.3/_modules/airflow/models/abstractoperator.html#AbstractOperator.weight_rule
https://airflow.apache.org/docs/apache-airflow/2.5.3/_modules/airflow/models/baseoperator.html#BaseOperator
https://airflow.apache.org/docs/apache-airflow/2.5.3/_api/airflow/models/baseoperator/index.html
https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/priority-weight.html
https://engineering.linecorp.com/ko/blog/data-engineering-with-airflow-k8s-2
https://tech.socarcorp.kr/data/2021/06/01/data-engineering-with-airflow.html