How to test your DAGs(중요)

우상욱·2024년 3월 11일
0

Airflow

목록 보기
22/22

Unit testing with Pytest


Pytest

  • 테스트를 간단한 코드 작성으로 가능하게
  • 이해하기 쉽고, 보기 좋고, 유용한 실패 정보
  • 널리 사용되는, 잘 정리된 문서
  • 사용하기 쉽고, 확장 가능한

How to test a DAG?


  • DAG Validation Tests
  • DAG/Pipeline Definition Tests
  • Unit Tests
  • Integration Tests
  • End to End Pipeline Tests

DAG Validation Tests

Airflow UI에 DAG가 고장났는지를 확인하기 위한 Test로 보면 편합니다.

  • 유효한지 체크(Dag에 오타가 있는지)
  • 비순환성 체크(순환되어선 안됨)
  • default arguments 체크

DAG/Pipeline Definition Tests

  • task의 총 개수 체크
  • task의 nature 체크
  • task의 종속성 체크(downstream, upstream)

Unit Tests(단위 테스트)

외부 함수 혹은 custom Operators들에 대한 단위 테스트

  • 로직을 체크합니다
  • 유의할 점은 프로세싱 로직을 체크하는 것을 에어플로우 내에서 하는 것이 아닙니다.
  • 이런 유닛 테스트는 스파크, 판다스 내 에서 해야합니다.

Integration Tests(통합 테스트)

production data의 subset을 사용해서, 각각의task들이 잘 작동하는지 확인하는 테스트입니다.

  • task가 데이터를 교환할 수 있는지 체크
  • task의 input을 체크
  • 여러 개의 task간 종속성 체크

End to End Pipeline Tests

데이터 파이프라인을 테스트합니다.

  • output이 정확한지 체크
  • 모든 로직이 정확한지 체크
  • 성능(perfomances) 체크

Create different environments


  1. Development
  • 가짜 작은 데이터 인풋을 활용한 테스트
  • Dag Validation Tests, DAG/pipeline Test, Unit Tests
  1. Test
  • 조금 더 큰 실제 데이터를 인풋으로 테스트
  • Integration Tests
  1. Acceptance
  • production 데이터 셋을 카피해서 인풋으로 테스트
  • End to End Pipeline Tests
  1. Production
  • 최종 유저가 사용하는 모든 Production Data를 인풋으로

Command Line interfaces


  • airflow run
    하나의 task를 실행합니다.
  • airflow list_dags
    모든 DAG의 리스트를 봅니다.
  • airflow dag_state
    DAG run의 상태를 봅니다.
  • airflow task_state
    task의 상태를 봅니다.
  • airflow test
    메타 db의 상태를 기록하는 것, 의존성을 확인하는 것들 전부 없이 task를 테스트합니다.

How to test


  • 다음과 같이 테스트 폴더를 만듭니다.
  • 해당 폴더에 pytest 기반 테스트 코드를 작성합니다.
import pytest
from airflow.models import DagBag


class TestDagValidation:

    LOAD_SECOND_THRESHOLD = 2
    REQUIRED_EMAIL = "owner@test.com"
    EXPECTED_NUMBER_OF_DAGS = 7

    def test_import_dags(self, dagbag):
        """
        Verify that Airflow is able to import all DAGs
        in the repo
        - check for typos
        - check for cycles
        """
        assert len(dagbag.import_errors) == 0, "DAG failures detected! Got: {}".format(
            dagbag.import_errors
        )

    def test_time_import_dags(self, dagbag):
        """
        Verify that DAGs load fast enough
        - check for loading time
        """
        stats = dagbag.dagbag_stats
        slow_dags = list(
            filter(lambda f: f.duration > self.LOAD_SECOND_THRESHOLD, stats)
        )
        res = ", ".join(map(lambda f: f.file[1:], slow_dags))

        assert (
            len(slow_dags) == 0
        ), "The following DAGs take more than {0}s to load: {1}".format(
            self.LOAD_SECOND_THRESHOLD, res
        )

이런 식으로 test를 작성한 뒤에 pytest test_dag_validation.py -v 명령어를 통해서 test를 성공한지 확인합니다.

이런 test는 ci/cd 파이프라인에 적용될 경우, 더욱 더 견고한 인프라를 구축할 수 있습니다.

profile
데이터엔지니어

1개의 댓글

comment-user-thumbnail
2025년 7월 30일

안녕하세요 좋은글 잘봤습니다. 혹시 글 내 이미지 출처가 어딜까여?

답글 달기