Data Quality (3) - Great Expectations 실습

jihunnit·2025년 10월 18일
0

데이터엔지니어링

목록 보기
6/6

저번에 Great Expectations의 기초적인 요소들에 대해 학습을 했다.
해당 내용을 실제 코드로 옮겨보고자 한다.
환경은 모두가 다를 수 있고, 이렇게 되는구나를 느끼면 될 것 같다.

필자는 Jupyter Notebook, pyspark, great expectation을 사용하였다.
dataset은
https://github.com/databricks/LearningSparkV2/tree/master/databricks-datasets/learning-spark-v2/flights
의 departuredelays.csv를 사용하였다.


  1. import
from pyspark.sql import SparkSession
import great_expectations as gx

  1. 일단 SparkSession 만들고 dataframe 생성
spark = SparkSession.builder.appName("test_app").getOrCreate()
df = spark.read.option("header", "true").option("inferSchema", "true").csv("departuredelays.csv")
# inferSchema 옵션을 true로 할 경우 처리 시간이 오래 걸린다. 현업에서는 스키마 따로 정의하자

  1. great expectaion 시작. data context 생성
context = gx.get_context(mode="ephemeral")
# 간단한 테스트니까 ephemeral 도 충분하다.
  1. datasource 생성
data_source = context.data_sources.add_spark(name="test_source")
# source는 저번에 말했듯 datalake 같은 저장소에 대한 것이다
  1. data asset 생성
data_asset = data_source.add_dataframe_asset(name="test_asset")
# asset은 곧 dataset 같은 단위라고 보면 된다.
  1. batch_definition 생성
batch_def = data_asset.add_batch_definition_whole_dataframe("test_batch")
# batch는 위의 data asset, 즉 dataset이 실제로는 매우 클 때가 있는데 (수백 Gb ~ Tb 단위)
# 이를 한 번에 처리하기 어려우므로 파티셔닝해서 처리할 때 쓰인다.
# 우리는 예시이므로 asset 전체를 batch로 가져가는 add_batch_definition_whole_dataframe 를 사용했다.
  1. expectation 생성
expectations_distance =  gx.expectations.ExpectColumnValuesToBeBetween(column="distance", max_value=5000, min_value=10)
# 여기서부터는 dataset에 따라, 조직의 요구사항에 따라 dataset quality check 요소를 expectation으로 만든다.
# 나는 항공기의 출발지,도착지,거리, 지연 등을 담은 데이터셋이므로 거리에 대해 체크했다.
  1. expectation suite 생성 및 expectation 추가
expectation_suite_ref = gx.ExpectationSuite(name="test_suite")
expectation_suite = context.suites.add(expectation_suite_ref)
expectation_suite.add_expectation(expectations_distance)
# expectation_suite를 먼저 만들어주고, 이 후에 expectation을 suite에 추가한다.
  1. validation 생성
validation = gx.ValidationDefinition(data=batch_def, suite=expectation_suite, name="test_valid")
# 이제 실제로 수행할 validation에 우리의 작업 단위와 평가 기준 등을 담아준다.
  1. validation 수행
batch_params = {"dataframe": df}
result = validation.run(batch_parameters=batch_params)
# dataframe을 사용한다고 정의해주고, 이를 실행해서 결과를 얻는다.
  1. result 확인
print(result)
# 확인 결과는 아래와 같다.
{
  "success": true,
  "results": [
    {
      "success": true,
      "expectation_config": {
        "type": "expect_column_values_to_be_between",
        "kwargs": {
          "batch_id": "test_source-test_asset",
          "column": "distance",
          "min_value": 10.0,
          "max_value": 5000.0
        },
        "meta": {},
        "id": "b692dcb0-1813-4e82-91ce-8c045572d107",
        "severity": "critical"
      },
      "result": {
        "element_count": 1391578,
        "unexpected_count": 0,
        .
        .
        .
#성공적으로 수행되었음을 확인했다. 아래의 결과중 중요한 부분은
# success: expectation을 만족하지 못하면 false
# 그 이후 우리가 정의한 expectation type을 확인하고, element_count와 unexpected_count를 통해
# 실패케이스를 확인하고 카운팅하고 한다.

실패 케이스도 쉽게 만들 수 있으나, 여러분들이 해보시길 권장한다.
이렇게 가볍게 spark와 great_expectation을 연동하여 test를 해봤다.

profile
인간은 노력하는 한 방황한다

0개의 댓글