1. Apache Airflow
- Airbnb에서 만든 워크플로우(workflow) 관리 툴
- Python framework를 이용하여 배치 지향 워크플로우(batch-oriented workflow)를 작성하고 스케줄링 및 모니터링하는 플랫폼
- 워크플로우를 Task의 DAG(directed acyclic graph)로 작성한다.
- 다양한 Task들을 병렬적으로 실행 가능(parellelly executable)
- Airflow webserver에서 제공해주는 웹 인터페이스를 통해 워크플로우 관리 및 모니터링이 용이하다.
- Airflow 인스턴스에 쉽게 플러그인을 설치할 수 있어서 customizing하기가 쉽다.
- 데이터 웨어하우스, 머신러닝, 데이터 분석, A/B테스트, 데이터 인프라 관리 등 여러 데이터 엔지니어링 환경에 유용하게 쓰인다.
2. Key Concepts
2.1 DAG(Directed Acyclic Graph)
- 방향성이 있는 순환하지 않는 그래프
- A -> B -> C (O) / A -> B -> C -> A (X)
2.2 Operator
- Task을 정의하는 데에 사용된다.
- Task의 Warpper 역할
2.3 Task
- Operator를 실행시키면 Task가 된다.
- Task = Operator Instance
2.4 Workflow
- 작업 흐름이라는 뜻
- 의존성으로 연결된 Task들의 집합
- 가장 작은 단위인 Operator들이 모여서 Task들이 되고, Task들이 모여 하나의 DAG가 되고, DAG들이 모여 하나의 Workflow가 된다.
3. Architecture
Airflow에서 제공하는 Executor 중 Celery Executor를 사용한다고 가정하자.
3.1 Airflow Webserver
- Airflow UI
- 워크플로우의 상태를 표시하고, 실행, 재시작, 수동 조작, 로그 확인 등을 할 수 있다.
- Airflow의 metastore에 저장된 로그를 보여주거나 DAG들을 시각화해서 제공한다.
3.2 Airflow Scheduler
- 주기적으로 DAG과 Task들을 모니터링하고 실행할 시기와 순서를 결정한다.
- 지속적으로
dags
폴더에 있는 DAG 파일들을 구문 분석하여 메모리에 각 파일에 대한 DAG를 생성한다.
- 각 DAG에 대해 스케줄링 파라미터와 Task 간의 종속성을 확인한다.
- Task의 상태가 "scheduled"인 경우, 스케줄러는 Executor에게 Task를 queue에 넣도록 지시한다.
- 스케줄러에 의해 이루어지는 모든 작업들은 metastore에 저장된다.
3.3 Celery Executor
- 스케줄러에서 실행하도록 표시된 Task를 수행하고 실제 Task의 실행을 관리한다.
- Celery의 동작 방식에 따라 queue를 이용하여 각 Worker에게 Task를 분산한다.
- Queue에 Task에 대한 메시지를 추가한다. (e.g., DAG ID, Task ID, 실행 날짜 등 포함, 실제 코드는 미포함)
- Executor에는 Squential Executor, Local Executor, Celery Executor, Kubernetes Executor 등이 있다. (Executor 종류에 대한 더 자세한 내용은 링크 참고)
3.4 Airflow Workers
- 실제 Task를 수행한다.
- Worker들은 queue에서 받은 메시지에 따라 DAG 폴더에서 해당 DAG을 찾고 DAG에 있는 해당 Task를 처리한다.
- Worker들은 Task가 완료된 후 metastore에 Task의 상태를 업데이트한다.
3.5 Airflow Flower
- Celery UI
- Worker 상태, queue 상태 등을 확인할 때 사용한다.
3.6 Message Queue(Broker)
- 스케줄러에서 Worker로 메시지를 전달하는 message queue
- Celery에서는 Redis나 RabbitMQ만 사용 가능하다.
- 메타데이터가 저장되는 데이터베이스
- Tasks, DAGs, 각종 변수들, connection 정보 등 상태에 대한 메타데이터 저장한다.
4. How DAG Works
- 유저가 DAG를 작성하여 DAG 폴더에 배치
- 스케줄러가 DAG를 파싱하여 metastore에 저장
- 스케줄러가 metastore를 통해 DagRun 오브젝트를 생성 (DagRun은 유저가 작성한 DAG의 인스턴스)
- DagRun status: Running (문제가 없으면..?)
- DagRun 오브젝트가 각 Task 인스턴스를 만듦
- 스케줄러가 각 Task 인스턴스를 metastore에 생성하고, 생성된 Task 인스턴스들을 Executor로 전달
- Executor의 방식에 따라 Worker들이 Task를 실행
- Task 완료 후 Worker들이 metastore에 상태 및 결과를 저장
- 스케줄러는 각 Task의 실행이 완료 되었는지 확인
- DagRun status: Completed
- 웹 서버가 metastore 정보를 읽어서 UI를 업데이트함