DevCourse TIL Day3 Week14 - spark SQL

김태준·2023년 7월 6일
0

Data Enginnering DevCourse

목록 보기
64/93
post-thumbnail

어제까지 spark에 대해 전반적인 이론과 colab과 pyspark를 연동하여 쿼리 처리하는 방법에 대해 학습을 했다.

오늘 학습할 내용은 spark내 빅데이터 처리 기능인 spark SQL

✅ spark SQL

🎈 aggregation 함수

JOIN : 스타 스키마로 구성된 테이블들로 분산되어 있던 정보 통합 시 사용
INNER JOIN : 양 테이블에서 매칭되는 조인 KEY에 한해 레코드 조회
FULL JOIN : 매칭여부와 상관없이 모든 레코드 조회
CROSS JOIN : 두 레코드의 조합으로 결과 조회
LEFT(RIGHT) JOIN : 오른쪽 테이블에선 매칭되는 KEY에 한해서만 레코드 조회 (왼쪽은 전부)
SELF JOIN : 동일한 테이블 조인해 조회
Shuffle JOIN : 일반 조인 방식
Bucket JOIN : 조인 KEY를 바탕으로 새로 파티션 만들고 조인하는 방식
Broadcast JOIN : 큰 데이터와 작은 데이터 간 조인. (spark.sql.autoBroadcastJoinThreshold 파라미터로 작은지 여부 결정)

✅ UDF (User Definde Function)

  • DF나 SQL에서 적용할 수 있는 사용자 정의 함수
  • scalar 함수 (upper, lower) vs aggregation 함수 (sum, min, max)
  • 함수 구현 방법은 lambda, 파이썬 함수, pandas 함수 등
  • 함수 등록 방법은 2가지 (DF - pyspark.sql.functions.udf, SQL - spark.udf.register)
# UDF - DF에 사용하기 
import pyspark.sql.functions as F
from pyspark.sql.types import *
upperUDF = F.udf(lambda z:z.upper())
df.withColumn("Curated Name", upperUDF("Name"))

# UDF - SQL 사용
def upper(s):
	return s.upper()

# test
upperUDF = spark.udf.register("upper", upper)
spark.sql("SELECT upper('aBcD')").show()

# DF 기반 SQL적용
df.createOrReplaceTempView("test")
spark.sql("""SELECT name, upper(name) "Curated Name" FROM test""").show()

# UDF - DF 사용 2
data = [
	{"a": 1, "b": 2},
    {"a": 5, "b": 5}
]
df = spark.createDataFrame(data)
df.withColumn("c", F.udf(lambda x, y: x+y)("a","b")

✅ JOIN 문 예제

join 시 주의할 점은 join에 사용되는 key가 unique한 지 체킹하는 습관들이기.

# PySpark 라이브러리 호출 및 설치
!pip install pyspark==3.3.1 py4j==0.10.9.5
from pyspark.sql import SparkSession

# SparkSession 생성
spark = SparkSession \
	.builder \
    .appName("Python Spark SQL #1") \
    .getOrCreate()

# table 2개 호출
# vital - date, userid, vitalid, weight
# alert - alertid, alerttype, date, userid, vitalid

rdd_vital = spark.sparkContext.parallelize(vital)
rdd_alert = spark.sparkContext.parallelize(alert)
df_vital = rdd_vital.toDF()
df_alert = rdd_alert.toDF()

# 아래 작성된 JOIN 예제들은 모두 DF에서 처리하는 방법.

# inner join
join_expr = df_vital.vitalid = df_alert.vitalid
df_vital.join(df_alert, join_expr, "inner").show()

# left join
join_expr = df_vital.vitalid = df_alert.vitalid
df_vital.join(df_alert, join_expr, "left").show()

# right join
join_expr = df_vital.vitalid = df_alert.vitalid
df_vital.join(df_alert, join_expr, "right").show()

# full outer join
join_expr = df_vital.vitalid = df_alert.vitalid
df_vital.join(df_alert, join_expr, "full").show()

# cross join
df_vital.join(df_alert, None, "cross").show()

# self join
join_expr = df_vital.vitalid = df_vital.vitalid
df_vital.join(df_vital, join_expr, "left").show()

# SQL로 처리하고 싶다면, 각 DF별로 createOrReplaceTempView로 작성 후 spark.sql(sql문법 사용)

✅ ranking 예제

SQL로 조인 후 지표 계산하는 방법 중 RANKING 관련 문법 학습

# PySpark 라이브러리 호출 및 설치
!pip install pyspark==3.3.1 py4j==0.10.9.5
from pyspark.sql import SparkSession

# SparkSession 생성
spark = SparkSession \
	.builder \
    .appName("Python Spark SQL #1") \
    .getOrCreate()
    
# redshift 연결 
url = "jdbc:redshift://learnde.cduaw970ssvt.ap-northeast-2.redshift.amazonaws.com:5439/dev?user=$&password=$"

# DF로 테이블들 로딩하기
df_user_session_channel = spark.read \
	.format("jdbc") \
    .option("driver", "com.amazon.redshift.jdbc42.Driver") \
    .option("url", url) \
    .option("dbtable", "raw_data.user_session_channel") \
    .load()
df_session_timestamp = spark.read \
	.format("jdbc") \
    .option("driver", "com.amazon.redshift.jdbc42.Driver") \
    .option("url", url) \
    .option("dbtable", "raw_data.user_session_channel") \
    .load()
df_session_transaction = spark.read \
	.format("jdbc") \
    .option("driver", "com.amazon.redshift.jdbc42.Driver") \
    .option("url", url) \
    .option("dbtable", "raw_data.user_session_channel") \
    .load()

df_user_session_channel.createOrReplaceTempView("user_session_channel")
df_session_timestamp.createOrReplaceTempView("session_timestamp")
df_session_transaction.createOrReplaceTempView("session_transaction")

top_rev_user_df2 = spark.sql("""
	SELECT
  		userid,
  		SUM(amount) total_amount, 
  		# amount 합 즉 총매출 필드를 내림차순하여 순위부여한 rank를 필드로 두고 진행
 		RANK() OVER (ORDER BY SUM(amount) DESC) rank
	FROM session_transaction st
	JOIN user_session_channel usc ON st.sessionid = usc.sessionid
	GROUP	BY userid
	ORDER BY rank
	LIMIT 10"""
)

✅ grouping 예제

// JOIN KEY Uniqueness checking (sessionid로 확인) , 테이블 별로 check하는 습관 必
SELECT 
	sessionid,
    count(1) counting
FROM user_session_channel
GROUP BY 1
ORDER BY 2 DESC
LIMIT 1;

// 월별 채널별 총 방문자, 구매 방문자 조회
month_channel_rev_df = spark.sql("""
	SELECT 
    	LEFT(sti.ts, 7) year_month,
        usc.channel channel,
        COUNT(DISTINCT userid) total_visitors,
        COUNT(DISTINCT CASE WHEN amount is not NULL THEN userid END) paid_visitors
    FROM user_session_channel usc
	LEFT JOIN session_timestamp sti ON usc.sessionid = sti.sessionid
    LEFT JOIN session_transaction str ON usc.sessionid = str.sessionid
    GROUP BY 1, 2 
    ORDER BY 1, 2
    """
)

// 월별 채널별 총 매출액, 총 방문자, 매출 발생 방문자, 전환률 계산 
month_channel_rev_df = spark.sql("""
	SELECT
    	LEFT(ts, 7) as year_month,
        usc.channel,
        COUNT(DISTINCT userid) uniqueUsers
        COUNT(DISTINCT (CASE WHEN amount >= 0 THEN userid END)) paidUsers,
        SUM(amount) grossRevenue,
        SUM(CASE WHEN refunded is not True THEN amount END) netRevenue,
        ROUND(COUNT(DISTINCT CASE WHEN amount >= 0 THEN userid END) * 100 
        	/ COUNT(DISTINCT userid), 2) conversionRatio
	FROM user_session_channel usc
    LEFT JOIN session_timestamp t ON t.sessionid = usc.sessionid
    LEFT JOIN session_transaction st ON st.sessionid = usc.sessionid
    GROUP BY 1, 2
    ORDER BY 1, 2
    """
)

✅ windowing 예제

ROWS BETWEEN ~ AND ~ 구문
ex1) SUM(value) OVER(ORDER BY value) ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING)
-> 해당 레코드 기준 앞 2개 + 현재 레코드 값 + 뒤 2개의 합으로 처리

ex2) SUM(value) OVER(ORDER BY value) ROWS BETWEEN unbounded PRECEDING AND unbounded FOLLOWING
-> 해당 레코드를 기준으로 앞부분, 뒷부분 모든 합 처리

// 유저별 처음 채널, 마지막 채널 알아내기
// ROW_NUMBER() OVER ~ PARTITION BY 도 사용할 수 있으나, preceding, following 사용하여 조회할 것.

first_last_channel_df  = spark.sql("""
	SELECT 
    	DISTINCT A.userid,
    	FIRST_VALUE(A.channel) OVER(PARTITION BY A.userid ORDER BY B.ts rows between unbounded preceding and unbounded following) AS first_channel,
        LAST_VALUE(A.channel) OVER(PARTITION BY A.userid ORDER BY B.ts rows between unbounded preceding and unbouded following) AS last_channel
    FROM user_session_channel A
    LEFT JOIN session_timestamp B ON A.sessionid = B.sessionid
	"""
)

// CTE 로 처리하기
first_last_channel_df = spark.sql("""
	WITH RECORD AS (
		SELECT 
    		userid, 
        	channel,
        	ROW_NUMBER() OVER (PARTITION BY userid ORDER BY ts) AS seq_first,
        	ROW_NUMBER() OVER (PARTITION BY userid ORDER BY ts DESC) AS seq_last
    	FROM user_session_channel u
    	LEFT JOIN session_timestamp t ON u.sessionid = t.sessionid
	)

	SELECT 
		R.userid,
    	R.channel first_channel,
    	E.channel last_channel
	FROM RECORD R
	INNER JOIN RECORD E ON R.userid = E.userid
	WHERE R.seq_first = 1 and E.seq_last = 1
	ORDER BY 1
	"""
)

✅ Hive meta store

  • spark 내 테이블 관리 방식 : DB.TABLE
  • Catalog : 테이블과 뷰에 관한 메타 데이터 관리 (spark는 메모리 기반 카탈로그 제공, 세션 끝나면 사라지는), HDFS와 같은 파일시스템에 저장되는 테이블 만들고 싶다면? Hive와 호환되는 카탈로그 제공 - Persistent (meta store 사용)

HDFS 상 테이블 생성 시 2 종류의 테이블이 존재

  • Managed table : spark이 실제 데이터, 메타 데이터 모두 관리
    -> (CTAS, dataframe.saveAsTable("테이블명")으로 생성 가능하며 PARQUET이 기본 데이터 포맷이다. spark.sql.warehouse.dir 위치에 데이터가 저장되며 spark 테이블로 처리하는 것이 장점!
  • Unmanaged(external) table : spark이 메타 데이터만 관리
    -> LOCATION이라는 property 사용하며 이미 HDFS에 존재하는 데이터에 스키마를 정의해서 사용. external table이 삭제되도 실제 데이터가 남아있기에 데이터 변경 X

-> 스토리지 기반 카탈로그 사용 : SparkSession 생성 시 .enableHiveSupport() 호출할 것.

✅ unittest 구축하기

CI/CD를 위해 구현한 코드의 test-coverage가 굉장히 중요.
각 언어별로 정해진 test framework 사용이 일반적. python : unittest

# 파일 로딩 테스트
def load_gender(spark, file_path):
	return spark.read.option("header", True).csv(file_path)

# gender 기준 counting 테스트 
def get_gender_count(spark, df, field_to_count):
	df.createOrReplaceTempView("namegender_test")
    return spark.sql(f"SELECT {field_to_count}, COUNT(1) count FROM namegender_test GROUP BY 1")
    
    
# unittest 코드 

from unittest import TestCase

class UtilsTestCase(TestCase):
	spark = None
    
    # setUpClass로 처음에 한번 호출
    @classmethod
    def setUpClass(cls) -> None:
    	cls.spark = SparkSession.builder \
        	.appName("Spark Unit Test")
            .getOrCreate()
    
    def test_datafile_loading(self):
    	sample_df = load_gender(self.spark, "name_gender.csv")
        result_count = sample_df.count()
        self.assertEqual(result_count, 100, "Record count should be 100")
    
    def test_gender_count(self):
    	sample_df = load_gender(self.spark, "name_gender.csv")
        count_list = get_gender_count(self.spark, sample_df, "gender").collect()
        count_dict = dict()
        for row in count_list:
        	count_dict[row['gender']] = row['count']
        self.assertEqual(count_dict['F'], 65, "Count for Female should be 65")
        self.assertEqual(count_dict['M'], 28, "Count for Male should be 28")
        self.assertEqual(count_dict['Unisex'], 7, "Count for Unisex should be 28")
    
    def test_upper_udf(self):
    	test_data = [
            { "name": "John Kim" },
            { "name": "Johnny Kim"},
            { "name": "1234" }
        ]
        expected_results = [ "JOHN KIM", "JOHNNY KIM", "1234" ]

        upperUDF = self.spark.udf.register("upper_udf", upper_udf_f)
        test_df = self.spark.createDataFrame(test_data)
        names = test_df.select("name", upperUDF("name").alias("NAME")).collect()
        results = []
        for name in names:
            results.append(name["NAME"])
        self.assertCountEqual(results, expected_results)
    
    # release
    @classmethod
    def tearDownClass(cls) -> None:
    	cls.spark.stop()
    
import unittest

unittest.main(argv=[''], verbosity=2, exit=False)
profile
To be a DataScientist

0개의 댓글