Spark SQL의 목적
- 스파크 프로그래밍 내부에서 관계형 처리를 하기 위해
- 스키마의 정보를 이용해 자동으로 최적화를 하기 위해
- 외부 데이터셋을 사용하기 쉽게하기 위해
Spark SQL 소개
- 스파크 위에 구현된 하나의 패키지
- 3개의 주요 API
- 2개의 백엔드 컴포넌트
- Catalyst -쿼리 최적화 엔진
- Tungsten - 시리얼라이저
DataFrame
- Spark Core에 RDD가 있다면 Spark SQL엔 DataFrame이 있다
- DataFrame은 테이블 데이터셋이라고 보면 된다
- 개념적으론 RDD에 스키마가 적용된 것으로 보면 된다
SparkSession
- Spark Core에 SparkContext가 있다면 Spark SQL엔 SparkSession이 있다
sprak = SparkSession.builder.appName("test-app").getOrCreate()
DataFrame 만들기
- RDD에서 스키마를 정의한 다음 변형을 하거나
- CSV, JSON 등의 데이터를 받아오면 된다
RDD로부터 DataFrame 만들기
- Schema를 자동으로 유추해서 DataFrame 만들기
- Schema를 사용자가 정의하기
lines = sc.textFile("example.csv")
data = lines.map(lambda x: x.split(","))
preprocessed = data.map(lambda x: Row(name=x[0], price=int(x[1])))
df = spark.createDataFrame(preprocessed)
schema = StructType(
StructField("name", StringType(), True)
StructField("price", StringType(), True)
)
spark.createDataFrame(preprocessed, schema).show()
파일로부터 DataFrame 만들기
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("test-app").getOrCreate()
dataframe = sprak.read.json('dataset/nyt2.json')
dataframe_txt = spark.read.text('text_data.txt')
dataframe_csv = spark.read.csv('csv_data.csv')
dataframe_parquet = spark.read.load('parquet_data.parquet')
createOrReplace TempView
- DataFrame을 하나의 데이터베이스 테이블처럼 사용하려면 createOrReplaceTempView() 함수로 temporary view를 만들어줘야한다
data.createOrReplaceTempView("mobility_data")
spark.sql("SELECT pickup_datetime FROM mobility_data LIMIT 5").show()
Spark 에서 사용할 수 있는 SQL문
- Hive Query Language와 거의 동일
- Select
- From
- Where
- Count
- Having
- Group By
- Order By
- Sort By
- Distinct
- Join
Python 에서 Spark SQL 사용하기
- SparkSession
- Spark 세션으로 불러오는 데이터는 DataFrame
- SQL문을 사용해서 쿼리가 가능하다
- 함수를 사용해서 쿼리가 가능하다
- DataFrame을 RDD로 변환해 사용할수도 있다
- rdd=df.rdd.map(tuple)
- (하지만 RDD를 덜 사용하는 쪽이 좋다)
DataFrame의 이점
- MLLib이나 Spark Streaming 같은 다른 스파크 모듈들과 사용하기 편하다
- 개발하기 편하다
- 최적화도 알아서 된다
Datasets
- Type이 있는 DataFrame
- PySpark에선 크게 신경쓰지 않아도 된다
Summary
- Spark SQL
- DataFrame
- Datasets