As of Airflow 2.7.0, you need to install the
celeryprovider package to use this executor. This can be done by installingapache-airflow-providers-celery>=3.3.0or by installing Airflow with theceleryextra:pip install 'apache-airflow[celery]'.
CeleryExecutor는 작업자의 수를 확장(scale out)하는 방법 중 하나. 이를 사용하려면 Celery 백엔드(RabbitMQ, Redis, Redis Sentinel 등)를 설정하고, 필요한 의존성(librabbitmq, redis 등)을 설치하며, airflow.cfg 파일에서 executor 매개변수를 CeleryExecutor로 변경하고 관련 Celery 설정을 해야한다. Celery broker 설정에 관한 자세한 내용은 Celery documentation on the topic 참조, Celery Executor 파라미터 설정에 관한 자세한 내용은 Configuration Reference 참조
worker는 자신의 DAGS_FOLDER에 접근할 수 있어야 하며, File System은 사용자가 직접 동기화해야 한다. 일반적인 설정으로는 DAGS_FOLDER를 Git 저장소에 저장하고 Chef, Puppet, Ansible과 같은 도구를 사용하여 환경 내의 머신 간에 이를 동기화하는 방법이 있다. 만약 모든 머신에 공통 마운트 지점이 있다면, 해당 위치에 파이프라인 파일을 공유하는 방식도 가능하다.
worker를 실행하기 위해서는, Airflow 설정을 하고 아래 명령어를 통해 worker를 실행할 수 있다.
airflow celery worker
실행중인 worker를 멈추기 위해서는 아래 명령어를 실행. 이 명령어를 통해 SIGTERM 시그널을 보내 graceful stop을 진행한다.
airflow celery stop
Celery Flower는 Celery 위에 구축된 웹 UI로, 작업자들을 모니터링할 수 있다. Flower 웹 서버를 시작하려면 다음 단축 명령어를 사용
airflow celery flower
airflow celery bundler을 설치하는 방법은 아래 명령어를 통해 가능하다
pip install 'apache-airflow[celery]'

초기에는 두개의 프로세스가 실행
2개의 database도 사용됨
process를 실행하면서 두개의 프로세스도 생성된다
execute() 함수와 같은 작업 로직을 수행
[1] SchedulerProcess processes the tasks and when it finds a task that needs to be done, sends it to the QueueBroker.
[2] SchedulerProcess also begins to periodically query ResultBackend for the status of the task.
[3] QueueBroker, when it becomes aware of the task, sends information about it to one WorkerProcess.
[4] WorkerProcess assigns a single task to a one WorkerChildProcess.
[5] WorkerChildProcess performs the proper task handling functions - execute_command(). It creates a new process - LocalTaskJobProcess.
[6] LocalTaskJobProcess logic is described by LocalTaskJob class. It starts new process using TaskRunner.
[7][8] Process RawTaskProcess and LocalTaskJobProcess is stopped when they have finished their work.
[10][12] WorkerChildProcess notifies the main process - WorkerProcess about the end of the task and the availability of subsequent tasks.
[11] WorkerProcess saves status information in ResultBackend.
[13] When SchedulerProcess asks ResultBackend again about the status, it will get information about the status of the task.
CeleryExecutor를 사용할 때, 작업이 전송될 Celery 큐를 지정할 수 있다. 큐는 BaseOperator의 속성으로 모든 작업은 원하는 큐에 할당될 수 있고, 기본 큐는 airflow.cfg의 operators -> default_queue에 정의된다. 이 설정은 각 task의 큐가 특정되지 않은 경우 할당되는 큐를 정의한다. (현재 띄워져 있는 airflow의 airflow.cfg 파일을 확인해보니 default_queue = default로 설정되어 있다.)
worker는 하나 이상의 큐를 연결할 수 있고, 작업자가 시작될 때(airflow celery worker 명령어 사용), 쉼표로 구분된 큐 이름 목록(공백 없음)을 지정할 수 있다(예: airflow celery worker -q spark,quark). 그러면 이 worker는 지정된 큐에 들어온 작업을 처리한다.
이 설정은 리소스 관점(예: 아주 가벼운 작업의 경우 하나의 작업자가 수천 개의 작업을 문제없이 처리할 수 있는 경우)이나 환경 관점(예: 매우 특정한 환경과 보안 권한이 필요한 작업이 있기 때문에 Spark 클러스터 내에서 작업자가 실행되도록 설정하고자 하는 경우)에서 유용하다.
execute_command() 를 실행. 새로운 프로세스인 LocalTaskJobProcess을 생성한다.LocalTaskJob** 클래스에 의해 정의됨. TaskRunner를 통해 새로운 process를 실행한다.