[데이터 엔지니어링 데브코스 2기] TIL-14주차 빅데이터 처리 시스템, Hadoop, Spark (3)

이재호·2024년 1월 17일
0

1. Spark SQL


SQL은 빅데이터 세상에서 중요함 !

  • 데이터 분야에서 일하기 위해서는 필수적인 기술.
  • 구조화된 데이터를 다루기 위해 SQL은 필히 사용됨.
  • 모든 대용량 데이터 웨어하우스는 SQL 기반.
  • Spark도 Spark SQL로 지원하고 있음.

Spark SQL이란,

  • 데이터 프레임 작업을 SQL로 처리 가능.
  • 데이터 프레임에 테이블 이름 지정 후 sql 함수 사용 가능. (판다스의 pandasql 모듈의 sqldf 함수도 동일)
  • HQL(Hive-Query-Language)와도 호환이 됨.

Spark SQL vs. DataFrame.

  • 만약 Spark SQL로 작업이 가능하다면 Spark SQL 사용하는 것이 좋음.
  • SQL이 가독성이 더 좋고, 코딩 능력이 필요하지 않음.
  • SQL이 최적화가 더 쉬움.
  • SQL이 포팅이 더 쉽고, 접근 권한 체크도 쉬움.

Spark SQL 사용법.

  • 데이터 프레임을 기반으로 테이블 뷰 생성.
    - createOrReplaceTempView : SparkSession이 살아 있는 동안 존재.
    • createOrReplaceGlobalTempView : Spark 드라이버가 살아 있는 동안 존재.
  • SparkSession.sql("")로 SQL 실행.

SparkSession와 외부 데이터베이스 연결.

  • SparkSession.read 로 외부 데이터베이스의 파일을 데이터프레임으로 읽어 옴.

Aggregation 함수.

  • DataFrame이 아닌 SQL로 작성하는 편이 좋음.
  • Group By. - SUM, MIN, ...
  • Window. - ROW_NUMBER, ...
  • Rank.

JOIN.

  • 두 개 이상의 공통 필드를 갖는 테이블들을 머지.
  • 각 테이블을 LEFT, RIGHT 테이블로 봄.
  • INNER, FULL, CROSS, LEFT, RIGHT, SELF 조인 종류가 있음.

최적화 관점에서의 조인.

  • Shuffle JOIN: 일반 조인 방식, Bucket JOIN(조인 키를 바탕으로 새로 파티션을 만들어 여러 개의 파티션을 하나의 파티션으로 조인하는 방식.)
  • Broadcast JOIN: 큰 데이터와 작은 데이터 간의 조인. 작은 데이터 프레임을 큰 데이터 프레임으로 통합. spark.sql.autoBraodcastJoinThreshold 파라미터로 임계값 선정.

2. UDF(User-Defined-Function)


UDF 사용.

  • 데이터 프레임(or Spark SQL)의 경우: .withColumn 함수와 함께 사용.
  • Aggregation용 UDAF(User-Defined-Aggregation-Function): SUM, AVG와 같은 Aggregation 함수를 만드는 것. PySpark에서는 지원이 되지 않으므로 Scalar/Java를 사용해야 함.

UDF - DataFrame 1.

import pyspark.sql.functions as F
from pyspark.sql.types import *

upperUDF = F.udf(lambda z: z.upper())
df.withColumn("Curated Name", upperUDF("Name"))

UDF - DataFrame 2.

data = [
	{"a": 1, "b": 2},
    {"a": 5, "b": 5}
]

df = spark.createDataFrame(data)
df.withColumn("c", F.udf(lambda x, y: x + y)("a" ,"b"))

UDF - SQL 1.

def upper(s):
	return s.upper()
    
# 테스트
upperUDF = spark.udf.register("upper", upper) # upper라는 이름으로 등록 후, upperUDF 객체에 저장.
spark.sql("SELECT upper('aBcD')").show()

# DataFrame 기반 SQL에 적용.
df.createOrReplaceTempView("test")
spark.sql("""SELECT name, upper(name) "Curated Name" FROM test""").show()

UDF - SQL 2.

def plus(x, y):
	return x + y

plusUDF = spark.udf.register("plus", plus)
spark.sql("SELECT plus(1, 2)").show()

df.createOrReplaceTempView("test") # df는 DataFrame 2의 df임.
spark.sql("SELECT a, b, plus(a, b) c FROM test").show()

UDF - Pandas UDF Scalar Func. 가장 추천하는 방식(벌크 입력 가능).

from pyspark.sql.functions import pandas_udf
import pandas as pd

@pandas_udf(StringType())
def upper_udf2(s: pd.Series) -> pd.Series: # 레코드를 하나씩 입력 받지 않고, 레코드 집합으로 받아 옴.
	return s.str.upper()

upperUDF = spark.udf.register("upper_udf", upper_udf2) # upper_udf2 func를 upper_udf로 등록.

df.select("Name", upperUDF("Name")).show()
spark.sql("""SELECT name, upper_udf(name) `Curated Name` FROM test""").show()

UDF - DataFrame/SQL에 Aggregation 적용.

from pyspark.sql.functions import pandas_udf
import pandas as pd

@pandas_udf(FloatType())
def average(v: pd.Series) -> float:
	return v.mean()

averageUDF = spark.udf.register('average', average)

spark.sql('SELECT average(b) FROM test').show()
df.agg(averageUDF("b").alias("count")).show()

3. Hive 메타스토어


Spark 데이터베이스와 테이블.

  • catalog: 테이블과 뷰에 관한 메타 데이터 관리. (SparkSession 밑에 있는 카탈로그. 메모리 기반), 세션이 끝나면 사라짐.
    - Hive와 호환되는 카탈로그(디스크 기반) 제공. 세션이 끝나도 사라지지 않음.
  • 테이블 관리 방식.
    - 테이블들은 데이터베이스 밑에 테이블로 존재함. ex) database.table
    • 테이블의 성격에 따라 데이터베이스를 여러 개로 만들어두는 것이 좋음.
  • 메모리 기반 테이블/뷰가 기본적으로 사용됨.
  • 스토리지 기반 테이블
    - 기본적으로 HDFS와 Parquet 포맷을 사용.
    • Hive와 호환이 되는 메타스토어 사용.
    • Hive와 마찬가지로 두 종류의 테이블이 존재함.
      • Managed Table: Spark이 실제 데이터와 메타 데이터 모두 관리함.
      • Unmanaged(External) Talbe: Spark이 메타 데이터만 관리.
  • SparkSession 생성 시, .enableHiveSupport() 옵션을 추가해 줌.

Managed Table.

  • dataframe.saveAsTable("table name")
  • SQL: "CREATE TABLE, CTAS"
  • spark.sql.warehouse.dir이 가리키는 위치에 데이터가 저장됨. (PARQUET이 기본 데이터 포맷)
  • 성능이 좋다는 장점이 있음.
  • Spark 테이블로 처리한다는 것도 장점임(JDBC/ODBC 등으로 Spark을 연결해서 접근 가능.)

Unmanaged(External) Table.

  • 이미 HDFS에 존재하는 데이터에 스키마를 정의해서 사용. (LOCATION이란 프로퍼티 사용.)
  • 메타데이터만 카탈로그에 기록되며, 데이터는 이미 존재함. 따라서, External table을 drop해도 데이터는 사라지지 않음.

4. Unit Test


Unit Test란,

  • 코드의 특정 기능(함수)를 테스트하기 위해 작성되는 코드.
  • 정해진 입력 -> 정해진 결과 테스트.
  • CI/CD를 사용하려면 전체 코드의 테스트 커버리지가 굉장히 중요해 짐.
  • 각 언어별로 정해진 테스트 프레임웤을 사용하는 것이 일반적임.
    - Java: JUnit
    • Python: unittest

Unit Test 사용.

  • 앞에서의 upper_udf_f UDF, load_gender, get_gender_count를 테스트.
```python
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import *
import pandas as pd

@pandas_udf(StringType())
def upper_udf_f(s: pd.Series) -> pd.Series:
	return s.str.upper()

upperUDF = spark.udf.register("upper_udf", upper_udf_f)

# 테스트용 함수.
def load_gender(spark, file_path):
	return spark.read.option("header", True).csv(file_path)

def get_gender_count(spark, df, field_to_count):
	df.createOrReplaceTempView("namegender_test")
    return spark.sql(f"SELECT {field_to_count}, COUNT(1) count FROM namegender_test GROUP BY 1")

################################################
# unittest
from unittest import TestCase

# upper_udf_f, load_gender, get_gender_count를 테스트할 예정.
# 원래는 별개의 파일로 존재하여 이를 임포트하여 테스트함.

class UtilsTestCase(TestCase):
	spark = None
    
    # 자원 할당.
    @classmethod
    def setUpClass(cls) -> None:
    	cls.spark = SparkSession.builder \
        	.appName("") \
            .getOrCreate()
	
    def test_datefile_loading(self):
    	sample_df = load_gender(self.spark, "name_gender.csv")
        result_count = sample_df.count()
        self.assertEqual(result_count, 100, "Record count shoud be 100")
	
    # 위처럼 테스트 코드 작성.
    ...
    
    # 자원 반환.
    @classmethod
    def tearDownClass(cls) -> None:
    	cls.spark.stop()

	import unittest
    unittest.main(argv=[''], verbosity=2, exit=False) # 테스트 수행.

```
profile
천천히, 그리고 꾸준히.

0개의 댓글