📚 오늘 공부한 내용
1. Spark SQL
1) Spark SQL이란?
- Spark SQL은 구조화된 데이터 처리를 위한 Spark 모듈이다.
- 데이터 프레임 작업을 SQL로 처리 가능하다.
- 데이터 프레임에 테이블 이름 지정 후 SQL 함수로 사용 가능하다.
- HQL(Hive Query Language)와 호환을 제공한다. 보통 Hive와 Spark 두 개의 시스템을 동시에 운영하는 것이 일반적이다.
2) Spark SQL 장점
- SQL로 작업이 가능하다면 DataFrame을 사용할 이유가 없다.
- SQL이 가독성이 더 좋고, 더 많은 사람들이 사용 가능하다. (Familiarity/Readability)
- Spark SQL 엔진이 최적화하기에 더 좋다. (Optimization)
- Catalyst, Optimizer와 Project Tungsten
- SQL이 포팅이 쉽고, 접근 권한도 확인하기 쉽다. 만약 권한이 없다면 오류를 내면 된다. (Interoperability/Data Management)
3) Spark SQL 사용 방법
- 데이터 프레임 기반으로 테이블 뷰를 생성한다.
- createOrReplaceTempView: Session이 살아 있는 동안 존재
- createOrReplaceGlobalTempView: Spark 드라이버가 살아 있는 동안 존재
- Spark Session의 SQL 함수로 결과를 데이터 프레임으로 받는다.
- SparkSession을 사용해서 외부 데이터 베이스와 연결이 필요할 경우 SparkSession으로 read 함수를 호출한다. (결과는 데이터 프레임)
df_user_session_channel = spark.read \
.format("jdbc") \
.option("driver", "com.amazon.redshift.jdbc42.Driver") \
.option("url", "jdbc:redshift://HOST:PORT/DB?user=ID&password=PASSWORD") \
.option("dbtable", "raw_data.user_session_channel") \
.load()
2. JOIN
1) 최적화 관점에서 본 조인의 종류
- Spark SQL 사용 시 최적화가 필요해진다. 데이터셋의 크기가 크면 Partition이 여러 개가 발생하게 되고 Shuffle 과정에서 Data Skew가 발생할 수도 있고, 그렇게 되면 속도가 늦어진다. 즉, Performance 측면에서는 Shuffle이 적어야 한다.
- Shuffle JOIN
- 일반 조인 방식
- Bucket JOIN: Join key를 바탕으로 새로 Partition을 미리 만들어 두고 JOIN 하는 방식. (Shuffling이 발생하지 않음)
- Broadcast JOIN
- 큰 데이터와 작은 데이터 간의 조인 (둘을 비교했을 때 하나의 데이터가 작아야 함)
- 데이터 프레임 하나가 충분히 작으면 작은 데이터 프레임을 다른 데이터 프레임이 있는 서버로 뿌리는 것.
- 이때 작다의 기준을 잡아 주는 파라미터가 존재한다. spark.sql.autoBroadcastJoinThreshold로 결정한다.
2) Shuffle JOIN
- 이렇게 두 개의 데이터프레임이 shuffling을 하면서 새로운 Partition으로 복사되며 합쳐지는 것을 Shuffle JOIN이라고 한다.
- 이렇게 된다면 엄청난 이동이 발생하게 되며 이동 후에 Partition의 크기가 달라질 수도 있다.
- 일반적으로는 이 JOIN이 사용된다.
- 이때 Left DataFrame과 Right DataFrame의 데이터를 조회 혹은 로딩해 올 때부터 미리 Partition을 만들어 두고 JOIN KEY가 같은 데이터끼리 JOIN을 하는 것이 Bucket JOIN이다. (Bucket JOIN은 Shuffling이 필요하지 않다.)
3) Broadcast JOIN
- 누가 봐도 차이가 확연한 엄청 큰 데이터프레임과 작은 데이터프레임을 두고, 작은 데이터프레임 전체를 큰 데이터프레임의 Partition이 있는 서버로 복사하여 Shuffling의 양을 최소화하는 JOIN 방식이다.
3. UDF (User Defined Function)
- 데이터프레임의 경우 .withColumn 함수와 같이 사용하는 것이 일반적이다.
- Spark SQL에서도 사용 가능하다. (DataFrame에서 사용되는 함수이기 때문에 SQL에서 사용 시 함수를 등록하는 방법이 다르다.)
- Aggregation 용 UDAF(User Defined Aggregation Function)도 존재한다.
- 이는 Pyspark에서는 지원되지 않아 Scala나 Java를 사용해야 한다.
1) 특정 컬럼을 대문자로 바꾸고 싶을 때
import pyspark.sql.functions as F
from pyspark.sql.types import *
upperUDF = F.udf(lambda z:z.upper())
df.withColumn("Curated Name", upperUDF("Name"))
def upper(s):
return s.upper()
upperUDF = spark.udf.register("upper", upper)
spark.sql("SELECT upper('aBcD')").show()
df.createOrReplaceTempView("test")
spark.sql("""SELECT name
, upper(name) "Curated Name"
FROM test
""").show()
2) a와 b 컬럼 값 더하기
data = [
{"a": 1, "b": 2},
{"a": 5, "b": 5}
]
df = spark.createDataFrame(data)
df.withColumn("c", F.udf(lambda x, y: x + y)("a", "b"))
def plus(x, y):
return x + y
plusUDF = spark.udf.register("plus", plus)
spark.sql("SELECT plus(1, 2)").show()
df.createOrReplaceTempView("test")
spark.sql("SELECT a, b, plus(a, b) c FROM test").show()
3) Aggregation 사용
from pyspark.sql.functions import pandas_udf
import pandas as pd
@pandas_udf(FloatType())
def average(v: pd.Series) -> float:
return v.mean()
averageUDF = spark.udf.register('average', average)
spark.sql('SELECT average(b) FROM test').show()
df.agg(averageUDF("b").alias("count")).show()
4. Spark 데이터베이스와 테이블
1) Catalog와 Table
- Catalog
- 테이블과 뷰에 관한 메타 데이터 관리
- 인메모리 기반의 카탈로그를 제공
- 하지만 세션이 끝나면 해당 카탈로그는 사라지기 때문에 디스크 기반의 메타 스토어가 필요했음
- 그래서 Hive 호환 카탈로그를 제공 Persistent
- 메모리 기반 테이블/뷰
- 스토리지 기반 테이블
- HDFS와 Parquet 포맷을 사용
- Hive와 호환되는 메타스토어 사용
- 두 종류의 테이블이 존재하는데 Spark가 실제 데이터와 메타 데이터를 모두 관리하는 Managed Table과 Spark에서 메타 데이터만 관리하는 Unmanaged(External) Table이 있음
2) Managed Table
- 성능적인 면이나 여러 방면에서 Managed Table이 좋기 때문에 가능한 Managed Table을 쓰는 게 좋다.
- 두 가지를 사용해 테이블 생성 가능
- dataframe.saveAsTable("테이블명")
- SQL 문법 사용 (CREATE TABLE, CTAS)
- spark.sql.warehouse.dir가 가리키는 위치에 데이터가 저장되며 PARQUET가 기본 포맷이다.
3) External Table
- HDFS에 존재하는 데이터에 스키마를 정의해서 사용한다. (LOCATION이라는 property 사용)
- 메타데이터만 카탈로그에 기록된다. External Table이 사라지더라도 데이터는 유지된다.
CREATE TABLE TABLE_NAME(
COL1
, COL2
, COL3
)
USING PARQUET
LOCATION 'hdfs_path'
🔎 어려웠던 내용 & 새로 알게 된 내용
✍ 회고