[데이터 엔지니어링 데브코스] TIL 58일차 - 하둡과 spark(3)

박단이·2024년 2월 5일
0

데브코스 TIL

목록 보기
55/56

오늘 공부한 내용🤓

Spark SQL

  • SQL은 빅데이터 세상에서 필수!
    • 구조화된 데이터는 규모와 상관없이 SQL을 사용
    • 모든 대용량 data warehouse는 SQL 기반
      • Redshift, Snowflake, BigQuery
      • Hive / Presto
    • Spark도 Spark SQL을 지원함으로써 SQL을 사용

Spark SQL 특징

  • 구조화된 데이터 처리를 위한 Spark 모듈
  • DataFrame 작업을 SQL로 처리 가능
    • SQL로 가능한 작업이라면 굳이 df를 사용할 이유 X
    • Spark SQL, DataFrame 동시에 사용 가능하므로 필요에 따라 혼용할 것!
  • SQL 사용 시 장점
    1. Familiarity/Readability
      : 가독성이 좋고, 더 많은 사람들이 사용 가능
    2. Optimization
      : Spark SQL 엔진이 최적화하기 더 좋다. (Catalyst Optimizer, Project Tungsten)
    3. Interoperability/Data Management
      : 포팅이 쉽고 접근 권한 체크도 쉽다
  • HQL(Hive Query Language)과 호환
    Hive 테이블들을 읽고 쓸 수 있다. (Hive Metastore)

Spark SQL 사용법

  1. DataFrame을 기반으로 데이블 뷰 생성
    • createOrReplaceTempView : Spark Session이 살아 있는 동안 존재
    • createOrReplaceGlobalTempView : Spark Driver가 살아 있는 동안 존재
  2. Spark Session의 .sql 함수로 SQL 결과를 DataFrame으로 받음

Aggregation 함수

이 함수들은 DataFrame보다 SQL로 작성하는 것을 추천

  1. Group By : SUM, MIN, MAX, AVG, COUNT, ...
  2. Window 함수 : ROW_NUMBER, FIRST_VALUE, LAST_VALUE
  3. RANK

JOIN 함수

  • 2개 혹은 그 이상의 테이블들을 공통 필드를 통해 merge하여 새로운 테이블을 생성
  • 스타 스키마로 구성된 테이블들로 분산되어 있던 정보를 통합하는 데 사용
  • 종류 : inner join, full join(outer join), left join, right join, cross join, self join
  • 최적화 관점에서 본 join의 종류들
    • Shuffle Join : 일반 조인 방식 (무조건 shuffle 발생)
      • Bucket Join : 조인 키를 바탕으로 새로 파티션을 새로 만들고 조인하는 방식 (shuffle 발생 X)
    • Broadcast Join : 큰 데이터와 작은 데이터 간의 조인
      df 하나가 충분히 작으면 작은 df을 다른 df이 있는 서버들로 뿌리는 방법 (broadcasting)
      spark.sql.autoBroadcastJoinThreshold로 충분히 작은지 여부 결정
      (executor에 지정한 memory보다 조금 작게 설정한다.)

UDF (User Defined Function)

  • DataFrame이나 SQL에서 적용할 수 있는 사용자 정의 함수
  • Scalar 함수 : UPPER, LOWER, ...
  • Aggregation 함수(UDAF) : SUM, MIN, MAX,

UDF 사용 방법

  1. 함수 등록
    • pyspark.sql.functions.udf : df에서만 사용 가능
    • spark.udf.register: df, sql 모두 사용 가능
  2. 함수 사용
    • .withColumn, .agg
    • .sql

UDF 사용 예시

upper 함수를 구현한다고 해보자.

# df에 lambda 함수를 적용한 UDF
import pyspark.sql.functions as F
from pyspark.sql.types as *

upperUDF = F.udf(lambda x : x.upper(x))
df.withColumn("Upper Name", upperUDF("Name"))
# python 함수를 사용하여 Spark SQL에 적용
def upper(s):
	return s.upper()

upperUDF = spark.udf.register("upper", upper)
df.createOrReplaceTempView("test")

# 아래의 두 함수는 같은 결과
spark.sql("""SELECT name, upper(name) as `Upper Name`
  FROM test
  """).show()
df.select("name", upperUDF("name").alias("Upper Name")).show()
# pandas_udf를 사용하기 (이 방법을 추천)
# 왜? input으로 레코드들의 집합(Series) 형태로 넣어주기 때문에 빠르게 계산

from pyspark.sql.functions import pandas_udf
import pandas as pd

# 결과 데이터의 타입을 입력
@pandas_udf(StringType())
def upper_udf(s:pd.Series) -> pd.Series:
	return s.str.upper()

upperUDF = spark.udf.register("upper_udf", upper_udf)

df.select("name", upperUDF("name").alias("Upper Name")).show()
spark.sql("""SELECT name, upper_udf(name) as `Upper Name`
  FROM test
  """).show()

DataFrame에 Aggregation을 사용해보자.
Pandas UDF 방식을 권장

# 예시 : 나이 평균 구하기

@pandas_udf(FloatType())
def average(s:pd.Series) -> float:
	return s.mean()

averageUDF = spark.udf.register("average", average)

spark.sql("""SELECT average(age) FROM test""").show()
df.select(averageUDF("age")).show()

느낀 점😊

UDF를 적용하는 건 익숙하게 하지 않으면 계속 까먹을 것 같다. 자주 실습해봐야겠다.

profile
데이터 엔지니어를 꿈꾸는 주니어 입니다!

0개의 댓글