Spark에서는 Dataframe 사용이 권장되며, Spark RDD에 스키마를 입힌 것이 Dataframe이라고 알면 된다. 즉 Spark 기본인 RDD에서 출발하여 더 발전된 형태로 활용할 수 있는 것이 Spark Dataframe이다.
SparkContext
SparkSession
Spark Dataframe의 경우 자동으로 연산 최적화를 수행한다.
RDD | Dataframe |
---|---|
- 데이터의 구조를 모르기 때문에 데이터를 다루는 것을 개발자에게 의존하게 됨 - Map, filter 등 유저가 만든 function을 그대로 수행하기 때문에 개발된 function에 따라 성능이 크게 영향을 받을 수 있음 | - 데이터의 구조가 정해져 있고 이미 알고 있으므로 어떤 태스크를 수행할지 정의만 하면 됨 - Spark 내부 엔진이 최적화를 알아서 해줌 |
# RDD의 경우 사용자의 function 사용에 따라 성능이 달라진다.
RDD.map(A).filter(B).reduceByKey(C).take(100)
RDD.map(A).reduceByKey(C).filter(B).take(100)
스파크 인스턴스 생성하기
spark = SparkSession.builder.appName("test-app").getOrCreate()
from pyspark.sql import SparkSession
# 스키마 자동으로 유추하기
lines = sc.textFile("example.csv")
data = lines.map(lambda x: x.split(","))
preprocessed = data.map(lambda x: Row(name=x[0], price=int(x[1])))
df = spark.createDataFrame(preprocessed) # infer
# 스키마 사용자가 정의하기
schema = StructType(
StructField("name", StringType(), True),
StructField("price", StringType(), True)
)
spark.createDataFrame(preprocessed, schema).show() # specify
Sql 문 안에서 사용할 수 있는 사용자 정의 함수 공식 docx
import pyspark.sql.types as T
# 함수 선언
def squared(n):
return n*n
# UDF 정의
spark.udf. register("squared", squared, T.LongType())
# UDF실행
spark.sql("SELECT price, squared(price) FROM transactions").show()
# 공식 문서 예시
import random
random_udf = udf(lambda: int(random.random() * 100), IntegerType()).asNondeterministic()
import pyspark.sql.functions as F
@F.udf("long")
def squared(n):
return n*n
# UDF실행
spark.sql("SELECT price, squared(price) FROM transactions").show()
# 공식 문서 예시
from pyspark.sql.types import IntegerType
slen = udf(lambda s: len(s), IntegerType())
@udf
def to_upper(s):
if s is not None:
return s.upper()
@udf(returnType=IntegerType())
def add_one(x):
if x is not None:
return x + 1
df = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", "age"))
df.select(slen("name").alias("slen(name)"), to_upper("name"), add_one("age")).show()
실시간 빅데이터 처리를 위한 Spark & Flink (패스트캠퍼스 강의)