How Airflow Works?

우상욱·2024년 2월 25일
0

Airflow

목록 보기
2/22

One Node Architecture



하나의 노드는 Web Server와, Scheduler, MetaStore, Executor로 구성됩니다.

  1. 먼저 웹서버는 Meta Database에 기반해서 Metadata를 fetch해서 Dag, Task Instance, User Interface에 해당하는 정보를 Display합니다.
  2. Scheduler는 Meta Database 및 Executor와 상호 작용합니다. Dag를 Trigger합니다.
  3. 마지막으로, Executor는 Meta Database를 완료된 Task들의 상태를 업데이트합니다.


Airflow에선 Meta Database를 활용해서 Scheduler, Executor, Webserver는 상호작용합니다.


Executor에겐 내부 Queue가 있습니다. 이 Queue는 Executor의 일부입니다. 만약 Local Executor를 사용할 경우, Queue 안의 Task가 순서대로 실행되는 방식입니다. 이 One Node Architecture 구조는 훌륭하지만, 다양한 데이터파이프라인을 처리하기에는 스케일링이 필요합니다.

Multi Nodes Architecture(Celery)


Celery를 사용하는 것은 여러 컴퓨터에서 작업을 처리하는 방법 중 하나입니다.

  1. 첫번째 노드는 Web Server, Scheduler, Executor를 가지고 있습니다.
  2. 두번째 노드는 MetaStore와 Queue를 가지고 있습니다. 여기서 방금 전 One Node Architecture와 다른 것은, Queue가 노드 밖에 있다는 점입니다. 이 Queue는 다중 컴퓨터와 다중 작업자 노드 사이에 작업을 분산하는데 사용됩니다. Rabbit MQ 혹은 Redis를 사용합니다.
  3. 또한 다량의 Worke Node를 가지고 있습니다.

상호작용

  1. Web Server가 MetaStore에서 데이터를 fetch합니다.
  2. 다음은 Scheduler가 Executor, Metastore와 상호작용합니다.
  3. Executor가 Task를 Queue로 밀어넣습니다.
  4. 각 Worker Node들은 Queue(Rabbit MQ, Redis Queue)에서 각 Task를 가져옵니다.

Task가 Trigger되면 발생하는 일

  1. 폴더에 새 Dag.py가 생긴다면

  2. Scheduler와 Webserver가 Dag를 Parsing합니다.

  3. 이 때 Scheduler는 MetaStore에 DagRun 객체를 생성합니다. 이 때 Dag 객체는 Dag 실행 인스턴스에 불과합니다. DagRun 객체는 데이터베이스에 저장됩니다.

  4. Scheduler는 MetaStore의 상태 스케줄러와 일치하는 TaskInstance를 MetaStore에 만듭니다. 이후 TaskInstance를 상태 Queue에 있는 채로 Executor에게 보냅니다.

  5. Executor가 Task를 Run할 준비가 됐다면, TaskInstance의 상태를 Running으로 업데이트합니다. 그리고 Executor는 MetaStore의 task의 상태를 업데이트합니다.

  6. 만약 Task가 완료됐다면, Executor는 MetaStore의 Task의 상태를 업데이트합니다.

  7. 이후 Scheduler는 task가 끝났는지, 조금 더 task를 수행할 것이 있는지 구분합니다.

  8. 만약 잘 종료 됐다면, Dag Object의 상태는 completed로 전환됩니다.

  9. 마지막으로 WebServer는 UI를 Metastore의 정보를 통해서 업데이트합니다(새로고침하면 MetaStore의 정보를 기반으로 업데이트 됩니다)

Dag 탐지 관련 세팅


Airflow config 파일을 통해 수정할 수 있습니다.

Scheduler

  • min_file_process_interval
    Dag 파일이 parsing 되기까지 기다리는 시간(초)입니다. dag_file을 업데이트했다면, 업데이트 되는 최소의 시간이라고 보시면 됩니다. 기본값은 30초입니다.

  • dag_dir_list_interval
    얼마나 자주 Dags 디렉토리를 찾아서, 새로운 Dags 파일을 찾아낼지에 대한 옵션입니다. 기본값은 5분입니다.

WebServer

  • worker_refresh_interval
    Worker batch를 새롭게 고치기 전에 기다려야하는 시간입니다. 기본적으로 30초입니다. 웹 서버가 30초마다 DAG 폴더에서 새로운 DAG를 Parsing 한다는 것입니다.
profile
데이터엔지니어

0개의 댓글