[TIL 63일자] 데브코스 데이터엔지니어링

·2023년 7월 6일
0

데브코스

목록 보기
52/55
post-thumbnail
post-custom-banner

📚 오늘 공부한 내용

1. Spark SQL

1) Spark SQL이란?

  • Spark SQL은 구조화된 데이터 처리를 위한 Spark 모듈이다.
  • 데이터 프레임 작업을 SQL로 처리 가능하다.
  • 데이터 프레임에 테이블 이름 지정 후 SQL 함수로 사용 가능하다.
  • HQL(Hive Query Language)와 호환을 제공한다. 보통 Hive와 Spark 두 개의 시스템을 동시에 운영하는 것이 일반적이다.

2) Spark SQL 장점

  • SQL로 작업이 가능하다면 DataFrame을 사용할 이유가 없다.
  • SQL이 가독성이 더 좋고, 더 많은 사람들이 사용 가능하다. (Familiarity/Readability)
  • Spark SQL 엔진이 최적화하기에 더 좋다. (Optimization)
    • Catalyst, Optimizer와 Project Tungsten
  • SQL이 포팅이 쉽고, 접근 권한도 확인하기 쉽다. 만약 권한이 없다면 오류를 내면 된다. (Interoperability/Data Management)

3) Spark SQL 사용 방법

  • 데이터 프레임 기반으로 테이블 뷰를 생성한다.
    • createOrReplaceTempView: Session이 살아 있는 동안 존재
    • createOrReplaceGlobalTempView: Spark 드라이버가 살아 있는 동안 존재
  • Spark Session의 SQL 함수로 결과를 데이터 프레임으로 받는다.
  • SparkSession을 사용해서 외부 데이터 베이스와 연결이 필요할 경우 SparkSession으로 read 함수를 호출한다. (결과는 데이터 프레임)
df_user_session_channel = spark.read \
 .format("jdbc") \
 .option("driver", "com.amazon.redshift.jdbc42.Driver") \
 .option("url", "jdbc:redshift://HOST:PORT/DB?user=ID&password=PASSWORD") \
 .option("dbtable", "raw_data.user_session_channel") \
 .load()


2. JOIN

1) 최적화 관점에서 본 조인의 종류

  • Spark SQL 사용 시 최적화가 필요해진다. 데이터셋의 크기가 크면 Partition이 여러 개가 발생하게 되고 Shuffle 과정에서 Data Skew가 발생할 수도 있고, 그렇게 되면 속도가 늦어진다. 즉, Performance 측면에서는 Shuffle이 적어야 한다.
  • Shuffle JOIN
    • 일반 조인 방식
    • Bucket JOIN: Join key를 바탕으로 새로 Partition을 미리 만들어 두고 JOIN 하는 방식. (Shuffling이 발생하지 않음)
  • Broadcast JOIN
    • 큰 데이터와 작은 데이터 간의 조인 (둘을 비교했을 때 하나의 데이터가 작아야 함)
    • 데이터 프레임 하나가 충분히 작으면 작은 데이터 프레임을 다른 데이터 프레임이 있는 서버로 뿌리는 것.
    • 이때 작다의 기준을 잡아 주는 파라미터가 존재한다. spark.sql.autoBroadcastJoinThreshold로 결정한다.

2) Shuffle JOIN

  • 이렇게 두 개의 데이터프레임이 shuffling을 하면서 새로운 Partition으로 복사되며 합쳐지는 것을 Shuffle JOIN이라고 한다.
  • 이렇게 된다면 엄청난 이동이 발생하게 되며 이동 후에 Partition의 크기가 달라질 수도 있다.
  • 일반적으로는 이 JOIN이 사용된다.
  • 이때 Left DataFrame과 Right DataFrame의 데이터를 조회 혹은 로딩해 올 때부터 미리 Partition을 만들어 두고 JOIN KEY가 같은 데이터끼리 JOIN을 하는 것이 Bucket JOIN이다. (Bucket JOIN은 Shuffling이 필요하지 않다.)

3) Broadcast JOIN

  • 누가 봐도 차이가 확연한 엄청 큰 데이터프레임과 작은 데이터프레임을 두고, 작은 데이터프레임 전체를 큰 데이터프레임의 Partition이 있는 서버로 복사하여 Shuffling의 양을 최소화하는 JOIN 방식이다.


3. UDF (User Defined Function)

  • 데이터프레임의 경우 .withColumn 함수와 같이 사용하는 것이 일반적이다.
  • Spark SQL에서도 사용 가능하다. (DataFrame에서 사용되는 함수이기 때문에 SQL에서 사용 시 함수를 등록하는 방법이 다르다.)
  • Aggregation 용 UDAF(User Defined Aggregation Function)도 존재한다.
  • 이는 Pyspark에서는 지원되지 않아 Scala나 Java를 사용해야 한다.

1) 특정 컬럼을 대문자로 바꾸고 싶을 때

  • UDF 사용
import pyspark.sql.functions as F
from pyspark.sql.types import *

upperUDF = F.udf(lambda z:z.upper())
df.withColumn("Curated Name", upperUDF("Name"))
  • SQL 사용
def upper(s):
 return s.upper()
 
upperUDF = spark.udf.register("upper", upper)
spark.sql("SELECT upper('aBcD')").show()

df.createOrReplaceTempView("test")
spark.sql("""SELECT name
				  , upper(name) "Curated Name" 
              FROM test
          """).show()

2) a와 b 컬럼 값 더하기

  • UDF 사용
data = [
 {"a": 1, "b": 2},
 {"a": 5, "b": 5}
]

df = spark.createDataFrame(data)
df.withColumn("c", F.udf(lambda x, y: x + y)("a", "b"))
  • SQL 사용
def plus(x, y):
 return x + y
 
plusUDF = spark.udf.register("plus", plus)
spark.sql("SELECT plus(1, 2)").show()

df.createOrReplaceTempView("test")
spark.sql("SELECT a, b, plus(a, b) c FROM test").show()

3) Aggregation 사용

  • UDF
from pyspark.sql.functions import pandas_udf
import pandas as pd

@pandas_udf(FloatType())
def average(v: pd.Series) -> float:
 return v.mean()
 

averageUDF = spark.udf.register('average', average)
  • SQL
spark.sql('SELECT average(b) FROM test').show()
df.agg(averageUDF("b").alias("count")).show()

4. Spark 데이터베이스와 테이블

1) Catalog와 Table

  • Catalog
    • 테이블과 뷰에 관한 메타 데이터 관리
    • 인메모리 기반의 카탈로그를 제공
    • 하지만 세션이 끝나면 해당 카탈로그는 사라지기 때문에 디스크 기반의 메타 스토어가 필요했음
    • 그래서 Hive 호환 카탈로그를 제공 Persistent
  • 메모리 기반 테이블/뷰
    • 임시 테이블
  • 스토리지 기반 테이블
    • HDFS와 Parquet 포맷을 사용
    • Hive와 호환되는 메타스토어 사용
    • 두 종류의 테이블이 존재하는데 Spark가 실제 데이터와 메타 데이터를 모두 관리하는 Managed TableSpark에서 메타 데이터만 관리하는 Unmanaged(External) Table이 있음

2) Managed Table

  • 성능적인 면이나 여러 방면에서 Managed Table이 좋기 때문에 가능한 Managed Table을 쓰는 게 좋다.
  • 두 가지를 사용해 테이블 생성 가능
    • dataframe.saveAsTable("테이블명")
    • SQL 문법 사용 (CREATE TABLE, CTAS)
  • spark.sql.warehouse.dir가 가리키는 위치에 데이터가 저장되며 PARQUET가 기본 포맷이다.

3) External Table

  • HDFS에 존재하는 데이터에 스키마를 정의해서 사용한다. (LOCATION이라는 property 사용)
  • 메타데이터만 카탈로그에 기록된다. External Table이 사라지더라도 데이터는 유지된다.
CREATE TABLE TABLE_NAME(
	  COL1
    , COL2
    , COL3
) 
USING PARQUET
LOCATION 'hdfs_path'

🔎 어려웠던 내용 & 새로 알게 된 내용



✍ 회고

profile
송의 개발 LOG
post-custom-banner

0개의 댓글