[6/19] TIL - SparkSQL

Sangwon Jwa·2024년 6월 19일

데브코스 TIL

목록 보기
49/54
post-thumbnail

📖 학습 주제


  1. SparkSQL
  2. UDF (User Defined Function)

✏️ 주요 메모 사항 소개


SparkSQL

구조화된 데이터 처리를 위한 Spark 모듈로, 데이터 프레임 작업을 SQL로 처리 가능하게 해준다.

  • 데이터 프레임에 테이블 이름 지정 후 sql함수 사용 가능
    • pandas에도 pandasql 모듈의 sqldf 함수를 이용하는 동일한 패턴 존재
  • HQL(Hive Query Language)과의 호환도 제공
    • Hive 테이블들을 읽고 쓸 수 있음 (Hive Meatastore)

DataFrame과 비교해서 Spark SQL은 다음과 같은 장점이 있다.

  1. SQL이 가독성이 더 좋고 더 많은 사람들이 사용 가능
  2. Spark SQL 엔진이 최적화가 더 좋음
  3. 포팅이 쉽고, 접근권한 체크도 쉬음

사용 방법

데이터 프레임을 기반으로 테이블 뷰를 생성 : 테이블이 만들어짐

  • createOrReplaceTempView : spark Session이 살아있는 동안 존재
  • createOrReplaceGlobalTempView : Spark 드라이버가 살아있는 동안 존재

Spark Session의 sql 함수로 SQL 결과를 데이터 프레임으로 받음

namegender_df.createOrReplaceTempView("namegender")
namegender_group_df = spark.sql("""
	SELECT gender, count(1) FROM namegender GROUP BY 1
""")

print(namegender_group_df.collect())

JDBC를 이용하여 외부의 관계형 데이터베이스와 연결하려면 Spark Session의 read 함수를 호출하면 결과가 데이터프레임으로 리턴됨

df_user_session_channel = spark.read \
	.format("jdbc") \
    .option("driver","com.amazon.redshift.jdbc42.Driver") \
    .option("url", "jdbc:redshift//HOST:PORT/DB?user=ID&password=PASSWORD") \
    .option("dbtable", "raw_data.user_session_channel") \
    .load()

User Defined Function

특정 조건을 만족시키기 위해 새로운 함수를 만들어야 할 경우에 유용하게 사용할 수 있는 것이 UDF이다. 데이터프레임의 경우 .withColumn 함수와 같이 사용하는 것이 일반적이다.

Aggregation용 UDAF도 존재한다. (GROUP BY에서 사용되는 SUM, AVG와 같은 함수를 만드는 것, PySpark에서는 지원되지 않음)

# 예시(Pandas UDF Scalar 함수)

from pyspark.sql.functions import pandas_udf
import pandas as pd

# UDF 생성
@pandas_udf(StringType())
def upper_udf2(s: pd.Series) -> pd.Series:
	return s.str.upper()
    
upperUDF = spark.udf.register("upper_udf", upper_udf2)

# 결과 출력
df.select("Name", upperUDF("Name")).show()
spark.sql("""SELECT name, upper_udf(name) 'Curated Name' FROM test""").show()

Aggregation용 UDF도 만들어보자

# 예시(Pandas UDF로 Aggregation 사용)

from pyspark.sql.functions import pandas_udf
import pandas as pd

# UDF 생성
@pandas_udf(FloatType())
def average(v: pd.Series) -> float:
	return v.mean()
    
averageUDF = spark.udf.register('average', average)

# 결과 출력
spark.sql('SELECT average(b) FROM test').show()
df.agg(averageUDF("b").alias("count")).show()

0개의 댓글