SPARK_HOME
및 JAVA_HOME
관련 환경 변수 세팅이 필요하기 때문에 Google Colab에서 실습하는 것을 추천드립니다.pip install pyspark
SparkSession
을 먼저 생성해주어야 한다.from pyspark.sql import SparkSession
from pyspark.sql import types as T
from pyspark.sql import functions as F
spark = SparkSession.builder \
.master("local") \
.appName("velog") \
.getOrCreate()
df = spark.read.option('header', 'true').csv([CSV 파일 경로])
df = spark.read.csv([CSV 파일 경로], header=True)
df = spark.read.parquet([PARQUET 파일 경로])
df.show(Row 수) # Row 수가 주어지지 않았을 경우 기본 20개의 row를 출력
df.show(truncate=False) # dataframe의 데이터가 길더라도 전체를 다 보여준다. Default는 True
df.show(vertical=True) # row에 대해 컬럼을 세로로 피벗해서 보여준다. Default는 False
df.printSchema()
df.dtypes
df.columns
# 0-1 사이로 사용자가 정한 fraction 비율을 바탕으로 샘플 데이터를 추출해서 보여준다.
df.sample(
withReplacement=False, # 복원 추출 여부
fraction=0.1, # 추출할 데이터의 비율
seed=42 # 랜덤 시드
).show()
df.first() # 첫번째 Row 만 Row 형식으로 출력
df.collect() # 컬럼의 Row들을 list 형식으로 출력
df.limit(n).show() # 컬럼의 첫 n개의 Row들을 list 형식으로 출력
first_row = df.first()
first_row.Name, first_row.age, first_row.Experience
df.select('Name', 'age').show()
df.select(['Name', 'age']).show()
df_drop = df.drop('Experience')
# withColumn('컬럼 이름', 조건)
df.withColumn('add_col', F.lit("")) # 데이터가 없는 'add_col' 컬럼을 추가한다.
# 컬럼 이름을 age -> new_age 로 변경
df.withColumnRenamed('age', 'new_age')
# 컬럼 이름 여러개 변환
df.withColumnRenamed('col1','colA').withColumnRenamed('col2','colB')....
# age 컬럼을 INT -> STRING 으로 타입 변경
type_ = T.StringType()
df.withColumn('age', F.col('age').cast(type_))
# 컬럼들의 스키마 결정
schema = T.StructType(
[
T.StructField('col_a', T.StringType(), True),
T.StructField('col_b', T.StringType(), True),
T.StructField('col_c', T.StringType(), True),
]
)
data = [
('apple', 'banana', 'tomato'),
('apple3', 'banana3', 'tomato3'),
('apple4', 'banana4', 'tomato4')
]
df_new = spark.createDataFrame(data, schema)
---
+------+-------+-------+
| col_a| col_b| col_c|
+------+-------+-------+
| apple| banana| tomato|
|apple3|banana3|tomato3|
|apple4|banana4|tomato4|
+------+-------+-------+
# 1
spark.read.option('mode', 'DROPMALFORMED').csv("파일 경로", header=True, inferSchema=True)
# PERMISSIVE
# FAILFAST
# 2
option_dict = {
"header" : "true",
"inferSchema" : "true",
"mode" : "DROPMALFORMED", # PERMISSIVE, FAILFAST
}
spark.read.options(**option_dict).csv("파일 경로")
# 일반적인 저장 방법
df.write.mode('overwrite').csv("경로/파일명.parquet")
# error
# ignore
# append
# 하나의 파일로 저장
df.coalesce(1).write.mode('overwrite').csv("경로/파일명.parquet")
# 모든 컬럼을 대상으로 결측치 있는 row 삭제
df.dropna()
# thresh : row의 값들을 보았을 때, 정상값이 thresh개 보다 적게 있는 row를 drop
df.dropna(thresh=2).show()
# 1
df.na.drop(how='all').show()
# any : null값이 하나라도 있으면 제거
# all : 모든 row가 null인 row만 제거
# 2 - 특정 컬럼에 대해서
df.na.drop(how='all', subset = ['age', 'Experience']).show()
# 1 - fillna()
df.fillna(value='컴퓨터', subset=['age', 'Experience'])
# 2 - na.fill()
df.na.drop(how='노트북', subset = ['Name', 'Salary'])
# 1 - 단일 필터
df.filter(F.col('codename') == '액세서리')
# 2 - 다중 필터1
df.filter(F.col('codename') == '액세서리').filter(F.col('payprice') < 50)
# 3 - 다중 필터2
# df.filter((조건1) & (조건2) | (조건3) .....)
df.filter((F.col('codename') == '액세서리') | (F.col('payprice') < 50))
# 4 - 필터 반대 조건(~)
df.filter(~(F.col('codename') != '액세서리'))
# 1 - distinct() : 모든 컬럼에 대해 중복 제거. 추가 옵션 X
df.distinct()
# 2 - dropDuplicates()/drop_duplicates() : 이름만 약간 다르지만 동일한 메소드
df.dropDuplicates(subset = ['codename', 'price']) # 중복 제거시 고려할 컬럼
# 1 - codename에 '헤어', '얼굴' 포함된 Row
df.filter(F.col('codename').isin(['헤어', '얼굴']))
# 2 - codename에 '헤어', '얼굴' 포함되지 않은 Row
df.filter(~F.col('codename').isin(['헤어', '얼굴']))
# isNull() -> Null값인 row 출력
df.filter(F.col('codename').isNull()).show()
# isNotNull() -> Null값이 아닌 row 출력
df.filter(F.col('codename').isNotNull()).show()
# '아바타'로 시작되는 데이터가 있는 Row 출력
df.filter(F.col('mascodename').like('아바타%'))
# payprice 컬럼의 데이터가 100이상, 200이하인 Row 출력
df.filter(F.col('payprice').between(100,200))
# codename 컬럼의 데이터 null이면 '-'로 치환하고, 아니라면 그대로
condition = F.when(F.col('codename').isNull(), '-').otherwise(F.col('codename'))
df.withColumn('new_codename', condition)
df.groupby(컬럼).agg(집계수식(컬럼).alias(별칭))
이러한 형식으로 작성이 가능하다.df_groupby = \
df.groupby('col_a') \
.agg(
F.sum(F.col('col_a')).alias('sum'), # 총 합
F.count(F.col('col_a')).alias('cnt'), # 전체 Row 개수
F.countDistinct(F.col('col_a')).alias('dcnt'), # 컬럼의 distinct한 값 개수
F.mean(F.col('col_a')).alias('mean'), # 평균값
F.avg(F.col('col_a')).alias('average'), # 평균값
F.stddev(F.col('col_a')).alias('setd'), # 표준편차
F.min(F.col('')).alias('min'), # 최솟값
F.max(F.col('')).alias('max'), # 최댓값
F.round(F.avg('col_a'), 2).alias('round2'), # 반올림
F.collect_list(F.col('col_b')).alias('cl'), # group by 후 특정 컬럼의 값들을 list로 묶어준다.(중복 포함)
F.collect_set(F.col('col_b')).alias('cs') # group by 후 특정 컬럼의 값들을 list로 묶어준다.(중복 제거)
)
df_groupby = \
df.groupby('col_a') \
.agg(
F.count(F.when('조건', '대상컬럼').otherwise('조건 불만족시 대상컬럼'))
)
# 1 - 기본 오름차순 정렬
df.orderBy('Salary')
df.orderBy(F.col('Salary').asc())
# 2 - 내림차순 정렬
df.orderBy('Salary', ascending=False)
df.orderBy(F.col('Salary').desc())
# 1 - 여러 컬럼 대상 정렬
df.orderBy('age', 'Salary')
# 2 - 여러 컬럼 + 정렬 순서 설정
df.orderBy(F.col('age'), F.col('Salary').desc())
df1.join(df2, join할 컬럼, join 종류)
의 형태로 코드가 구성된다.
# 1 - 이름이 다른 컬럼들 끼리의 Join
df_1.join(df_2, df_1.col_a == df_2.col_b, 'left')
# 2 - 이름이 동일한 컬럼들 끼리의 join
df_1.join(df_2, ['col_a'], 'left')
# 3 - 여러 컬럼들 끼리 join
df_1.join(df_2, (df_1.Name == df_2.Name) & (df_1.Salary == df_2.Salary))
# 4 - 이름이 동일한 여러 컬럼들 끼리 join
df_1.join(df_2, ['Name', 'Salary'], 'left')
# 1 - Union (테이블들의 컬럼이 반드시 동일)
df_1.union(df_2)
# 2 - UnionByName (테이블의 컬럼이 동일하지 않아도 됨)
df_1.unionByName(df_2, allowMissingColumns = True)
# 1 - zfill : 0으로 채운다.
df.select(
F.lpad(F.col('col_a'), 5, '0').alias('lpad_col_a'), # 5자리까지 0으로 채운다.
'col_b',
'col_c'
)
# 2 - trim : 좌우 공백 제거
df.withColumn(F.col('col_a'), F.trim(F.col('col_a')))
# 3 - regexp_replace : 값 변경
df.withColumn(F.col('col_a'), F.regexp_replace('col_a', 'Before', 'After'))
# 4 - 컬럼 순서 재정렬
df.select([원하는 컬럼 순서 나열])
# 6 - 컬럼 내용 합치기
df.withColumn('concat_ab', F.concat(F.col('col_a'), F.col('col_b')))
# 7 - datetime 컬럼으로 변환
df.withColumn(
'max_date',
F.to_date(F.max('proc_ymd'), "yyyyMMdd")
)
# 8 - 날짜 차이 계산
df.withColumn(
'date_diff',
F.datediff(
F.to_date(F.max('proc_ymd'), "yyyyMMdd"),
F.to_date(F.min('proc_ymd'), "yyyyMMdd")
)
)
# 9 - datetime 출력 형식 변환
df.withColumn('col_ymd', F.date_format(F.col('col_a'), 'yyyy-MM-dd'))
# 10 - Row에 대한 증가하는 고유 번호 부여
df.withColumn('id', F.monotonically_increasing_id())
https://sparkbyexamples.com/pyspark/pyspark-window-functions/
from pyspark.sql import window as W
# 1) window 변수 생성
w = \
W.Window \
.partitionBy("user") \ # 사용자들을 기준으로 그룹핑
.orderBy("price") # 그룹 내에서 정렬기준
# 2) 새로운 컬럼 생성
df = df.withColumn('price_rank', F.row_number().over(w))
apply()
와 비슷하다.returnType
의 기본값은 StringType()
으로, UDF의 결과물이 문자형이면 명시해주지 않아도 되지만, 문자형 이외의 타입으로 최종 결과물이 return 된다면 반드시 returnType
을 명시해주어야 한다.def double_string(val):
val = val * 2
return val
udf_a = F.udf(double_string, returnType = T.StringType())
df_udf = df.withColumn('double_col', udf_a(F.col('col_a')))
@udf(returnType = T.StringType())
def double_string(val):
val = val * 2
return val
df_udf = df.withColumn('double_col', double_string(F.col('col_a')))
change_cols = ['col_a', 'col_b', 'col_c', ...]
df.toDF(*change_cols)
# pyspark DataFrame --> Pandas DataFrame : toPandas()
pdf = df3.toPandas()
# pyspark DataFrame <-- Pandas DataFrame : spark.createDataFrame()
sdf = spark.createDataFrame(pdf)
describe()
: 숫자형 컬럼에 대한 기본적인 요약 통계량을 계산summary()
: 숫자형 컬럼에 대한 더 넓은 범위의 통계량을 계산# descirbe
df.describe().show()
df.describe("num1").show()
# summary
df.summary().show()
df.summary("count", "33%", "50%", "66%").show()
df.select("num1").summary("count", "33%", "50%", "66%").show()