오늘 공부한 내용🤓
Dynamics DAG
- 비슷한 기능을 하는 DAG들 각각을 manual하게 개발하지 말고 템플릿을 만들어서 인자를 넘겨주는 형태로 개발하는 DAG
- DAG를 계속해서 만드는 것과 한 DAG 안에서 task를 늘리는 것 사이의 밸런스가 필요하다.
- 비슷한 기능을 하는 DAG지만 오너가 다를 경우, 한 DAG 안에 너무 많은 task가 있는 경우는 DAG 단위로 쪼개는 것이 좋다.
- 비슷한 기능을 하는 DAG와 중복되는 Task는 Dynamics DAG를 사용할 수 있다.
- 주로 Jinja2파일을 템플릿으로 만들고 yaml 파일에 파라미터를 저장하여 만든다. 템플릿에 파라미터를 적용하기 위한
generator.py
파일을 만들어서 일정한 주기 혹은 조건에 대해 실행한다.
- DAG 파일을 yaml 파일 수 만큼 만들 수 있기 때문에 generator 파일을 실행하는 주기/조건이 중요하다.
# config_dag.yml
dag_id: 'TEST'
schedule: '@daily'
catchup: False
name: 'name'
# tempaltes_dag.jinja2
from airflow import DAG
with DAG(
dag_id='get+{{ dag_id }}',
)as DAG:
from jinja2 import Environment, FileSystemLoader
import yaml
env = Environment(loader=FileSystemLoader('jinja_file_dir'))
template = env.get_template(tempaltes_dag.jinja2)
for f in os.listdir('yaml_file_dir'):
if f.endswith(".yml"):
with open(f"yaml_file_dir/{f}", "r") as cf:
config = yaml.safe_load(cf)
with open(f"dags/get_{config['dag_id']}.py", "w") as f:
f.write(template.render(config))
production 사용 시 유의할 점
- airflow.cfg
- webserver와 scheduler를 재시작해야 변경사항이 적용
- [core]의
dags_folder
에 DAG들이 들어있는 폴더를 지정해야 한다.
- [scheduler]의
dag_dir_list_interval
을 통해 dags_folder
스캔 주기 명시
- Meta DB 업그레이드
- 기본적으로 sqlite이 설치되어있지만 Postgres이나 MySQL로 변경한다.
- [database]의
sql_alchemy_conn
을 변경하여 설정한다.
- 변경한 DB를 주기적으로 백업해야한다.
- Executor 변경
- Sequential Executor가 기본적으로 설정되어있지만 이는 Sqlite일 때만 적용가능 하므로 변경이 필수
- [core]의
executor
를 수정한다.
- singler server : LocalExecutor 또는 CeleryExecutor
- multi server : CeleryExecutor 또는 KubernetesExecutor
- Authentication 활성화 & 강력한 비밀번호 사용
- Log 데이터 관리
- [logging]의
base_log_folder
, [scheduler]의child_process_log_directory
를 확인하고 수정한다.
- log 데이터는 빠르게 채워지기 때문에 주기적인 cleanup이 필요하다.
- 필요가 없다면 아예 삭제해도 좋고, 필요가 있다면 S3와 같은 Data Lake에 백업한다.
이 과정은 DAG로 만들어서 관리할 수 있다.
- 최대한 서버 1대의 사양을 늘리는 것으로 버티다가 도저히 감당이 안될 때 분산 서버를 구성하며 cloud를 사용하자.
- 주기적인 Meta DB 백업
- DB의 내용을 암호화했다면 암호키도 같이 백업해야한다.
- Meta DB가 외부에 있다면 그 DB에 백업을 하고, airflow 환경 내부에 있다면 DAG를 사용하여 외부 DB에 백업을 한다.
- Health-check 모니터링
- web, scheduler, MetaDB의 Health check가 필요하다.
- HealthCheck API로 항상 체크하고, 모니터링 툴(DataDog, Grafana 등)을 사용할 수도 있다.
느낀 점😊
Dynamic DAG를 사용할 수 있는 건 많을 것 같다. 마지막으로 airflow 주요 환경설정에 대해 정리해주셔서 머리속에 기억을 남기기 좋았다.