run-operation 명령어와 함께 dbt Cloud나 Airflow를 사용하여 검증을 주기적으로 실행합니다.먼저 Great Expectations 프로젝트를 초기화하고 Expectation Suite를 생성합니다.
# Great Expectations 초기화
great_expectations init
great_expectations 디렉토리가 생성되며, 여기에서 데이터를 로드하고 Expectation Suite를 정의할 수 있습니다.
dbt 프로젝트를 초기화하고, 필요한 데이터 소스를 설정합니다.
# dbt 프로젝트 초기화
dbt init my_dbt_project
cd my_dbt_project
# profiles.yml 파일을 설정하여 데이터베이스 연결을 설정합니다.
dbt 모델을 생성하고, 이를 기반으로 Great Expectations 검증을 수행하는 스크립트를 작성합니다.
models/my_model.sql 파일을 생성합니다.
-- models/my_model.sql
with source as (
select * from {{ ref('my_source_table') }}
)
select * from source
models/great_expectations_validation.sql 파일을 생성합니다.
-- models/great_expectations_validation.sql
{% set results = run_query('SELECT * FROM {{ ref('my_model') }} LIMIT 1') %}
{{ return(results) }}
dbt의 run-operation 명령어를 사용하여 Great Expectations 검증을 실행하는 스크립트를 작성합니다.
great_expectations_validation.py 파일을 생성합니다.
import subprocess
from great_expectations.data_context import DataContext
def validate_data():
context = DataContext()
# Checkpoint 이름과 Expectation Suite 이름을 설정합니다.
checkpoint_name = "my_quickstart_checkpoint"
expectation_suite_name = "suite"
# Checkpoint를 실행하여 검증을 수행합니다.
results = context.run_checkpoint(checkpoint_name=checkpoint_name)
# 검증 결과를 확인합니다.
if not results.success:
raise ValueError("데이터 검증 실패")
if __name__ == "__main__":
subprocess.run(["dbt", "run"])
validate_data()
dbt Cloud나 Airflow를 사용하여 주기적으로 스크립트를 실행합니다.
dbt Cloud의 작업(job) 설정에서 스크립트를 스케줄링할 수 있습니다.
dbt run-operation 명령어를 추가합니다.Airflow DAG을 생성하여 주기적으로 스크립트를 실행할 수 있습니다.
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime
default_args = {
'owner': 'airflow',
'start_date': datetime(2023, 1, 1),
'retries': 1,
}
dag = DAG(
'dbt_ge_validation',
default_args=default_args,
schedule_interval='@daily',
)
run_dbt_and_ge_validation = BashOperator(
task_id='run_dbt_and_ge_validation',
bash_command='python /path/to/your/great_expectations_validation.py',
dag=dag,
)
Reference: https://hub.getdbt.com/metaplane/dbt_expectations/latest
dbt-expectation에는 많은 tests 과정들이 있지만 그중에서도 아래와 같은 data quality test를 진행했습니다.
| Category | Test Name | Test Description |
|---|---|---|
| Table shape | expect_row_values_to_have_recent_data | 날짜 컬럼이 최근 N일 이내의 값을 포함하는지 확인 (데이터 freshness 체크에 사용) |
| Table shape | expect_grouped_row_values_to_have_recent_data | 그룹핑된 데이터 내에서 각 그룹이 최근 N일 이내의 데이터를 포함하고 있는지 확인 |
| Table Shape | expect_table_aggregation_to_equal_other_table | 집계 결과(예: SUM, COUNT 등)가 다른 테이블의 집계 결과와 같은지 비교 |
| Missing values | expect_column_values_to_not_be_null | 특정 컬럼에 NULL이 없어야 함 |
| Unique values | expect_column_values_to_be_unique | 컬럼 값이 유일해야 함 (중복 불가) |
| Sets and ranges | expect_column_values_to_be_in_set | 컬럼 값이 주어진 값 집합 안에 있는지 확인 |
| Sets and ranges | expect_column_values_to_be_between | 값이 특정 최소/최대값 사이에 있어야 함 |
| String | expect_column_value_lengths_to_be_between | 문자열 길이가 지정된 최소~최대 길이 사이인지 확인 |
| String | expect_column_value_lengths_to_equal | 문자열 길이가 특정 값과 정확히 일치하는지 확인 |
| String | expect_column_values_to_match_like_pattern | SQL LIKE 패턴과 일치하는지 확인 (예: %abc%) |
| String | expect_column_values_to_match_regex | 정규 표현식에 일치하는지 확인 |
| Aggregate | expect_column_distinct_count_to_equal | 유니크한 값 개수가 지정된 숫자와 정확히 일치해야 함 |
| Aggregate | expect_column_distinct_count_to_be_greater_than | 유니크한 값의 개수가 지정된 숫자보다 커야 함 |
| Aggregate | expect_column_distinct_count_to_be_less_than | 유니크한 값 개수가 지정된 숫자보다 작아야 함 |
| Aggregate | expect_column_distinct_values_to_be_in_set | 고유값들이 특정 집합 내에 포함되어야 함 |
| Aggregate | expect_column_distinct_values_to_contain_set | 특정 고유값 집합이 반드시 포함되어야 함 (예: 특정 필수 코드) |
| Aggregate | expect_column_proportion_of_unique_values_to_be_between | 고유값 비율 (고유값 수 / 전체 row 수)이 특정 범위에 있어야 함 |
| Aggregate | expect_column_stdev_to_be_between | 표준편차가 지정된 범위 내에 있어야 함 |
| Aggregate | expect_column_sum_to_be_between | 전체 합계가 특정 범위 내에 있어야 함 |
Great Expectations는 검증 결과를 Data Docs로 시각화합니다. 검증 실패 시 이메일 알림 또는 Slack 알림을 설정할 수 있습니다.
from great_expectations.data_context import DataContext
from great_expectations.checkpoint import SimpleCheckpoint
context = DataContext()
checkpoint_config = {
"name": "my_checkpoint",
"config_version": 1.0,
"class_name": "SimpleCheckpoint",
"site_names": ["local_site"],
"notify_on": "failure",
"slack_webhook": "https://hooks.slack.com/services/T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX",
}
checkpoint = SimpleCheckpoint(
f"{checkpoint_config['name']}", context, **checkpoint_config
)
results = checkpoint.run()
if not results["success"]:
print("Validation failed.")
else:
print("Validation succeeded.")
이러한 설정을 통해 Great Expectations와 dbt를 통합하여 데이터 검증을 주기적으로 실행하고, 검증 결과를 모니터링하며, 문제가 발생하면 알림을 받을 수 있습니다.