SQL은 빅데이터 세상에서 중요함 !
- 데이터 분야에서 일하기 위해서는 필수적인 기술.
- 구조화된 데이터를 다루기 위해 SQL은 필히 사용됨.
- 모든 대용량 데이터 웨어하우스는 SQL 기반.
- Spark도 Spark SQL로 지원하고 있음.
Spark SQL이란,
- 데이터 프레임 작업을 SQL로 처리 가능.
- 데이터 프레임에 테이블 이름 지정 후 sql 함수 사용 가능. (판다스의 pandasql 모듈의 sqldf 함수도 동일)
- HQL(Hive-Query-Language)와도 호환이 됨.
Spark SQL vs. DataFrame.
- 만약 Spark SQL로 작업이 가능하다면 Spark SQL 사용하는 것이 좋음.
- SQL이 가독성이 더 좋고, 코딩 능력이 필요하지 않음.
- SQL이 최적화가 더 쉬움.
- SQL이 포팅이 더 쉽고, 접근 권한 체크도 쉬움.
Spark SQL 사용법.
- 데이터 프레임을 기반으로 테이블 뷰 생성.
- createOrReplaceTempView : SparkSession이 살아 있는 동안 존재.
- createOrReplaceGlobalTempView : Spark 드라이버가 살아 있는 동안 존재.
- SparkSession.sql("")로 SQL 실행.
SparkSession와 외부 데이터베이스 연결.
- SparkSession.read 로 외부 데이터베이스의 파일을 데이터프레임으로 읽어 옴.
Aggregation 함수.
- DataFrame이 아닌 SQL로 작성하는 편이 좋음.
- Group By. - SUM, MIN, ...
- Window. - ROW_NUMBER, ...
- Rank.
JOIN.
- 두 개 이상의 공통 필드를 갖는 테이블들을 머지.
- 각 테이블을 LEFT, RIGHT 테이블로 봄.
- INNER, FULL, CROSS, LEFT, RIGHT, SELF 조인 종류가 있음.
최적화 관점에서의 조인.
- Shuffle JOIN: 일반 조인 방식, Bucket JOIN(조인 키를 바탕으로 새로 파티션을 만들어 여러 개의 파티션을 하나의 파티션으로 조인하는 방식.)
- Broadcast JOIN: 큰 데이터와 작은 데이터 간의 조인. 작은 데이터 프레임을 큰 데이터 프레임으로 통합. spark.sql.autoBraodcastJoinThreshold 파라미터로 임계값 선정.
UDF 사용.
- 데이터 프레임(or Spark SQL)의 경우: .withColumn 함수와 함께 사용.
- Aggregation용 UDAF(User-Defined-Aggregation-Function): SUM, AVG와 같은 Aggregation 함수를 만드는 것. PySpark에서는 지원이 되지 않으므로 Scalar/Java를 사용해야 함.
UDF - DataFrame 1.
import pyspark.sql.functions as F
from pyspark.sql.types import *
upperUDF = F.udf(lambda z: z.upper())
df.withColumn("Curated Name", upperUDF("Name"))
UDF - DataFrame 2.
data = [
{"a": 1, "b": 2},
{"a": 5, "b": 5}
]
df = spark.createDataFrame(data)
df.withColumn("c", F.udf(lambda x, y: x + y)("a" ,"b"))
UDF - SQL 1.
def upper(s):
return s.upper()
# 테스트
upperUDF = spark.udf.register("upper", upper) # upper라는 이름으로 등록 후, upperUDF 객체에 저장.
spark.sql("SELECT upper('aBcD')").show()
# DataFrame 기반 SQL에 적용.
df.createOrReplaceTempView("test")
spark.sql("""SELECT name, upper(name) "Curated Name" FROM test""").show()
UDF - SQL 2.
def plus(x, y):
return x + y
plusUDF = spark.udf.register("plus", plus)
spark.sql("SELECT plus(1, 2)").show()
df.createOrReplaceTempView("test") # df는 DataFrame 2의 df임.
spark.sql("SELECT a, b, plus(a, b) c FROM test").show()
UDF - Pandas UDF Scalar Func. 가장 추천하는 방식(벌크 입력 가능).
from pyspark.sql.functions import pandas_udf
import pandas as pd
@pandas_udf(StringType())
def upper_udf2(s: pd.Series) -> pd.Series: # 레코드를 하나씩 입력 받지 않고, 레코드 집합으로 받아 옴.
return s.str.upper()
upperUDF = spark.udf.register("upper_udf", upper_udf2) # upper_udf2 func를 upper_udf로 등록.
df.select("Name", upperUDF("Name")).show()
spark.sql("""SELECT name, upper_udf(name) `Curated Name` FROM test""").show()
UDF - DataFrame/SQL에 Aggregation 적용.
from pyspark.sql.functions import pandas_udf
import pandas as pd
@pandas_udf(FloatType())
def average(v: pd.Series) -> float:
return v.mean()
averageUDF = spark.udf.register('average', average)
spark.sql('SELECT average(b) FROM test').show()
df.agg(averageUDF("b").alias("count")).show()
Spark 데이터베이스와 테이블.
- catalog: 테이블과 뷰에 관한 메타 데이터 관리. (SparkSession 밑에 있는 카탈로그. 메모리 기반), 세션이 끝나면 사라짐.
- Hive와 호환되는 카탈로그(디스크 기반) 제공. 세션이 끝나도 사라지지 않음.- 테이블 관리 방식.
- 테이블들은 데이터베이스 밑에 테이블로 존재함. ex) database.table
- 테이블의 성격에 따라 데이터베이스를 여러 개로 만들어두는 것이 좋음.
- 메모리 기반 테이블/뷰가 기본적으로 사용됨.
- 스토리지 기반 테이블
- 기본적으로 HDFS와 Parquet 포맷을 사용.
- Hive와 호환이 되는 메타스토어 사용.
- Hive와 마찬가지로 두 종류의 테이블이 존재함.
- Managed Table: Spark이 실제 데이터와 메타 데이터 모두 관리함.
- Unmanaged(External) Talbe: Spark이 메타 데이터만 관리.
- SparkSession 생성 시, .enableHiveSupport() 옵션을 추가해 줌.
Managed Table.
- dataframe.saveAsTable("table name")
- SQL: "CREATE TABLE, CTAS"
- spark.sql.warehouse.dir이 가리키는 위치에 데이터가 저장됨. (PARQUET이 기본 데이터 포맷)
- 성능이 좋다는 장점이 있음.
- Spark 테이블로 처리한다는 것도 장점임(JDBC/ODBC 등으로 Spark을 연결해서 접근 가능.)
Unmanaged(External) Table.
- 이미 HDFS에 존재하는 데이터에 스키마를 정의해서 사용. (LOCATION이란 프로퍼티 사용.)
- 메타데이터만 카탈로그에 기록되며, 데이터는 이미 존재함. 따라서, External table을 drop해도 데이터는 사라지지 않음.
Unit Test란,
- 코드의 특정 기능(함수)를 테스트하기 위해 작성되는 코드.
- 정해진 입력 -> 정해진 결과 테스트.
- CI/CD를 사용하려면 전체 코드의 테스트 커버리지가 굉장히 중요해 짐.
- 각 언어별로 정해진 테스트 프레임웤을 사용하는 것이 일반적임.
- Java: JUnit
- Python: unittest
Unit Test 사용.
- 앞에서의 upper_udf_f UDF, load_gender, get_gender_count를 테스트.
```python
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import *
import pandas as pd
@pandas_udf(StringType())
def upper_udf_f(s: pd.Series) -> pd.Series:
return s.str.upper()
upperUDF = spark.udf.register("upper_udf", upper_udf_f)
# 테스트용 함수.
def load_gender(spark, file_path):
return spark.read.option("header", True).csv(file_path)
def get_gender_count(spark, df, field_to_count):
df.createOrReplaceTempView("namegender_test")
return spark.sql(f"SELECT {field_to_count}, COUNT(1) count FROM namegender_test GROUP BY 1")
################################################
# unittest
from unittest import TestCase
# upper_udf_f, load_gender, get_gender_count를 테스트할 예정.
# 원래는 별개의 파일로 존재하여 이를 임포트하여 테스트함.
class UtilsTestCase(TestCase):
spark = None
# 자원 할당.
@classmethod
def setUpClass(cls) -> None:
cls.spark = SparkSession.builder \
.appName("") \
.getOrCreate()
def test_datefile_loading(self):
sample_df = load_gender(self.spark, "name_gender.csv")
result_count = sample_df.count()
self.assertEqual(result_count, 100, "Record count shoud be 100")
# 위처럼 테스트 코드 작성.
...
# 자원 반환.
@classmethod
def tearDownClass(cls) -> None:
cls.spark.stop()
import unittest
unittest.main(argv=[''], verbosity=2, exit=False) # 테스트 수행.
```