[데이터 엔지니어링 데브코스] TIL 50일차 - Airflow 고급 기능 배우기(3)&(4)

박단이·2024년 1월 9일
0

데브코스 TIL

목록 보기
50/56

오늘 공부한 내용🤓

Dynamics DAG

  • 비슷한 기능을 하는 DAG들 각각을 manual하게 개발하지 말고 템플릿을 만들어서 인자를 넘겨주는 형태로 개발하는 DAG
  • DAG를 계속해서 만드는 것과 한 DAG 안에서 task를 늘리는 것 사이의 밸런스가 필요하다.
    • 비슷한 기능을 하는 DAG지만 오너가 다를 경우, 한 DAG 안에 너무 많은 task가 있는 경우는 DAG 단위로 쪼개는 것이 좋다.
    • 비슷한 기능을 하는 DAG와 중복되는 Task는 Dynamics DAG를 사용할 수 있다.
  • 주로 Jinja2파일을 템플릿으로 만들고 yaml 파일에 파라미터를 저장하여 만든다. 템플릿에 파라미터를 적용하기 위한 generator.py 파일을 만들어서 일정한 주기 혹은 조건에 대해 실행한다.
  • DAG 파일을 yaml 파일 수 만큼 만들 수 있기 때문에 generator 파일을 실행하는 주기/조건이 중요하다.
# config_dag.yml
dag_id: 'TEST'
schedule: '@daily'
catchup: False
name: 'name'
# tempaltes_dag.jinja2
from airflow import DAG

with DAG(
	dag_id='get+{{ dag_id }}',
    
)as DAG:
# generator.py
from jinja2 import Environment, FileSystemLoader
import yaml

# jinja 파일을 template 형태로 가지고 있기
env = Environment(loader=FileSystemLoader('jinja_file_dir'))
template = env.get_template(tempaltes_dag.jinja2)

# yaml 파일을 모두 DAG로 만들어 주기
for f in os.listdir('yaml_file_dir'):
	if f.endswith(".yml"):
    	with open(f"yaml_file_dir/{f}", "r") as cf:
        	config = yaml.safe_load(cf)
            with open(f"dags/get_{config['dag_id']}.py", "w") as f:
            	f.write(template.render(config))

production 사용 시 유의할 점

  1. airflow.cfg
    • webserver와 scheduler를 재시작해야 변경사항이 적용
    • [core]의 dags_folder에 DAG들이 들어있는 폴더를 지정해야 한다.
    • [scheduler]의 dag_dir_list_interval을 통해 dags_folder 스캔 주기 명시
  2. Meta DB 업그레이드
    • 기본적으로 sqlite이 설치되어있지만 Postgres이나 MySQL로 변경한다.
    • [database]의 sql_alchemy_conn을 변경하여 설정한다.
    • 변경한 DB를 주기적으로 백업해야한다.
  3. Executor 변경
    • Sequential Executor가 기본적으로 설정되어있지만 이는 Sqlite일 때만 적용가능 하므로 변경이 필수
    • [core]의 executor를 수정한다.
      • singler server : LocalExecutor 또는 CeleryExecutor
      • multi server : CeleryExecutor 또는 KubernetesExecutor
  4. Authentication 활성화 & 강력한 비밀번호 사용
    • 보안을 위해 가능한 VPN 뒤에 위치할 것
  5. Log 데이터 관리
    • [logging]의 base_log_folder, [scheduler]의child_process_log_directory를 확인하고 수정한다.
    • log 데이터는 빠르게 채워지기 때문에 주기적인 cleanup이 필요하다.
    • 필요가 없다면 아예 삭제해도 좋고, 필요가 있다면 S3와 같은 Data Lake에 백업한다.
      이 과정은 DAG로 만들어서 관리할 수 있다.
  6. 최대한 서버 1대의 사양을 늘리는 것으로 버티다가 도저히 감당이 안될 때 분산 서버를 구성하며 cloud를 사용하자.
  7. 주기적인 Meta DB 백업
    • DB의 내용을 암호화했다면 암호키도 같이 백업해야한다.
    • Meta DB가 외부에 있다면 그 DB에 백업을 하고, airflow 환경 내부에 있다면 DAG를 사용하여 외부 DB에 백업을 한다.
  8. Health-check 모니터링
    • web, scheduler, MetaDB의 Health check가 필요하다.
    • HealthCheck API로 항상 체크하고, 모니터링 툴(DataDog, Grafana 등)을 사용할 수도 있다.

느낀 점😊

Dynamic DAG를 사용할 수 있는 건 많을 것 같다. 마지막으로 airflow 주요 환경설정에 대해 정리해주셔서 머리속에 기억을 남기기 좋았다.

profile
데이터 엔지니어를 꿈꾸는 주니어 입니다!

0개의 댓글