How to use dbt-expectations to detect data quality issues

문주은·2024년 7월 9일

Great Expectations와 dbt를 이용한 데이터 검증 관리

1. 개요

  1. Expectation Suite 생성 및 관리: Great Expectations를 사용하여 데이터에 대한 기대 조건을 정의합니다.
  2. 주기적 검증 (스케줄링): dbt의 run-operation 명령어와 함께 dbt Cloud나 Airflow를 사용하여 검증을 주기적으로 실행합니다.
  3. 결과 모니터링: 검증 결과를 지속적으로 모니터링하고 알림을 설정합니다.

2. 설정 과정

2-1. Great Expectations 초기화 및 Expectation Suite 생성

먼저 Great Expectations 프로젝트를 초기화하고 Expectation Suite를 생성합니다.

# Great Expectations 초기화
great_expectations init

great_expectations 디렉토리가 생성되며, 여기에서 데이터를 로드하고 Expectation Suite를 정의할 수 있습니다.

2-2. dbt 프로젝트 초기화

dbt 프로젝트를 초기화하고, 필요한 데이터 소스를 설정합니다.

# dbt 프로젝트 초기화
dbt init my_dbt_project
cd my_dbt_project

# profiles.yml 파일을 설정하여 데이터베이스 연결을 설정합니다.

2-3. dbt 모델 생성 및 검증 스크립트 작성

dbt 모델을 생성하고, 이를 기반으로 Great Expectations 검증을 수행하는 스크립트를 작성합니다.

dbt 모델 생성

models/my_model.sql 파일을 생성합니다.

-- models/my_model.sql
with source as (
    select * from {{ ref('my_source_table') }}
)
select * from source
dbt 실행 후 Great Expectations 검증 스크립트 작성

models/great_expectations_validation.sql 파일을 생성합니다.

-- models/great_expectations_validation.sql
{% set results = run_query('SELECT * FROM {{ ref('my_model') }} LIMIT 1') %}
{{ return(results) }}

2-4. dbt Run-Operation 명령어로 Great Expectations 검증 실행

dbt의 run-operation 명령어를 사용하여 Great Expectations 검증을 실행하는 스크립트를 작성합니다.

Python 스크립트 작성

great_expectations_validation.py 파일을 생성합니다.

import subprocess
from great_expectations.data_context import DataContext

def validate_data():
    context = DataContext()
    
    # Checkpoint 이름과 Expectation Suite 이름을 설정합니다.
    checkpoint_name = "my_quickstart_checkpoint"
    expectation_suite_name = "suite"

    # Checkpoint를 실행하여 검증을 수행합니다.
    results = context.run_checkpoint(checkpoint_name=checkpoint_name)

    # 검증 결과를 확인합니다.
    if not results.success:
        raise ValueError("데이터 검증 실패")

if __name__ == "__main__":
    subprocess.run(["dbt", "run"])
    validate_data()

2-5. dbt Cloud 또는 Airflow에서 스케줄링

dbt Cloud나 Airflow를 사용하여 주기적으로 스크립트를 실행합니다.

dbt Cloud에서 스케줄링

dbt Cloud의 작업(job) 설정에서 스크립트를 스케줄링할 수 있습니다.

  1. dbt Cloud에 로그인하고 프로젝트를 선택합니다.
  2. Jobs 메뉴에서 새로운 작업을 생성합니다.
  3. Commands 섹션에 dbt run-operation 명령어를 추가합니다.
  4. 스케줄을 설정하여 주기적으로 작업이 실행되도록 합니다.
Airflow에서 스케줄링

Airflow DAG을 생성하여 주기적으로 스크립트를 실행할 수 있습니다.

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2023, 1, 1),
    'retries': 1,
}

dag = DAG(
    'dbt_ge_validation',
    default_args=default_args,
    schedule_interval='@daily',
)

run_dbt_and_ge_validation = BashOperator(
    task_id='run_dbt_and_ge_validation',
    bash_command='python /path/to/your/great_expectations_validation.py',
    dag=dag,
)

3. dbt-expectations Tests

Reference: https://hub.getdbt.com/metaplane/dbt_expectations/latest

dbt-expectation에는 많은 tests 과정들이 있지만 그중에서도 아래와 같은 data quality test를 진행했습니다.

CategoryTest NameTest Description
Table shapeexpect_row_values_to_have_recent_data날짜 컬럼이 최근 N일 이내의 값을 포함하는지 확인 (데이터 freshness 체크에 사용)
Table shapeexpect_grouped_row_values_to_have_recent_data그룹핑된 데이터 내에서 각 그룹이 최근 N일 이내의 데이터를 포함하고 있는지 확인
Table Shapeexpect_table_aggregation_to_equal_other_table집계 결과(예: SUM, COUNT 등)가 다른 테이블의 집계 결과와 같은지 비교
Missing valuesexpect_column_values_to_not_be_null특정 컬럼에 NULL이 없어야 함
Unique valuesexpect_column_values_to_be_unique컬럼 값이 유일해야 함 (중복 불가)
Sets and rangesexpect_column_values_to_be_in_set컬럼 값이 주어진 값 집합 안에 있는지 확인
Sets and rangesexpect_column_values_to_be_between값이 특정 최소/최대값 사이에 있어야 함
Stringexpect_column_value_lengths_to_be_between문자열 길이가 지정된 최소~최대 길이 사이인지 확인
Stringexpect_column_value_lengths_to_equal문자열 길이가 특정 값과 정확히 일치하는지 확인
Stringexpect_column_values_to_match_like_patternSQL LIKE 패턴과 일치하는지 확인 (예: %abc%)
Stringexpect_column_values_to_match_regex정규 표현식에 일치하는지 확인
Aggregateexpect_column_distinct_count_to_equal유니크한 값 개수가 지정된 숫자와 정확히 일치해야 함
Aggregateexpect_column_distinct_count_to_be_greater_than유니크한 값의 개수가 지정된 숫자보다 커야 함
Aggregateexpect_column_distinct_count_to_be_less_than유니크한 값 개수가 지정된 숫자보다 작아야 함
Aggregateexpect_column_distinct_values_to_be_in_set고유값들이 특정 집합 내에 포함되어야 함
Aggregateexpect_column_distinct_values_to_contain_set특정 고유값 집합이 반드시 포함되어야 함 (예: 특정 필수 코드)
Aggregateexpect_column_proportion_of_unique_values_to_be_between고유값 비율 (고유값 수 / 전체 row 수)이 특정 범위에 있어야 함
Aggregateexpect_column_stdev_to_be_between표준편차가 지정된 범위 내에 있어야 함
Aggregateexpect_column_sum_to_be_between전체 합계가 특정 범위 내에 있어야 함

4. 결과 모니터링 및 알림 설정

Great Expectations는 검증 결과를 Data Docs로 시각화합니다. 검증 실패 시 이메일 알림 또는 Slack 알림을 설정할 수 있습니다.

from great_expectations.data_context import DataContext
from great_expectations.checkpoint import SimpleCheckpoint

context = DataContext()

checkpoint_config = {
    "name": "my_checkpoint",
    "config_version": 1.0,
    "class_name": "SimpleCheckpoint",
    "site_names": ["local_site"],
    "notify_on": "failure",
    "slack_webhook": "https://hooks.slack.com/services/T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX",
}

checkpoint = SimpleCheckpoint(
    f"{checkpoint_config['name']}", context, **checkpoint_config
)

results = checkpoint.run()

if not results["success"]:
    print("Validation failed.")
else:
    print("Validation succeeded.")

이러한 설정을 통해 Great Expectations와 dbt를 통합하여 데이터 검증을 주기적으로 실행하고, 검증 결과를 모니터링하며, 문제가 발생하면 알림을 받을 수 있습니다.


profile
Data Engineer

0개의 댓글