위의 3가지 질문에 대한 시작 단계에서의 답변
Download -> Process -> Store
를 수행한다.Airflow를 사용하면
- 데이터 파이프라인을 관리할 수 있다.
- 모니터링할 수 있다.
- 자동적으로 이를 수행할 수 있다.
DAG는 데이터 파이프라인을 나타내는 그래프 객체. 여러 Tasks와 그들의 의존성으로 구성되어 있다. (A DAG is basically a graph object representing your data pipeline composed of different tasks with their dependency)
Web Server
가 Metastore
에서 metadata를 fetchScheduler
Metastore
와 Talk,Executor
에 Task를 보낸다.Executor
가 Metastore
의 Task의 상태를 updateExecutor
내에 Queue를 가지고 있는데, 실행될 Task의 순서가 담겨 있다.Web Server
가 Metastore
에서 metadata를 fetchScheduler
Metastore
와 Talk,Executor
에 Task를 보낸다.Executor
가 trigger하고 싶은 Task를 Queue에 넣는다.(이 때 Queue는 Executor
밖에 있다.)Worker(s)
가 Queue에서 이런 Task들을 빼와서 수행한다.Folder DAGs
)Web Server
와 Scheduler
가 Folder DAGs
를 parse(새로운 데이터 파이프라인을 알기 위해서)Scheduler
에서 Metastore
에 DagRun Object
생성(인스턴스 그 이상의 무엇은 아니다.)Scheduler
가Task Instance
생성,Scheduler
가 Executor
로 Task Instance
를 보냄Executor
가 Task Instance
실행Task Instance
수행 상태 변경(완료)Scheduler
가 완료되었는지 체크DagRun Object
의 상태 -> 완료$ python -m venv sandbox
$ source bin/activate/sandbox
$ pip install wheel
$ pip install apache-airflow==2.1.0 --constraint https://gist.github.com/marclamberti/742efaef5b2d94f44666b0aec020be7c
https://gist.github.com/marclamberti/742efaef5b2d94f44666b0aec020be7c
는 contraint가 적힌 주소airflow 초기 설정
$ airflow db init
- airflow 폴더 생성
- airflow.cfg : 설정 파일
- airflow.db : default로 사용하는 SQLite같은 DB
- logs : 로그
- unittests.cfg : air에 영향을 주지 않고 airflow의 일부 구성을 test하기 유용
- webserver_config.py : 웹 서버의 재구성 시 유용
$ airflow users create -u admin -p admin
$ airflow db upgrade
$ airflow db reset
$ airflow webserver
$ airflow scheduler
$ airflow dags list
$ airflow tasks list 'DAG 이름(위의 DAGs list에서 나온 것 중 하나)'
$ airflow dags trigger ...
왼쪽부터
출처 : Udemy The Complete Hands-On Introduction to Apache Airflow 강의 Section 2