Pytest
Airflow UI에 DAG가 고장났는지를 확인하기 위한 Test로 보면 편합니다.
task의 총 개수 체크task의 nature 체크task의 종속성 체크(downstream, upstream)외부 함수 혹은 custom Operators들에 대한 단위 테스트
production data의 subset을 사용해서, 각각의task들이 잘 작동하는지 확인하는 테스트입니다.
task가 데이터를 교환할 수 있는지 체크task의 input을 체크task간 종속성 체크데이터 파이프라인을 테스트합니다.

Dag Validation Tests, DAG/pipeline Test, Unit TestsIntegration TestsEnd to End Pipeline Testsairflow runtask를 실행합니다.airflow list_dagsairflow dag_stateairflow task_statetask의 상태를 봅니다.airflow testtask를 테스트합니다.
import pytest
from airflow.models import DagBag
class TestDagValidation:
LOAD_SECOND_THRESHOLD = 2
REQUIRED_EMAIL = "owner@test.com"
EXPECTED_NUMBER_OF_DAGS = 7
def test_import_dags(self, dagbag):
"""
Verify that Airflow is able to import all DAGs
in the repo
- check for typos
- check for cycles
"""
assert len(dagbag.import_errors) == 0, "DAG failures detected! Got: {}".format(
dagbag.import_errors
)
def test_time_import_dags(self, dagbag):
"""
Verify that DAGs load fast enough
- check for loading time
"""
stats = dagbag.dagbag_stats
slow_dags = list(
filter(lambda f: f.duration > self.LOAD_SECOND_THRESHOLD, stats)
)
res = ", ".join(map(lambda f: f.file[1:], slow_dags))
assert (
len(slow_dags) == 0
), "The following DAGs take more than {0}s to load: {1}".format(
self.LOAD_SECOND_THRESHOLD, res
)
이런 식으로 test를 작성한 뒤에 pytest test_dag_validation.py -v 명령어를 통해서 test를 성공한지 확인합니다.

이런 test는 ci/cd 파이프라인에 적용될 경우, 더욱 더 견고한 인프라를 구축할 수 있습니다.
안녕하세요 좋은글 잘봤습니다. 혹시 글 내 이미지 출처가 어딜까여?