특강
Spark vs. MapReduce
Spark | MapReduce |
---|---|
기본적으로 메모리 기반, 다양한 컴퓨팅 지원, pandas 데이터프레임과 개념적으로 동일한 데이터 구조 지원 | 디스크 기반, 하둡(YARN) 위에서만 동작, key-value 기반 데이터 구조만 지원 |
Spark 데이터 처리 흐름
셔플링: 파티션 간에 데이터 이동이 필요한 경우 발생
Spark DataFrame 실습
df = spark.read.format("csv")\
.option("inferSchema", "true")\
.load("1800.csv")\
.toDF("stationID", "date", "measure_type", "temperature", "_c4", "_c5", "_c6", "_c7")
여기서 inferSchema 옵션은 원래 default가 False
. 직역하면 스키마를 추론한다 → Spark가 해당 DataFrame을 만들 때 앞의 레코드들을 샘플링 해서 본 뒤 type이 무엇인지 추측하는 것임.
from pyspark.sql.types import StringType, IntegerType, FloatType
from pyspark.sql.types import StructType, StructField
schema = StructType([ \
StructField("stationID", StringType(), True), \
StructField("date", IntegerType(), True), \
StructField("measure_type", StringType(), True), \
StructField("temperature", FloatType(), True)])
앞 코드에서는 추론해보라고 했지만 이 코드는 명시적으로 알려주는 코드.
from pyspark.sql.functions import col, column
stationTemps = minTemps.select(
"stationID",
col("stationID"),
column("stationID"),
minTemps.stationID
)
Window 함수: ROWS BETWEEN AND
SELECT value FROM rows_test;
SELECT
SUM(value) OVER(
order by value
rows between 2 preceding and 2 following -- 뜻: 자기 기준 앞 2개 & 뒤 2개. 숫자 대신 unbounded도 가능
) AS rolling_sum
FROM rows_test;
Parquet: Spark의 기본 파일 format. 하나의 데이터 블록은 하나의 Row Group으로 구성됨
spark.sql.autoBroadcastJoinThredshold
파라미터로 데이터프레임 하나가 충분히 작은지 여부 결정