LocalExecutor
CeleryExecutor
dag_concurrency
, concurrency(해당 dag만)
)max_active_runs_per_dag
, max_active_runs(해당 dag만)
)SequentialExecutor 내에서는
task1
>> [task2, task3]
>> task4
으로 구성하는 경우,
task1
-> task2 or task3
-> task2, task3 중 앞에서 수행하지 않은 것
-> task4
순으로 동작한다.
즉, 순차적으로 진행된다.
task2, task3
이 동시에, 병렬적으로 수행되게 하려면 어떻게 해야 하는가?
2가지 설정이 선행되어야 한다.
1. 순차 데이터베이스(SQLite
)는 한번에 하나의 writer만 가지기 때문에, 데이터베이스를 변경해야 한다.(postgresql
로 변경)
2. SequentialExecutor
를 LocalExecutor
로 변경해야 한다.
$ sudo apt update
$ sudo apt install postgresql
$ sudo -u postgres psql
postgres=# ALTER USER postgres PASSWORD [PASSWORD]
postgresql과 연결 준비 완료
$ pip install 'apache-airflow[postgres]'
postgres를 airflow의 metastore(metaDB)로 사용할 준비 완료
airflow.cfg의
sql_alchemy_conn = ...
을
sql_alchemy_conn = postgresql+psycopg2://postgres:[PASSWORD]@localhost/postgres
로 변경
저장 후 제대로 설정되었는지 metaDB 확인
$ airflow db check
로 확인할 수 있다.
executor = SequentialExecutor
를executor = LocalExecutor
로 변경$ airflow db init
$ airflow users create \
--username [username] \
--firstname [firstname] \
--lastname [lastname] \
--role Admin \
--password [password] \
--email [email@email.com]
이후 적용시키려면 webserver를 재시작해야 한다.
Gantt를 확인하면 두 가지 작업이 동시에 수행되는 것을 확인할 수 있다.
(Single Machine에서도 자원에 따라 동시에 수행할 수 있음을 알 수 있다.)
CeleryExecutor
KubernetesExecutor
가장 중요한 것은 모든 executor가 순서에 맞게 실행되어야 한다는 점이다. > Queue
를 사용.
LocalExecutor
에서는 Task를 Queue
에 Push, Queue
에서 Pulled out, execute된다.LocalExecutor
로 Single Machine에서 multiple tasks를 parallel하게 수행할 수 있지만, 이 역시 한계가 있을 수 있다. -> Multiple Machines(Workers)에서 Multiple Tasks를 수행해야 할때는? "CeleryExecutor"
Queue 역할로 Redis를 사용한다.
CeleryExecutor
는 tasks를 multiple machines에서 수행 가능하도록 해준다.(반면, LocalExecutor
에서는 tasks를 single machine에서만 수행할 수 있다.)CeleryExecutor
를 사용하고 싶다면 external tool을 install and setup해야 한다.(Redis 같은)$ pip install 'apache-airflow[celery]'
airflow.cfg에서 executor = CeleryExecutor
로 변경
message broker 역할(Queue)로 Redis
를 설치
$ sudo apt update
$ sudo apt install redis-server
$ sudo nano /etc/redis/redis.conf
들어가서supervised no
-> `supervised systemd로 변경 후 저장(ctrl + x)$ sudo systemctl restart redis.service
$ sudo systemctl status redis.service
active : active(running)
상태 확인broker_url
: airflow.cfg의
broker_url - celeryExecutor에서 task를 redis message로 push하기 위해서
사용된다.
broker_url = redis://redis:6379/0
-> broker_url = redis://localhost:6379/0
6379는 포트이고 0은 데이터베이스 이름이 0이라는 뜻이다.
result_backend
: celery에서 해당 작업의 실행과 관련된 일부 metadata가 저장된다.
=> result_backend 설정해야 하는 이유이다.
result_backend = db+postgresql://postgres:airflow@postgres/airflow
-> result_backend = postgresql+psycopg2://postgres:postgres@localhost/postgres
$ pip install 'apache-airflow[redis]'
2.1.0 version에서는 default가 아닌 것으로 보인다. -> Link를 이용해서 버전 변경 후
$ airflow celery flower
로 동작한다.
result_backend = postgresql+psycopg2://postgres:postgres@localhost/postgres
에서result_backend = db+postgresql://postgres:postgres@localhost/postgres
로 변경$ airflow celery worker
- flower UI에서 worker가 추가 된 것을 확인할 수 있다.(이제 해당 worker(machine)에 task를 추가할 수 있다.)
- Trigger 시에 worker에서 성공, 실패 등 task의 상태를 점검할 수 있다.
parallelism
)dag_concurrency
, concurrency
)max_active_runs_per_age
, max_active_runs
)parallelism
: 전체 DAG instance에서 실행 가능한 최대 task 수를 정의ex) LocalExecutor를 사용하더라도 parallelism = 1로 설정하면
SequentialExecutor와 동일하게 동작한다.
dag_concurrency
: 전체 DAG instance에서 동시에 수행할 수 있는 최대 task의 수concurrency
: 각각의 DAG에서 설정할 수 있으며 해당 DAG에서 동시에 수행할 수 있는 task의 수를 제한# x는 해당 DAG에서 동시에 수행할 수 있는 최대 tasks 수
with DAG(..., concurrency = x)
max_active_runs_per_dag
: 전체 DAG instance에서 동시에 수행할 수 있는 최대 DAG의 수max_active_runs
: 해당 DAG에 허용되는 활성 DAG 실행의 최대 수# x는 해당 DAG에 허용되는 활성 DAG 실행의 최대 수
with DAG(..., concurrency = x)
예를 들어) 하나의 DAGrun이 일부
데이터
를 처리하며,
이데이터
는 다음 DAGrun에서 사용된다고 가정하자.
이전 Dagrun이 수행되고 다음 DAGrun이 수행되어야 하므로, 동시에 실행할 수 있는
DAGrun의 수를 제한하는 것이 필요하다. ->max_active_runs
사용
1)
parallelism = 4이고
dags_concurrency = 6이면?
parallelism에 우선순위가 있기 때문에
한 dag에서 동시에 수행 가능한 task의 수는 4이다.
2)
2개의 DAG가 있다.
DAG A [T1, T2] >> T3
DAG B [T1, T2] >> T3
parallelism = 4
dags_concurrency = 2
max_active_runs_per_dag = 1
이면, DAG A, DAG B가 동시에 수행될 수 있는가?
parallelism, dags_concurrency만 봐서는 가능하지만,
한번에 최대 1개의 DAG만 동시에 수행 가능하기 때문에 불가능하다.