PySpark TestCode

Log·2023년 2월 20일
0

문서 목적

해당 문서는 PySpark를 이용하여 UnitTest 하는 방법을 정리 하기 위해 작성된 문서이다.

UnitTest

  • 단위 테스트는 모듈 또는 응용 프로그램 내의 개별 코드 단위가 예상대로 잘 작동하는지 검증하는 절차
  • Python에서 UnitTest의 경우, unittest.TestCase를 상속받는 클래스 형태로 정의한다.

UnitTest를 이용한 TestCode

import unittest
import pyspark.sql.functions as F
from pyspark.sql import SparkSession, DataFrame

def assert_test_data_ignore_ordering(
    df1: DataFrame, df2: DataFrame
):
    print('test assert_test_data_ignore_ordering')
    df3 = df1.subtract(df2)
    df4 = df2.subtract(df1)
    df3_count = df3.count()
    df4_count = df4.count()
    if (df3_count == 0) and (df4_count == 0):
        assert True
    else:      
        assert False

def assert_test_data_with_ordering(
    df1: DataFrame, df2: DataFrame
):
    print('test assert_test_data_with_ordering')
    df1_collect = df1.collect()
    df2_collect = df2.collect()

    for row_index in range(len(df1_collect)):
        for column_name in df1.columns:
            left_cell = df1_collect[row_index][column_name]
            right_cell = df2_collect[row_index][column_name]
            if left_cell == right_cell:
                assert True
            elif left_cell is None and right_cell is None:
                assert True
            
            else:
                msg = f"Data mismatch\n\nRow = {row_index + 1} : Column = {column_name}\n\nACTUAL: {left_cell}\nEXPECTED: {right_cell}\n"
                assert False, msg


class WordCountSparkTestCode(unittest.TestCase):
    def setUp(self) -> None:
        self.spark = SparkSession.builder \
            .master('local[*]').appName('WordCountTest') \
            .getOrCreate()

    def tearDown(self) -> None:
        self.spark.stop()

    def test_word_count(self):
        word_count = self.spark.createDataFrame(
            data= [
                ('Hello World',),
                ('The world is wide',),
                ('This is for word count test',)
            ],
            schema=['text']
        )

        word_count_2 = word_count.select(
            F.split(F.lower('text'), ' ').alias('word')
        ).select(F.explode('word').alias('word')).groupBy(
            F.trim('word').alias('word')
        ).count().orderBy(
            ['count', 'word'], ascending=[False, True]
        )

        result_1 = self.spark.createDataFrame(
            data = [
                ('is', 2),
                ('world', 2),
                ('count', 1),
                ('for', 1),
                ('hello', 1),
                ('test', 1),
                ('the', 1),
                ('this', 1),
                ('wide', 1),
                ('word', 1),
            ],
            schema=['word', 'count']
        )

        assert_test_data_ignore_ordering(word_count_2, result_1)
        assert_test_data_with_ordering(word_count_2, result_1)

if __name__ == '__main__':
    unittest.main()
profile
열심히 정리하는 습관 기르기..

0개의 댓글