How to Structure your DAG folder

우상욱·2024년 3월 11일
0

Airflow

목록 보기
19/22

DAG Folder


  • Airflow dags가 위치하는 곳
  • dags_folder라는 파라미터로 정의되었습니다
  • 이 경로는 절대경로입니다.
  • 기본값 : $AIRFLOW_HOME/dags

문제는 너무 많은 DAG들이나, DAGs가 많은 외부 파일들을 이용한다는 점입니다. 이런 문제를 해결하기 위해서 DAG Folder를 어떻게 구조화하는지 알아보겠습니다.

1. Zip

  • 외부 파일들과, DAG들을 패킹해서 하나의 zip file을 만듭니다.
  • DAG는 반드시 zip file의 root에 위치해야합니다.
  • 에어플로우는 zip file을 자동으로 스캔하고 로드할 겁니다.
  • 만약 모듈 종속성이 필요하다면, virtual env 혹은 pip를 사용합니다.

  • helpers.py 파일을 만들어서, 자주 사용되는 모듈들을 만들어 놓습니다.
  • __init__.py 파일을 통해서, 모듈을 불러올 수 있게 합니다.
  • 그리고 해당 task를 다른 DAG 파일에서 불러옵니다.
from functions.helpers import first_task, second_task, third_task

2. DagBag

  • DagBag은 DAGs의 collection입니다.
  • dev, staging, prod 환경에서 동작하는 것을 조금 더 쉽게 만듭니다.
  • 하나의 시스템은 다양한 독립적인 세팅 셋으로 동작할 수 있습니다
  • 기본 DAGs 폴더에서 script를 생성하는 것만으로도, 새로운 DAG 폴더를 만들 수 있습니다.
  • 주의 사항 : 만약 DagBag 안에서 부터 로드되는 Dag가 broken 되었을 경우
    • UI 상에 에러가 표시되지 않습니다.
    • airflow list_dags 명령어를 사용해도, 보이지 않습니다.
    • 웹서버 로그를 통해서 에러를 볼 수는 있습니다.
 INFO - Filling up the DagBag from /usr/local/airflow/dags

DAG bag 폴더는 airflow.cfg를 통해 변경할 수 있습니다.
airflow/dags/add_dagbags.py dags 폴더 내에 py 파일을 만들고, dags 폴더 뿐만 아니라 project_a라는 폴더와, project_b라는 폴더를 dag Bag에 추가합니다.

import os
from airflow.models import DagBag

dags_dirs = ["/usr/local/airflow/project_a", "/usr/local/airflow/project_b"]

for dir in dags_dirs:
   dag_bag = DagBag(os.path.expanduser(dir))

   if dag_bag:
       for dag_id, dag in dag_bag.dags.items():
           globals()[dag_id] = dag

잘 캐치되는 것을 볼 수 있습니다. 하지만 DAGBAG에 추가된 DAG들은 UI 상에서, 에러를 보여주지 못합니다. 이것들은 로그를 통해서 확인해야합니다.

3. .airflowignore

  • 무시되어야하는 DAG 폴더 내에 있는 디렉토리나 파일을 구체화합니다.
  • .gitignore와 유사한 역할을 합니다.
  • 각각의 라인은 정규표현식 패턴으로 작성하면 됩니다.
  • .airflowignore의 범위는 subfolder 뿐 아니라, 현재 디렉토리입니다.
  • airflow가 스캔할 필요없는 DAG 혹은 폴더들을, 무시하게 함으로써 자원을 아낄 수 있습니다.
  • 모범 사례로는, 항상 DAG 폴더 내에 .airflowignore 파일을 위치 시켜야합니다.
profile
데이터엔지니어

0개의 댓글