Apache Spark Dataframe API

Yuni·2022년 9월 12일
1

Spark Dataframe 이란?

Spark에서는 Dataframe 사용이 권장되며, Spark RDD에 스키마를 입힌 것이 Dataframe이라고 알면 된다. 즉 Spark 기본인 RDD에서 출발하여 더 발전된 형태로 활용할 수 있는 것이 Spark Dataframe이다.

  • Spark Core -> RDD SparkContext
  • Spark SQL -> Dataframe SparkSession

RDD에서 발전된 형태

Spark Dataframe의 경우 자동으로 연산 최적화를 수행한다.

RDDDataframe
- 데이터의 구조를 모르기 때문에 데이터를 다루는 것을 개발자에게 의존하게 됨
- Map, filter 등 유저가 만든 function을 그대로 수행하기 때문에 개발된 function에 따라 성능이 크게 영향을 받을 수 있음
- 데이터의 구조가 정해져 있고 이미 알고 있으므로 어떤 태스크를 수행할지 정의만 하면 됨
- Spark 내부 엔진이 최적화를 알아서 해줌
# RDD의 경우 사용자의 function 사용에 따라 성능이 달라진다.
RDD.map(A).filter(B).reduceByKey(C).take(100)
RDD.map(A).reduceByKey(C).filter(B).take(100)

Spark SQL

스파크 인스턴스 생성하기
spark = SparkSession.builder.appName("test-app").getOrCreate()


from pyspark.sql import SparkSession

# 스키마 자동으로 유추하기
lines = sc.textFile("example.csv")
data = lines.map(lambda x: x.split(","))
preprocessed = data.map(lambda x: Row(name=x[0], price=int(x[1])))

df = spark.createDataFrame(preprocessed)  # infer

# 스키마 사용자가 정의하기
schema = StructType(
	StructField("name", StringType(), True),
    StructField("price", StringType(), True)
)
spark.createDataFrame(preprocessed, schema).show() # specify


User Defined Functions

Sql 문 안에서 사용할 수 있는 사용자 정의 함수 공식 docx

일반적인 UDF 선언

import pyspark.sql.types as T

# 함수 선언
def squared(n):
	return n*n

# UDF 정의 
spark.udf. register("squared", squared, T.LongType())
# UDF실행
spark.sql("SELECT price, squared(price) FROM transactions").show()

# 공식 문서 예시 
import random
random_udf = udf(lambda: int(random.random() * 100), IntegerType()).asNondeterministic()

보다 파이써닉하게

import pyspark.sql.functions as F

@F.udf("long")
def squared(n):
	return n*n
# UDF실행
spark.sql("SELECT price, squared(price) FROM transactions").show()

# 공식 문서 예시
from pyspark.sql.types import IntegerType
slen = udf(lambda s: len(s), IntegerType())
@udf
def to_upper(s):
    if s is not None:
        return s.upper()

@udf(returnType=IntegerType())
def add_one(x):
    if x is not None:
        return x + 1

df = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", "age"))
df.select(slen("name").alias("slen(name)"), to_upper("name"), add_one("age")).show()


출처

실시간 빅데이터 처리를 위한 Spark & Flink (패스트캠퍼스 강의)

profile
1차전직 DA 2차전직 DE

0개의 댓글