저번에 Great Expectations의 기초적인 요소들에 대해 학습을 했다.
해당 내용을 실제 코드로 옮겨보고자 한다.
환경은 모두가 다를 수 있고, 이렇게 되는구나를 느끼면 될 것 같다.
필자는 Jupyter Notebook, pyspark, great expectation을 사용하였다.
dataset은
https://github.com/databricks/LearningSparkV2/tree/master/databricks-datasets/learning-spark-v2/flights
의 departuredelays.csv를 사용하였다.
- import
from pyspark.sql import SparkSession import great_expectations as gx
- 일단 SparkSession 만들고 dataframe 생성
spark = SparkSession.builder.appName("test_app").getOrCreate() df = spark.read.option("header", "true").option("inferSchema", "true").csv("departuredelays.csv") # inferSchema 옵션을 true로 할 경우 처리 시간이 오래 걸린다. 현업에서는 스키마 따로 정의하자
- great expectaion 시작. data context 생성
context = gx.get_context(mode="ephemeral") # 간단한 테스트니까 ephemeral 도 충분하다.
- datasource 생성
data_source = context.data_sources.add_spark(name="test_source") # source는 저번에 말했듯 datalake 같은 저장소에 대한 것이다
- data asset 생성
data_asset = data_source.add_dataframe_asset(name="test_asset") # asset은 곧 dataset 같은 단위라고 보면 된다.
- batch_definition 생성
batch_def = data_asset.add_batch_definition_whole_dataframe("test_batch") # batch는 위의 data asset, 즉 dataset이 실제로는 매우 클 때가 있는데 (수백 Gb ~ Tb 단위) # 이를 한 번에 처리하기 어려우므로 파티셔닝해서 처리할 때 쓰인다. # 우리는 예시이므로 asset 전체를 batch로 가져가는 add_batch_definition_whole_dataframe 를 사용했다.
- expectation 생성
expectations_distance = gx.expectations.ExpectColumnValuesToBeBetween(column="distance", max_value=5000, min_value=10) # 여기서부터는 dataset에 따라, 조직의 요구사항에 따라 dataset quality check 요소를 expectation으로 만든다. # 나는 항공기의 출발지,도착지,거리, 지연 등을 담은 데이터셋이므로 거리에 대해 체크했다.
- expectation suite 생성 및 expectation 추가
expectation_suite_ref = gx.ExpectationSuite(name="test_suite") expectation_suite = context.suites.add(expectation_suite_ref) expectation_suite.add_expectation(expectations_distance) # expectation_suite를 먼저 만들어주고, 이 후에 expectation을 suite에 추가한다.
- validation 생성
validation = gx.ValidationDefinition(data=batch_def, suite=expectation_suite, name="test_valid") # 이제 실제로 수행할 validation에 우리의 작업 단위와 평가 기준 등을 담아준다.
- validation 수행
batch_params = {"dataframe": df} result = validation.run(batch_parameters=batch_params) # dataframe을 사용한다고 정의해주고, 이를 실행해서 결과를 얻는다.
- result 확인
print(result) # 확인 결과는 아래와 같다. { "success": true, "results": [ { "success": true, "expectation_config": { "type": "expect_column_values_to_be_between", "kwargs": { "batch_id": "test_source-test_asset", "column": "distance", "min_value": 10.0, "max_value": 5000.0 }, "meta": {}, "id": "b692dcb0-1813-4e82-91ce-8c045572d107", "severity": "critical" }, "result": { "element_count": 1391578, "unexpected_count": 0, . . . #성공적으로 수행되었음을 확인했다. 아래의 결과중 중요한 부분은 # success: expectation을 만족하지 못하면 false # 그 이후 우리가 정의한 expectation type을 확인하고, element_count와 unexpected_count를 통해 # 실패케이스를 확인하고 카운팅하고 한다.
실패 케이스도 쉽게 만들 수 있으나, 여러분들이 해보시길 권장한다.
이렇게 가볍게 spark와 great_expectation을 연동하여 test를 해봤다.