데이터 병렬 처리가 가능하려면 우선 데이터를 분산시키고, 그렇게 나눠진 데이터를 각각 따로 처리해야 한다.
(Excutor의 수)*(Excutor당 CPU의 수)
sort
, group by
, filter
, map
, join
, ...sort
, group by
과 같은 연산은 파티션 간에 데이터 이동이 불가피하게 일어남! => 셔플링group by
와 같은 aggregation이나 sort
)sort
: range partition 사용aggregation
: hashing partition 사용spark script 작동 원리
parallelize
함수로 RDD 변환collect
로 파이썬 데이터로 변환 가능spark.executor.memory
(기본값 : 1g)spark.executor.cores
(YARN에서는 기본값 : 1)spark.driver.memory
(기본값 : 1g)spark.sql.shuffle.partitions
(기본값 : 200; 최대값을 설정하는 것)$SPARK_HOME/conf/spark_defaults.conf
spark.read(Dataframe)
를 사용하여 df로 로드Dataframe.write(DataframeWriter)
를 사용하여 df를 저장local[n]
지정# 1. builder를 사용하여 만들기
from pyspark.sql import SparkSession
spark = SparkSession.builder\
.master("local[*]")\
.appName('PySpark Tutorial')\
.config("spark.some.config.option1", "some-value") \
.config("spark.some.config.option2", "some-value") \
.getOrCreate()
# SparkConf를 사용하여 만들기
from pyspark.sql import SparkSession
from pyspark import SparkConf
conf = SparkConf()
conf.set("spark.app.name", "PySpark Tutorial")
conf.set("spark.master", "local[*]")
spark = SparkSession.builder\
.config(conf=conf) \
.getOrCreate()
# session은 이미 만들었다고 가정한다.
name_list = ['{"name":"danee"}', ...]
# Python -> RDD
# lazy executor이므로 연산이 실행되어야 표출
rdd = spark.sparkContext.parallelize(name_list)
# 저장된 RDD를 불러와서 json 형태로 변환
parsed_rdd = rdd.map(lambda x : json.loads(x))
# spark에서 불러오기
parsed_rdd.collect()
# Python -> Spark df
from pyspark.sql.types import StringType
# spark.createDataFrame(list, 원소 타입)
df = spark.createDataFrame(name_list, StringType())
# RDD -> Spark df
df_parsed_rdd = parsed_rdd.toDF()
# spark df schema 확인
df.prictSchema()
# df 확인
df.show(n, truncate=True)
df.head(n)
# CSV -> Spark df
## 헤더 없을 때
## 컬럼 이름이 없으므로 spark에서 임의로 설정
df = spark.read.csv("~~.csv")
df = spark.read.format("csv").load("~~.csv")
## spark df 컬럼 이름 사용자가 지정
## 단, csv의 컬럼 수와 동일한 수의 컬럼명을 넣어야 한다.
df = spark.read.csv("~~.csv")\
.toDF('col1', 'col2', ... , 'colN')
## 헤더 있을 때
df = spark.read.option("header", True).csv("~~.csv")
# 타입 지정
## Spark에서 자동 지정
df = spark.read\
.option('inferSchema', 'true')
.csv('~~.csv')\
.toDF('col1', 'col2', ... , 'colN')
## 사용자가 직접 지정
## StructType('col_name', type, null 여부)
from pyspark.sql.types import StringType, IntegerType, ...
from pyspark.sql.types import StructType, StructField
schema = StrucType([
StructField('col1', StringType(), True),
StructField('col2', IntegerType(), False),
...
])
df = spark.read.schema(schema).csv('~~.csv')
# Spark df -> CSV
# 분산 처리할 만큼 큰 데이터를 처리한다고 가정(블록 단위로 데이터 저장)
# 주어진 이름과 동일한 폴더를 만들고, 그 폴더 안에 블록 단위로 part_로 시작하는 파일로써 저장한다.
## 아래의 2개는 같은 것
## 기존에 같은 이름의 폴더가 있다면 에러
df.write.csv('~~~')
df.write.format('csv').save('~~~')
## 기존에 같은 이름의 폴더가 있다면 덮어쓰기
df.write.mode('overwrite').csv('~~~')
# 컬럼 선택하기 (아래 2가지는 모두 같은 결과)
stationTemp = df[['station', 'temp']]
stationTemp = df.selct('station', 'temp')
# 컬럼 삭제하기
df_new = df.drop('col')
# 필터링 방법 (아래 3가지는 모두 같은 결과)
minTemps = df.filter(df.measure == 'min')
minTemps = df.where(df.measure == 'min')
minTemps = df.where("df.measure = 'min'")
# groupby
minTempsByStation = minTemps.groupBy('station').min('temp')
# 연산 진행한 컬럼명 변경하기
## 1. withColumnRenamed('기존의 이름', '변경 이름') 사용
## 한번에 하나의 컬럼만 바꿀 수 있다.
df_group = df.groupBy('col1').sum('col2')\
.withColumnRenamed('기존의 이름', '변경 이름')
## 2. .agg() 사용하기
## 여러개의 컬럼을 한번에 바꿀 수 있다.
import pyspark.sql.functions as f
df_group = df.groupBy('col1')\
.agg(f.sum('col2').alias('new_sum'),
f.max('col3').alias('new_max'),
...)
# 기존의 데이터를 사용해 새로운 컬럼 만들기
# withColumn(('수정/추가할 컬럼', UDF(채우고 싶은 데이터 처리 함수)))
df_new = df\
.withColumn('col1', sum('col2'))
# 정규표현식 사용하기
# regexp_extract(컬럼명, '정규표현식', 표현식 위치)
# 표현식 위치의 인덱스는 1에서 시작
from pyspark.sql.function import *
regex_str = r'~~(\S+) ~~~ (\d+) ~~~'
df_new = df\
.withColumn('col1', regexp_extract('origin', regex_str, 1))
.withColumn('col2', regexp_extract('origin', regex_str, 2))
# 하나의 row가 array일 때, 원소 하나를 1개의 row로 펼치기
# f.explode(df.컬럼명)
import pyspark.sql.functions as f
df_new = df.select(f.explode(df.col1).alias('explode col1'))
# sorting
import pyspark.sql.functions as f
## 내림차순
df.sort(f.desc('col'))
df.orderby('col', ascending=False)
## 오름차순
df.sort(f.asc('col'))
df.orderby('col', ascending=True)
# join
# 종류 : inner, left, right, outer, semi, anti
join_exp = left_df.id == right_df.id
join_df = left_df.join(right_df, join_exp, "inner")
앞서 보였던 예시 중 Spark Dataframe으로 처리하는 것 보다 SQL로 처리하는 것이 더 효율적인 경우가 대부분이다.
SQL로 처리할 수 있다면 SQL로 처리하고 그 외의 것들을 Dataframe으로 처리하도록 하자.
(예. 정규표현식을 dataframe으로!)
# dataframe을 sql로 처리하기 위해 임시 테이블명을 만든다.
df.createOrReplaceTempView('table_nm')
# sql 실행
df_new = df.sql('''SELECT *
FROM table_nm
WHERE ~~~
GROUP BY ~~~
''')
# Spark에서 사용하는 table 리스트 확인
# Hive가 연동되어 있다면 Hive의 테이블도 보여준다.
spark.catalog.listTables()
분량 진짜 너무 많다... 다 듣다가 기절하는 줄 알았다...
이 글을 쓰는데만 1시간이 넘게 걸렸다.... 이게 무슨 일이지?! 하지만 정말로 데이터 전처리하는 기분이라서 재미있었다.
생각보다 python의 dataframe과는 함수가 다르기 때문에 pyspark를 사용하여 만들기에는 시간이 걸릴 것 같다. 좀더 다양한 데이터로 이것저것 테스트하고 싶다.