DAG Task 작업의 영향을 받는 대상이 동시성 제약 (concurrency limit)을 갖는 경우 Airflow 에서 적절한 rate limit 을 통해 한 번에 많은 요청이 외부 자원에 전달되지 않도록 제어할 필요가 있다.
예를 들어, DAG에서 BigQuery 테이블 업데이트를 하는 경우 DML Concurrency Limit 로 인해 하나의 테이블에 동시에 접근 가능한 Task 숫자를 제한해야 한다.
(Update DML의 경우 2개의 concurrency 만 허용)
이번 글에서는 Airflow에서 동시성 제어와 관련한 설정들을 살펴보고자 한다.
concurrency is about dealing with lots of things at once but parallelism is about doing lots of things at once.
동시성은 논리적인 개념, 병렬성은 (동시)실행의 관점.
airflow.cfg 의 설정값을 통해서 동시성 제어
: Worker에서 동시에 실행 가능한 타스크 인스턴수의 수 제어. “maximum active tasks anywhere.”max_active_tasks_per_dag (dag_concurrency)
: DAG 당 동시에 스케쥴링되는 Task의 수. 하나의 DAG이 slot을 독점하는 것을 방지.max_active_runs_per_dag
: DAG당 한 순간에 실행 가능한 DAG Run의 개수. Backfilling (catchup=True) 상황과 관련.concurrency
: 모든 DAG Runs 에서의 최대 타스크 인스턴스의 수. DAG parameter에 지정. default는 max_active_tasks_per_dag 사용.max_active_tasks
하나의 DAG Run에서의 최대 타스크 인스턴스 수. Default는 max_active_tasks_per_dag 사용.max_active_runs
DAG Runs의 수 제한. Default는 max_active_runs_per_dag. backfill 상황에서 고려.# Allow a maximum of concurrent 10 tasks across a max of 3 active DAG runs
dag = DAG('my_dag_id', concurrency=10, max_active_runs=3)
: This setting defines the amount of pools available for a task. Pools are a way to limit the number of concurrent instances of an arbitrary group of tasks. max_active_tis_per_dag(task_concurrency)
: concurrency limit for the same task across multiple DAG runs
[그림출처] https://stackoverflow.com/a/63955004/4599185
[그림출처] https://livebook.manning.com/book/data-pipelines-with-apache-airflow
t1 = PythonOperator(pool='my_custom_pool', max_active_tis_per_dag=14)
pools are a way of limiting the number of concurrent instances of a specific type of task. This is great if you have a lot of workers in parallel, but you don’t want to overwhelm a source or destination.