This document summarizes how to write unit tests for PySpark code.
Tests are defined as a class that inherits from unittest.TestCase.

import unittest
import pyspark.sql.functions as F
from pyspark.sql import SparkSession, DataFrame

def assert_test_data_ignore_ordering(df1: DataFrame, df2: DataFrame):
    """Assert that two DataFrames hold the same rows, ignoring row order."""
    print('test assert_test_data_ignore_ordering')
    # subtract() is a set difference: if nothing is left over in either
    # direction, the two DataFrames contain the same distinct rows.
    left_only_count = df1.subtract(df2).count()
    right_only_count = df2.subtract(df1).count()
    assert left_only_count == 0 and right_only_count == 0, (
        f'Data mismatch: {left_only_count} row(s) only in df1, '
        f'{right_only_count} row(s) only in df2'
    )
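
# Note: subtract() behaves like SQL EXCEPT DISTINCT, so the helper above does
# not notice when the same row appears a different number of times in each
# DataFrame. A minimal duplicate-aware sketch, assuming Spark 2.4+ where
# DataFrame.exceptAll is available (the function name here is hypothetical):
def assert_test_data_ignore_ordering_keep_duplicates(df1: DataFrame, df2: DataFrame):
    # exceptAll() keeps duplicates, so rows whose multiplicity differs
    # between the two DataFrames survive the difference.
    assert df1.exceptAll(df2).count() == 0 and df2.exceptAll(df1).count() == 0, \
        'DataFrames differ once duplicate rows are taken into account'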

def assert_test_data_with_ordering(df1: DataFrame, df2: DataFrame):
    """Assert that two DataFrames match cell by cell, in row order."""
    print('test assert_test_data_with_ordering')
    df1_collect = df1.collect()
    df2_collect = df2.collect()
    # Compare row counts first: this reports missing or extra rows cleanly
    # instead of failing with an IndexError in the loop below.
    assert len(df1_collect) == len(df2_collect), (
        f'Row count mismatch: {len(df1_collect)} != {len(df2_collect)}'
    )
    for row_index in range(len(df1_collect)):
        for column_name in df1.columns:
            left_cell = df1_collect[row_index][column_name]
            right_cell = df2_collect[row_index][column_name]
            # None == None is True in Python, so NULL cells on both sides
            # need no separate check.
            assert left_cell == right_cell, (
                f'Data mismatch\n\n'
                f'Row = {row_index + 1} : Column = {column_name}\n\n'
                f'ACTUAL: {left_cell}\nEXPECTED: {right_cell}\n'
            )
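
# If you are on PySpark 3.5 or later, the built-in testing helper can replace
# both functions above. A sketch, assuming PySpark >= 3.5:
#
#     from pyspark.testing import assertDataFrameEqual
#
#     # checkRowOrder=False (the default) ignores row ordering;
#     # checkRowOrder=True enforces it.
#     assertDataFrameEqual(actual_df, expected_df, checkRowOrder=True)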

class WordCountSparkTestCode(unittest.TestCase):
    def setUp(self) -> None:
        # A fresh local SparkSession is created before each test method.
        self.spark = SparkSession.builder \
            .master('local[*]').appName('WordCountTest') \
            .getOrCreate()

    def tearDown(self) -> None:
        self.spark.stop()
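
    # Starting a SparkSession per test method is slow. To share one session
    # across the whole class instead, the standard unittest class-level hooks
    # can be used; a sketch:
    #
    #     @classmethod
    #     def setUpClass(cls) -> None:
    #         cls.spark = SparkSession.builder \
    #             .master('local[*]').appName('WordCountTest').getOrCreate()
    #
    #     @classmethod
    #     def tearDownClass(cls) -> None:
    #         cls.spark.stop()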

    def test_word_count(self):
        word_count = self.spark.createDataFrame(
            data=[
                ('Hello World',),
                ('The world is wide',),
                ('This is for word count test',)
            ],
            schema=['text']
        )
        # Lowercase each line, split on spaces, explode to one word per row,
        # count occurrences per (trimmed) word, then sort by count descending
        # and word ascending so the output order is deterministic.
        word_count_2 = word_count.select(
            F.split(F.lower('text'), ' ').alias('word')
        ).select(
            F.explode('word').alias('word')
        ).groupBy(
            F.trim('word').alias('word')
        ).count().orderBy(
            ['count', 'word'], ascending=[False, True]
        )
        result_1 = self.spark.createDataFrame(
            data=[
                ('is', 2),
                ('world', 2),
                ('count', 1),
                ('for', 1),
                ('hello', 1),
                ('test', 1),
                ('the', 1),
                ('this', 1),
                ('wide', 1),
                ('word', 1),
            ],
            schema=['word', 'count']
        )
        assert_test_data_ignore_ordering(word_count_2, result_1)
        assert_test_data_with_ordering(word_count_2, result_1)

if __name__ == '__main__':
    unittest.main()
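
To run the tests, execute the script directly (e.g. python test_word_count.py, where test_word_count.py stands in for whatever the file is actually named) or let unittest discover it with python -m unittest; both paths go through the unittest.main() entry point above.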