Cloud상에서 데이터 분석을 할 때 가장 많은 시간을 요하는 작업은 두 가지라고 생각한다.
최종 시각화 이미지를 만들어내기 위해 어떠한 테이블들을 끌어와야 할지 원천 DataBase를 보며 결정할 때
이행해 온 테이블들을 어떤식으로 조립하여 붙여야 원하는 결과를 얻어낼 수 있을지 생각할 때
물론 데이터 성공적인 데이터 분석 프로젝트를 위해서는 중간에 많은 과정들과 수고들이 들어가지만 위의 두 가지 작업이 코드를 많이 치지는 않으면서 오랜 기간의 생각과 의논과 판단이 필요한 과정이라고 개인적으로는 생각한다.
특히 2번 작업을 할때는 거의 EMR에 세팅된 Pyspark 환경에서 작업을 하게되는데 파레토의 80:20 법칙같이 메인으로 쓰이는 메소드들을 주로 사용하게 되고 자주 사용하지 않는 메소드들은 아직까지도 손에 잘 익지 않아서 매 번 구글링을 하곤한다.
Pyspark로 테이블을 가공하며 종종 쓰이는 명령어들을 정리해보았다. 테이블은 parquet 형식으로 저장되어있으며 불러온 DataFrame의 이름은 'df'이며 아래와 같은 형상을 가지고 있다고 가정한다.
col_a | col_b | col_c |
---|---|---|
A | B | C |
... | ... | ... |
1) Read
df = spark.read.parquet('parquet file path')
2) Write
df.write.mode('overwrite').parquet('save file path')
- overwrite : 덮어쓰기
- append : 추가
- ignore : 파일 있으면 작업 안함
- 하나의 파일로 저장
df.coalesce(1).mode('overwrite').parquet('save file path')
3) Show
- df.show(Row수) : 기본적인 테이블 모습으로 보기
- df.show(n, vertical=True, Truncate=False) : n개 row에 대해 컬럼을 세로로 피벗해서 보여줌
- df.first() : 첫번째 Row 만 Row 형식으로 출력
- df.collect() : 컬럼의 Row들을 list 형식으로 출력
- df.limit(n).show() : 컬럼의 첫 n개의 Row들을 list 형식으로 출력
from pyspark.sql import SparkSession, Row
from pyspark.sql import types as T
from pyspark.sql import functions as F
1) 타입 확인
df.printSchema()
2) 타입 변경
_dtype = {
'col_a' : T.StringType(),
'col_b' : T.DoubleType(),
'col_c' : T.TimestampType()
}
for _col in df.columns:
df = df.withColumn(_col, F.col(_col).cast(_dtype[_col]))
3) DataFrame 생성
schema = T.StructType(
[
T.StructField("col_A", T.StringType(), True),
T.StructField("col_B", T.StringType(), True),
T.StructField("col_C", T.StringType(), True)
]
)
rows = [
Row(col1=, col2=, col3=),
Row(col1=, col2=, col3=),
Row(col1=, col2=, col3=),
...
]
df_new = spark.createDataFrame(rows, schema)
1) Filter
df.filter(F.col('col_a') == 'aa')
df.filter((조건 1) & (조건 2) & ...)
2) Drop Duplicates
df.dropDuplicates([중복제거 컬럼 list])
3) 포함 여부 확인 (isin)
df.filter(F.col('col_a').isin([....])) <--> df.filter(~F.col('col_a').isin([....]))
4) Null값 확인
df.filter(F.col('col_a').isNull()) <--> df.filter(F.col('col_a').isNotNull())
5) 비슷한 형태 확인 (like)
df.filter(F.col('col_a').like('문자 형태'))
6) When 조건문
> col_a의 데이터가 null이면 공백, 아니면 'O'로 채우는 조건
df.withColumn('when_a', F.when(F.col('col_a').isNull(), '').otherwise('O'))
1) 기본 agg 함수들
df_groupby = \
df.groupby('col_a') \
.agg(
F.sum(F.col('col_a')), # 총합
F.countDistinct(F.col('col_a')), # distinct한 개수만 세기
F.count(F.col('col_a')), # 전체 개수 세기
F.mean(F.col('col_a')), # 평균값
F.avg(F.col('col_a')), # 평균값
F.stddev(F.col('col_a')), # 표준편차
F.min(F.col('')).alias('min'), # 최솟값
F.max(F.col('')).alias('max'), # 최댓값
F.round(F.avg('col_a'), 2), # 반올림
F.collect_list(F.col('col_b')), # group by 후 특정 컬럼의 값들을 list로 묶어준다.(중복 O)
F.collect_set(F.col('col_b')) # group by 후 특정 컬럼의 값들을 list로 묶어준다.(중복 X)
)
# 각 함수의 뒤에 .alias('col_name') 을 붙여 컬럼 이름도 설정 가능.
2) agg + 조건(when-otherwise)
df_groupby = \
df.groupby('col_a') \
.agg(
F.count(F.when('조건', '대상컬럼').otherwise('조건 불만족시 대상컬럼'))
)
1) 기본
df.orderBy('col_a') < -- > df.orderBy('col_a', ascending=False)
2) 어려개
df.orderBy('col_a', 'col_b')
3) 여러개 + 정렬 순서 반대
df.orderBy(F.col('col_a'), F.col('col_b').desc())
1) zfill (채움)
df.select(
F.lpad(F.col('col_a'), 5, '0').alias('lpad_col_a'), # 5자리까지 0으로 채운다.
'col_b',
'col_c'
)
2) trim (좌우 공백 제거)
df = df.withColumn(F.col('col_a'), F.trim(F.col('col_a')))
3) Replace
df = df.withColumn(F.col('col_a'), F.regexp_replace('col_a', 'Before', 'After'))
4) 컬럼 순서 재정렬
df = df.select([원하는 컬럼 순서 나열])
5) 컬럼 이름 일괄 변환
1> df.withColumnRenamed('col1','colA').withColumnRenamed('col2','colB')....
2> col_list = [....]
df = df.toDF(*col_list)
6) 컬럼 내용 합치기
1> df.withColumn('concat_ab', F.concat(F.col('col_a'), F.col('col_b')))
2> df.withColumn('col_join', F.concat_ws("-", *[F.col(x) for x in df.columns if 'day' in x]))
- 컬럼명에 'day'가 들어있는 컬럼의 데이터들을 '-' 기준으로 합친다.
7) datetime 컬럼으로 변환 및 날짜 차이
1> df.withColumn(
'max_date',
F.to_date(F.max('col_a'), "yyyyMMdd")
)
2> df.withColumn(
'date_diff',
F.datediff(
F.to_date(F.max('proc_ymd'), "yyyyMMdd"),
F.to_date(F.min('proc_ymd'), "yyyyMMdd")
)
)
8) datetime 출력 형식 변환
df.withColumn('col_ymd', F.date_format(F.col('col_a'), 'yyyy-MM-dd'))
9) Row에 대한 증가하는 고유 번호 부여
df.withColumn('id', F.monotonically_increasing_id())
1) 이름이 다른 컬럼들 끼리의 Join
df_1.join(df_2, df_1.col_a == df_2.col_b, 'left')
2) 이름이 동일한 컬럼들 끼리의 join
df_1.join(df_2, ['col_a'], 'left')
3) 여러 컬럼들 끼리 join
df_1.join(df_2, () & () & (), 'left')
4) 이름이 동일한 여러 컬럼들 끼리 join
df_1.join(df_2, ['userid', 'idx'], 'left')
1) Union (테이블들의 컬럼이 반드시 동일)
df_1.union(df_2)
2) UnionByName (테이블의 컬럼이 동일하지 않아도 됨)
df_1.unionByName(df_2, allowMissingColumns = True)
from pyspark.sql import window as W
1) window 변수 생성
w = \
W.Window \
.partitionBy("user") \ # 사용자들을 기준으로 그룹핑
.orderBy("price") # 그룹 내에서 정렬기준
2) 새로운 컬럼 생성
df = df.withColumn('price_rank', F.row_number().over(w))
1) udf 변수 선언 방식
def double_string(val):
val = val * 2
return val
udf_a = F.udf(double_string)
df_udf = df.withColumn('double_col', udf_a(F.col('col_a')))
2) Decorator 사용 방식(변수 선언이 필요 없고 기존 함수명 그대로 사용 가능)
@udf(T.StringType())
def double_string(val):
val = val * 2
return val
df_udf = df.withColumn('double_col', double_string(F.col('col_a')))
1. coalesce와 repartition
두 함수 모두 df의 저장시 주로 사용되며 파티션의 개수 조정해주는 함수이다. 차이점에 대한 자세한 설명은 아래와 같다. (참고사이트)
두 메서드 모두 파티션의 크기를 나타내는 정수를 인자로 받아 파티션의 수를 조정한다는 점에서 공통점이 있지만 repartition()이 파티션 수를 늘리거나 줄이는 것을 모두 할 수 있는 반면 coalesce()는 줄이는 것만 가능하다.
repartition() 메서드로 파티션 변경 기능을 할 수 있음에도 coalesce() 메서드를 따로 두는 이유는 바로 처리 방식에 따른 성능 차이 때문이다.
repartition()은 셔플을 기반으로 동작을 수행하는 데 반해 coalesce()는 강제로 셔플을 수행하라는 옵션을 지정하지 않는 한 셔플을 사용하지 않기 때문이다. 따라서 데이터 필터링 등의 작업으로 데이터 수가 줄어들어 파티션의 수를 줄이고자 할 때는 상대적으로 성능이 좋은 coalesce()를 사용하고, 파티션 수를 늘여야 하는 경우에만 repartition() 메서드를 사용하는 것이 좋다.
df.repartition(5)
df2 = df.repartition("state")
df2 = df.repartition(5, "state")
df2 = df.repartition("state","department")
df.coalesce(2)
2. split, arrays_zip, approx_count_distinct
1. split(column, pattern, limit)
- pattern : split할 기준 문자열 패턴 ('-' > 하나의 패턴 / ['AB'] > A와 B를 기준으로 split)
- limit : split될 array의 길이 제한을 둘 수 있음 (-1 > 제한 없음)
df1 = df.withColumn('year', split(F.col('col_a'), '-').getItem(0)) \ # split 후 0번 index 가져옴
.withColumn('month', split(F.col('col_a'), '-').getItem(1)) \
.withColumn('day', split(F.col('col_a'), '-').getItem(2))
arrays_zip(col_a, col_b, col_c ....)
각 컬럼의 row에 동일한 길이의 list가 들어있을 때 이들을 zip 해주는 함수
df = spark.createDataFrame([(([1, 2, 3], [2, 3, 4]))], ['vals1', 'vals2'])
df = df.withColumn(arrays_zip(df.vals1, df.vals2).alias('zipped'))
df.show()
-RECORD 0-------------------------
vals1 | [1, 2, 3]
vals2 | [2, 3, 4]
sdf | [{1, 2}, {2, 3}, {3, 4}]
approx_count_distinct(col, rsd)
3. summary, describe
두 메서드 모두 dataframe을 이루는 컬럼들에 대한 산술적인 통계치를 보여준다.(count, mean, stddev, min, max 등)
summary가 조금 더 보여주는 정보들이 넓다
df.describe().show()
df.describe("num1").show()
df.summary().show()
df.summary("count", "33%", "50%", "66%").show()
df.select("num1").summary("count", "33%", "50%", "66%").show()
4. sample, take, limit
1. sample(fraction, seed)
- 0-1 사이로 사용자가 정한 fraction 비율을 바탕으로 샘플 데이터를 추출해서 보여준다.
5. persist, broadcast
1. persist -> 참고사이트
2. broadcast -> 참고사이트