Spark 완벽 가이드 ch6. 다양한 데이터 타입 다루기

Q·2023년 1월 11일
0

Spark 완벽 가이드

목록 보기
7/24
from pyspark.sql import functions as F
from pyspark.sql import types as T
path= '/FileStore/tables/2010_12_01.csv'
df = spark.read.format('csv').option('header', 'true').option('inferSchema', 'true').load(path)
df.printSchema()
root 
-- InvoiceNo: string (nullable = true) 
-- StockCode: string (nullable = true) 
-- Description: string (nullable = true) 
-- Quantity: integer (nullable = true) 
-- InvoiceDate: string (nullable = true) 
-- UnitPrice: double (nullable = true) 
-- CustomerID: double (nullable = true) 
-- Country: string (nullable = true)
df.createOrReplaceTempView
Out[5]: <bound method DataFrame.createOrReplaceTempView of DataFrame[InvoiceNo: string, StockCode: 
string, Description: string, Quantity: int, InvoiceDate: string, UnitPrice: double, 
CustomerID: double, Country: string]>
df.show(3)
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+ 
InvoiceNo|StockCode| Description|Quantity| InvoiceDate|UnitPrice|CustomerID| Country| 
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+ 
536365| 85123A|WHITE HANGING HEA...| 6|2010-12-01 08:26:00| 2.55| 17850.0|United Kingdom| 
536365| 71053| WHITE METAL LANTERN| 6|2010-12-01 08:26:00| 3.39| 17850.0|United Kingdom| 
536365| 84406B|CREAM CUPID HEART...| 8|2010-12-01 08:26:00| 2.75| 17850.0|United Kingdom| 
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+ 
only showing top 3 rows

스파크 데이터 타입으로 변환하기

  • lit함수를 적용하여 다른언어의 데이터 타입을 스파크 데이터 타입에 맞게 변환
df.select(F.lit(5), F.lit('five'), F.lit(5.0)) #int, string, double
Out[7]: DataFrame[5: int, five: string, 5.0: double]

불리언 데이터 타입 다루기

  • 불리언은 모든 필터링 작업의 기반
  • 불리언 구문은 and, or, true, false로 구성
  • 불리언 구문을 사용해 true 또는 false로 평가되는 논리 문법을 만듦
  • 불리언 식에는 일치 조건, 비교 연산 조건을 사용 가능
df.where(F.col('invoiceno') != 536365).select('invoiceno', 'description').show(5, False)
+---------+-----------------------------+ 
invoiceno|description | 
+---------+-----------------------------+ 
536366 |HAND WARMER UNION JACK | 
536366 |HAND WARMER RED POLKA DOT | 
536367 |ASSORTED COLOUR BIRD ORNAMENT| 
536367 |POPPY'S PLAYHOUSE BEDROOM | 
536367 |POPPY'S PLAYHOUSE KITCHEN | 
+---------+-----------------------------+ 
only showing top 5 rows
priceFilter = F.col('unitprice')>600
descripFilter = F.instr(df.Description, 'POSTAGE')>=1 #POSTAGE가 없으면 0 있으면 1부터(index가 1부터임)
df.where(df.StockCode.isin('DOT')).where(priceFilter|descripFilter).show() #isin메서드로 DOT값이 있는지 확인
+---------+---------+--------------+--------+-------------------+---------+----------+--------------+ 
InvoiceNo|StockCode| Description|Quantity| InvoiceDate|UnitPrice|CustomerID| Country| 
+---------+---------+--------------+--------+-------------------+---------+----------+--------------+ 
536544| DOT|DOTCOM POSTAGE| 1|2010-12-01 14:32:00| 569.77| null|United Kingdom| 
536592| DOT|DOTCOM POSTAGE| 1|2010-12-01 17:06:00| 607.49| null|United Kingdom| 
+---------+---------+--------------+--------+-------------------+---------+----------+--------------+

수치형 데이터 타입 다루기

  • pow: 거듭제곱
  • round: 반올림
  • bound: 내림
  • corr: 피어슨 상관계수
  • describe: 요약 통계 계산
    • count
    • mean
    • stddev
    • min
    • max
fabricatedQuantity = F.pow(F.col('Quantity') * F.col('UnitPrice'), 2)+5
df.select('customerid', fabricatedQuantity.alias('realQuantity')).show(2)
+----------+------------------+ 
customerid| realQuantity| 
+----------+------------------+ 
17850.0|239.08999999999997| 
17850.0| 418.7156| 
+----------+------------------+ 
only showing top 2 rows
df.select(F.round(F.lit("2.5")), F.bround(F.lit("2.5")))
Out[11]: DataFrame[round(2.5, 0): double, bround(2.5, 0): double]
df.select(F.corr('Quantity', 'UnitPrice')).show()
+-------------------------+ 
corr(Quantity, UnitPrice)| 
+-------------------------+ 
-0.04112314436835551| 
+-------------------------+
df.describe().show()
+-------+-----------------+------------------+--------------------+------------------+-------------------+------------------+------------------+--------------+ 
summary| InvoiceNo| StockCode| Description| Quantity| InvoiceDate| UnitPrice| CustomerID| Country| 
+-------+-----------------+------------------+--------------------+------------------+-------------------+------------------+------------------+--------------+ 
count| 3108| 3108| 3098| 3108| 3108| 3108| 1968| 3108| 
mean| 536516.684944841|27834.304044117645| null| 8.627413127413128| null| 4.151946589446603|15661.388719512195| null| 
stddev|72.89447869788873|17407.897548583845| null|26.371821677029203| null|15.638659854603892|1854.4496996893627| null| 
min| 536365| 10002| 4 PURPLE FLOCK D...| -24|2010-12-01 08:26:00| 0.0| 12431.0| Australia| 
max| C536548| POST|ZINC WILLIE WINKI...| 600|2010-12-01 17:35:00| 607.49| 18229.0|United Kingdom| 
+-------+-----------------+------------------+--------------------+------------------+-------------------+------------------+------------------+--------------+

StatFunctions 패키지

  • doc 링크
  • 다양한 통곗값을 계산할 때 사용하는 DataFrame 메서드
  • stat 속성을 사용해 접근할 수 있음
olName= 'UnitPrice'
quantileProbs = [0.5]
relError=0.05

#백분위수를 정확하게 계산하거나 근사치 계산
df.stat.approxQuantile(olName, quantileProbs, relError)
Out[14]: [2.51]
#피어슨 상관계수
df.stat.corr('Quantity', 'UnitPrice')
Out[15]: -0.04112314436835551
#crosstab: col1과 col2의 교차표(두 변수의 범주별 조합빈도수를 작성한 표)
display(df.stat.crosstab('StockCode', 'Quantity').limit(10))
StockCode_Quantity	-1	-10	-12	-2	-24	-3	-4	-5	-6	-7	1	10	100	11	12	120	128	13	14	144	15	16	17	18	19	192	2	20	200	21	216	22	23	24	25	252	27	28	288	3	30	32	33	34	36	384	4	40	432	47	48	480	5	50	56	6	60	600	64	7	70	72	8	80	9	96
22578	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	1	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0
21327	0	0	0	0	0	0	0	0	0	0	2	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0
22064	0	0	0	0	0	0	0	0	0	0	1	0	0	0	1	0	0	0	0	0	0	0	0	0	0	0	1	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0
21080	0	0	0	0	0	0	0	0	0	0	1	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	1	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	1
22219	0	0	0	0	0	0	0	0	0	0	0	0	0	0	3	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	1	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0
21908	0	0	0	0	0	0	0	0	0	0	1	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0
22818	0	0	0	0	0	0	0	0	0	0	0	0	0	0	1	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	1	0	0	0	0	0	0	0	0	0	0
15056BL	0	0	0	0	0	0	0	0	0	0	0	0	0	0	1	0	0	0	0	0	0	0	0	0	0	0	1	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	1	0	0	0	0	0	0	0	0	0	0
72817	0	0	0	0	0	0	0	0	0	0	1	0	0	0	1	0	0	0	0	0	0	0	0	0	0	0	1	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0
22545	0	0	0	0	0	0	0	0	0	0	1	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0	0
#모든 로우에 고유 ID 추가
df.select(F.monotonically_increasing_id()).show(5)
+-----------------------------+ 
monotonically_increasing_id()| 
+-----------------------------+ 
0| 
1| 
2| 
3| 
4| 
+-----------------------------+ 
only showing top 5 rows

문자열 데이터 타입 다루기

  • 데이터 추출
  • 데이터 치환
  • 문자열 존재 여부
  • 대/소문자 변환 처리
  • etc.
display(df.select('Description').limit(10))
Description
WHITE HANGING HEART T-LIGHT HOLDER
WHITE METAL LANTERN
CREAM CUPID HEARTS COAT HANGER
KNITTED UNION FLAG HOT WATER BOTTLE
RED WOOLLY HOTTIE WHITE HEART.
SET 7 BABUSHKA NESTING BOXES
GLASS STAR FROSTED T-LIGHT HOLDER
HAND WARMER UNION JACK
HAND WARMER RED POLKA DOT
ASSORTED COLOUR BIRD ORNAMENT
#각 문자열을 공백 기준으로 나눴을 때, 모든 첫 글자를 대문자로 변경
display(df.select(F.initcap(F.col('Description'))).limit(10))
initcap(Description)
White Hanging Heart T-light Holder
White Metal Lantern
Cream Cupid Hearts Coat Hanger
Knitted Union Flag Hot Water Bottle
Red Woolly Hottie White Heart.
Set 7 Babushka Nesting Boxes
Glass Star Frosted T-light Holder
Hand Warmer Union Jack
Hand Warmer Red Polka Dot
Assorted Colour Bird Ornament
#소문자, 대문자로 변환
display(df.select(F.lower('Description'), F.upper('Description')).limit(10))
lower(Description)	upper(Description)
white hanging heart t-light holder	WHITE HANGING HEART T-LIGHT HOLDER
white metal lantern	WHITE METAL LANTERN
cream cupid hearts coat hanger	CREAM CUPID HEARTS COAT HANGER
knitted union flag hot water bottle	KNITTED UNION FLAG HOT WATER BOTTLE
red woolly hottie white heart.	RED WOOLLY HOTTIE WHITE HEART.
set 7 babushka nesting boxes	SET 7 BABUSHKA NESTING BOXES
glass star frosted t-light holder	GLASS STAR FROSTED T-LIGHT HOLDER
hand warmer union jack	HAND WARMER UNION JACK
hand warmer red polka dot	HAND WARMER RED POLKA DOT
assorted colour bird ornament	ASSORTED COLOUR BIRD ORNAMENT
#문자열 주변 공백 제거 및 추가
target= '     HELLO     '
display(df.select(F.trim(F.lit(target)), F.ltrim(F.lit(target)), F.rtrim(F.lit(target)), F.rpad(F.lit('HELLO'),10, '-'), F.lpad(F.lit('HELLO'),10,'-')).limit(5))
trim( HELLO )	ltrim( HELLO )	rtrim( HELLO )	rpad(HELLO, 10, -)	lpad(HELLO, 10, -)
HELLO	HELLO	HELLO	HELLO-----	-----HELLO
HELLO	HELLO	HELLO	HELLO-----	-----HELLO
HELLO	HELLO	HELLO	HELLO-----	-----HELLO
HELLO	HELLO	HELLO	HELLO-----	-----HELLO
HELLO	HELLO	HELLO	HELLO-----	-----HELLO

정규 표현식

#regex_string에서 정의한 색과 관련된 단어 모두 COLOR로 치환
regex_string="BLACK|WHITE|RED|GREEN|BLUE"
display(df.select(F.regexp_replace(F.col('description'), regex_string, 'COLOR').alias('color_clean'),F.col('description')).limit(10))
color_clean	description
COLOR HANGING HEART T-LIGHT HOLDER	WHITE HANGING HEART T-LIGHT HOLDER
COLOR METAL LANTERN	WHITE METAL LANTERN
CREAM CUPID HEARTS COAT HANGER	CREAM CUPID HEARTS COAT HANGER
KNITTED UNION FLAG HOT WATER BOTTLE	KNITTED UNION FLAG HOT WATER BOTTLE
COLOR WOOLLY HOTTIE COLOR HEART.	RED WOOLLY HOTTIE WHITE HEART.
SET 7 BABUSHKA NESTING BOXES	SET 7 BABUSHKA NESTING BOXES
GLASS STAR FROSTED T-LIGHT HOLDER	GLASS STAR FROSTED T-LIGHT HOLDER
HAND WARMER UNION JACK	HAND WARMER UNION JACK
HAND WARMER COLOR POLKA DOT	HAND WARMER RED POLKA DOT
ASSORTED COLOUR BIRD ORNAMENT	ASSORTED COLOUR BIRD ORNAMENT
#정규 표현식을 만들지 않고 간단하게 문자 단위로 치환(L=1, E=3, T=7)
display(df.select(F.translate(F.col('description'), 'LEET' , '1337'), F.col('description')).limit(5))
translate(description, LEET, 1337)	description
WHI73 HANGING H3AR7 7-1IGH7 HO1D3R	WHITE HANGING HEART T-LIGHT HOLDER
WHI73 M37A1 1AN73RN	WHITE METAL LANTERN
CR3AM CUPID H3AR7S COA7 HANG3R	CREAM CUPID HEARTS COAT HANGER
KNI773D UNION F1AG HO7 WA73R BO7713	KNITTED UNION FLAG HOT WATER BOTTLE
R3D WOO11Y HO77I3 WHI73 H3AR7.	RED WOOLLY HOTTIE WHITE HEART.
# 정규식에 매칭되는 그룹 추출 (인덱스 지정 -> 1로하면 정규식에 처음으로 매칭되는 문자열을 추출)
extract_str = "(BLACK|WHITE|RED|GREEN|BLUE)"
display(df.select(F.regexp_extract(F.col('description'), extract_str, 1).alias('color_clean'), F.col('description')).limit(5))
color_clean	description
WHITE	WHITE HANGING HEART T-LIGHT HOLDER
WHITE	WHITE METAL LANTERN
CREAM CUPID HEARTS COAT HANGER
KNITTED UNION FLAG HOT WATER BOTTLE
RED	RED WOOLLY HOTTIE WHITE HEART.
#값 추출 없이 단순히 값의 존재 여부를 확인하고 싶을 때
containsBlack = F.instr(F.col('description'), 'BLACK') >=1
containsWhite= F.instr(F.col('description'), 'WHITE') >=1

display(df.withColumn('hasSimpleColor', containsBlack | containsWhite)\
.where('hasSimpleColor').select('description').limit(5))
description
WHITE HANGING HEART T-LIGHT HOLDER
WHITE METAL LANTERN
RED WOOLLY HOTTIE WHITE HEART.
WHITE HANGING HEART T-LIGHT HOLDER
WHITE METAL LANTERN
  • 위에는 값을 두 개만 사용해서 간단하지만 값의 개수가 늘어난다면 복잡해질 것임
#값의 개수가 늘어났을 때 해결 방법
simpleColors = ['black', 'white','red', 'green', 'blue']
def color_locator(col, color_string):
  #주어진 문자열(substr)과 처음으로 일치하는 위치를 반환하는 locate함수 -> 일치하는 부분이 없으면 0을 반환
  return F.locate(color_string.upper(), col).cast(T.BooleanType()).alias('is_'+color_string)

selectedCol = [color_locator(df['description'], c) for c in simpleColors]
selectedCol.append(F.expr('*'))
selectedCol
Out[27]: [Column<'CAST(locate(BLACK, description, 1) AS BOOLEAN) AS `is_black`'>, 
Column<'CAST(locate(WHITE, description, 1) AS BOOLEAN) AS `is_white`'>, 
Column<'CAST(locate(RED, description, 1) AS BOOLEAN) AS `is_red`'>, 
Column<'CAST(locate(GREEN, description, 1) AS BOOLEAN) AS `is_green`'>, 
Column<'CAST(locate(BLUE, description, 1) AS BOOLEAN) AS `is_blue`'>, 
Column<'unresolvedstar()'>]
display(df.select(*selectedCol).where(F.expr('is_white or is_red')).select('description').limit(10))
description
WHITE HANGING HEART T-LIGHT HOLDER
WHITE METAL LANTERN
RED WOOLLY HOTTIE WHITE HEART.
HAND WARMER RED POLKA DOT
RED COAT RACK PARIS FASHION
ALARM CLOCK BAKELIKE RED
SET/2 RED RETROSPOT TEA TOWELS
RED TOADSTOOL LED NIGHT LIGHT
HAND WARMER RED POLKA DOT
WHITE HANGING HEART T-LIGHT HOLDER

날짜와 타임스탬프 데이터 타입 다루기

  • 스파크는 특정 날짜 포맷을 명시하지 않아도 자체적으로 식별해서 데이터를 읽을 수 있음
  • 스파크는 두 가지 종류의 시간 관련 정보만 집중적으로 관리함
    • 달력 형태의 날짜(date)
    • 날짜와 시간 정보를 모두 가지는 타임스탬프(timestamp)
  • TimeStampType클래스는 초 단위 정밀도까지만 지원
    • 밀리세컨드나 마이크로세컨드 단위를 다룬다면 Long 데이터 타입으로 데이터를 변환해서 처리
    • 그 이상의 정밀도는 TimeStampType으로 변환될 때 제거됨
    • 위 같은 문제를 피하려면 파싱이나 변환 작업이 필요
dateDF= spark.range(10).withColumn('today', F.current_date())\
.withColumn('now', F.current_timestamp())
dateDF.createOrReplaceTempView('dateTable')
dateDF.printSchema()
root 
-- id: long (nullable = false) 
-- today: date (nullable = false) 
-- now: timestamp (nullable = false)

문자열 -> 날짜 변환

  • to_date
  • to_timestamp
#to_date함수로 문자열 -> 날짜 변환
dateDF.withColumn('birthday', F.to_date(F.lit('1996-07-10'))).show(1)
+---+----------+--------------------+----------+ 
id| today| now| birthday| 
+---+----------+--------------------+----------+ 
0|2021-07-19|2021-07-19 21:49:...|1996-07-10| 
+---+----------+--------------------+----------+ 
only showing top 1 row
#만약 날짜를 파싱할 수 없으면 에러 대신 null값 반환
dateDF.select(F.to_date(F.lit('2021-20-12'))).show(1)
+-------------------+ 
to_date(2021-20-12)| 
+-------------------+ 
null| 
+-------------------+ 
only showing top 1 row

날짜 포맷 지정

#날짜 포맷을 년-일-월로 지정한다면 2021-20-12도 표현 가능
dateformat = 'yyyy-dd-MM' #년-일-월
dateDF.select(F.to_date(F.lit('2021-20-12'), dateformat)).show(1)
+-------------------------------+ 
to_date(2021-20-12, yyyy-dd-MM)| 
+-------------------------------+ 
2021-12-20| 
+-------------------------------+ 
only showing top 1 row
dateDF.select(F.to_timestamp(F.lit('2021-20-12'), dateformat)).show(1)
+------------------------------------+ 
to_timestamp(2021-20-12, yyyy-dd-MM)| 
+------------------------------------+ 
2021-12-20 00:00:00| 
+------------------------------------+ 
only showing top 1 row
  • to_date는 필요에 따라 날짜 포맷을 지정할 수 있지만,
  • to_timestamp는 반드시 날짜 포맷을 지정해야한다는 것이 차이점

날짜 연산

  • date_sub
  • date_add
  • months_between
  • datediff
#오늘을 기준으로 5일 전후의 날짜 계산
dateDF.select(F.date_sub(F.col('today'),5), F.date_add(F.col('today'),5)).show(1)
+------------------+------------------+ 
date_sub(today, 5)|date_add(today, 5)| 
+------------------+------------------+ 
2021-07-14| 2021-07-24| 
+------------------+------------------+ 
only showing top 1 row
#두 날짜 사이의 개월 수 반환
dateDF.select(F.months_between(F.col('today'), F.lit('2020-07-18'))).show(1)

#to_date함수를 적용하지 않아도 자체적으로 식별
# dateDF.select(F.months_between(F.col('today'), F.to_date(F.lit('2020-07-18')))).show(1)
+---------------------------------------+ 
months_between(today, 2020-07-18, true)| 
+---------------------------------------+ 
12.03225806| 
+---------------------------------------+ 
only showing top 1 row
#두 날짜 사이의 일 수 반환
dateDF.select(F.datediff(F.col('today'), F.lit('2020-07-18'))).show(1)
+---------------------------+ 
datediff(today, 2020-07-18)| 
+---------------------------+ 
366| 
+---------------------------+ 
only showing top 1 row

NULL 값 다루기

  • 스파크에서는 빈 문자열이나 대체 값 대신 null값을 사용해야 최적화를 수행할 수 있음
    • 그래서 빈 데이터를 표현할 땐 항상 null값을 사용하는 것이 좋음
  • null값을 허용하지 않는 컬럼을 선언해도 강제성은 없음(해당 컬럼에 null값이 들어갈 수 있음)
    • nullable 속성은 스파크 SQL 옵티마이저가 해당 컬럼을 제어하는 동작을 단순하게 도울 뿐
  • DataFrame의 하위 패키지인 .na를 사용하는 것이 DataFrame에서 null값을 다루는 기본 방식
  • null값을 다루는 두 가지 방법
    1. 명시적으로 null값 제거
    2. 전역 또는 컬럼 단위로 null값을 특정 값으로 채워 넣기

coalesce

  • 인수로 지정한 여러 컬럼 중 null이 아닌 첫번 째 값 반환
df.select(F.col('description'), F.col('customerid'),\
          F.coalesce(F.col('description'), F.col('customerid'))).show(5)  
+--------------------+----------+---------------------------------+
description|customerid|coalesce(description, customerid)| 
+--------------------+----------+---------------------------------+ 
WHITE HANGING HEA...| 17850.0| WHITE HANGING HEA...| 
WHITE METAL LANTERN| 17850.0| WHITE METAL LANTERN| 
CREAM CUPID HEART...| 17850.0| CREAM CUPID HEART...| 
KNITTED UNION FLA...| 17850.0| KNITTED UNION FLA...| 
RED WOOLLY HOTTIE...| 17850.0| RED WOOLLY HOTTIE...| 
+--------------------+----------+---------------------------------+ 
only showing top 5 rows

drop

  • null값을 가진 로우를 제거하는 가장 간단한 함수
  • 함수의 인수
    • 'any': 로우의 컬럼값 중 하나라도 null값을 가지면 해당 로우 제거
    • 'all': 로우의 모든 컬럼 값이 null이거나 NaN이면 해당 로우 제거
#null값을 가진 모든 로우 제거
df.na.drop() #디폴트: how='any'  
Out[39]: DataFrame[InvoiceNo: string, StockCode: string, Description: string, Quantity: int, 
InvoiceDate: string, UnitPrice: double, CustomerID: double, Country: string]

fill

  • fill함수를 사용해 하나 이상의 컬럼을 특정 값으로 채울 수 있다.
df.na.fill('all', subset=['stockcode', 'invoiceno'])
Out[40]: DataFrame[InvoiceNo: string, StockCode: string, Description: string, Quantity: int, 
InvoiceDate: string, UnitPrice: double, CustomerID: double, Country: string]
#Map타입을 사용해서 다수의 컬럼에 적용할 수 있음
#key: 컬럼명 , value: null값을 대체할 값
fill_cols_vals ={'stockcode':5, 'description':'no val'}
df.na.fill(fill_cols_vals)
Out[41]: DataFrame[InvoiceNo: string, StockCode: string, Description: string, Quantity: int, 
InvoiceDate: string, UnitPrice: double, CustomerID: double, Country: string]

replace

  • 조건에 따라 다른 값으로 대체하기 위한 함수
  • 변경하고자 하는 값과 원래의 값의 데이터 타입이 같아야함
df.na.replace([''], ['unknown'],'description')
Out[42]: DataFrame[InvoiceNo: string, StockCode: string, Description: string, Quantity: int, 
InvoiceDate: string, UnitPrice: double, CustomerID: double, Country: string]

정렬하기

  • null값이 표시되는 기준을 지정할 수 있음
    • asc_nulls_first / last
    • desc_nulls_first / last
df.orderBy(F.asc_nulls_first('description')).show(5)
+---------+---------+-----------+--------+-------------------+---------+----------+--------------+ 
InvoiceNo|StockCode|Description|Quantity| InvoiceDate|UnitPrice|CustomerID| Country| 
+---------+---------+-----------+--------+-------------------+---------+----------+--------------+ 
536414| 22139| null| 56|2010-12-01 11:52:00| 0.0| null|United Kingdom| 
536550| 85044| null| 1|2010-12-01 14:34:00| 0.0| null|United Kingdom| 
536545| 21134| null| 1|2010-12-01 14:32:00| 0.0| null|United Kingdom| 
536546| 22145| null| 1|2010-12-01 14:33:00| 0.0| null|United Kingdom| 
536547| 37509| null| 1|2010-12-01 14:33:00| 0.0| null|United Kingdom| 
+---------+---------+-----------+--------+-------------------+---------+----------+--------------+ 
only showing top 5 rows
df.orderBy(F.asc_nulls_last('description')).show(5)
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+ 
InvoiceNo|StockCode| Description|Quantity| InvoiceDate|UnitPrice|CustomerID| Country| 
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+ 
536522| 72800B| 4 PURPLE FLOCK D...| 2|2010-12-01 12:49:00| 2.55| 15012.0|United Kingdom| 
536409| 22900| SET 2 TEA TOWELS...| 1|2010-12-01 11:45:00| 2.95| 17908.0|United Kingdom| 
536412| 22900| SET 2 TEA TOWELS...| 2|2010-12-01 11:49:00| 2.95| 17920.0|United Kingdom| 
536412| 22900| SET 2 TEA TOWELS...| 2|2010-12-01 11:49:00| 2.95| 17920.0|United Kingdom| 
536415| 22900| SET 2 TEA TOWELS...| 3|2010-12-01 11:57:00| 2.95| 12838.0|United Kingdom| 
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+ 
only showing top 5 rows

복합 데이터 타입 다루기

  • 구조체(struct)
  • 배열(array)
  • 맵(map)

구조체

complexDF = df.select(F.struct("description", "invoiceno").alias('complex'))
complexDF.createOrReplaceTempView('complexDF')
complexDF.printSchema()
root 
-- complex: struct (nullable = false) 
|-- description: string (nullable = true) 
|-- invoiceno: string (nullable = true)
#조회 방법 1
complexDF.select("complex.description").show(2)
+--------------------+ 
description| 
+--------------------+ 
WHITE HANGING HEA...| 
WHITE METAL LANTERN| 
+--------------------+ 
only showing top 2 rows
#조회 방법 2
complexDF.select(F.col('complex').getField('description')).show(2)
+--------------------+ 
complex.description| 
+--------------------+ 
WHITE HANGING HEA...| 
WHITE METAL LANTERN| 
+--------------------+ 
only showing top 2 rows

배열

#배열 생성
df.select(F.split(F.col('description'),' ')).show(2)
+-------------------------+ 
split(description, , -1)| 
+-------------------------+ 
[WHITE, HANGING, ...| 
[WHITE, METAL, LA...| 
+-------------------------+ 
only showing top 2 rows
#특정 위치값 조회
df.select(F.split(F.col('description'),' ').alias('array_col')).selectExpr('array_col[0]').show(2)
+------------+ 
array_col[0]| 
+------------+ 
WHITE| WHITE| 
+------------+ 
only showing top 2 rows
tmp = df.select(F.split(F.col('description'),' ').alias('array_col'))

size

#배열의 길이 조회
tmp.select(F.size(F.col('array_col'))).show(2)
+---------------+ 
size(array_col)| 
+---------------+ 
5| 
3| 
+---------------+ 
only showing top 2 rows

array_contains

#배열에 특정 값(WHITE)이 존재하는지 확인
tmp.select(F.array_contains(F.col('array_col'), 'WHITE')).show(2)
+--------------------------------+ 
array_contains(array_col, WHITE)| 
+--------------------------------+ 
true| 
true| 
+--------------------------------+ 
only showing top 2 rows

explode

#배열의 모든 값을 로우로 변환
tmp.withColumn('exploded', F.explode(F.col('array_col'))).show(10)
+--------------------+--------+ 
array_col|
exploded| 
+--------------------+--------+ 
[WHITE, HANGING, ...| WHITE| 
[WHITE, HANGING, ...| HANGING| 
[WHITE, HANGING, ...| HEART| 
[WHITE, HANGING, ...| T-LIGHT| 
[WHITE, HANGING, ...| HOLDER| 
[WHITE, METAL, LA...| WHITE| 
[WHITE, METAL, LA...| METAL| 
[WHITE, METAL, LA...| LANTERN| 
[CREAM, CUPID, HE...| CREAM| 
[CREAM, CUPID, HE...| CUPID| 
+--------------------+--------+ 
only showing top 10 rows

#생성 (키-값 쌍)
tmp_map = df.select(F.create_map(F.col('description'), F.col('invoiceno')).alias('complex_map'))
tmp_map.show(2)
+--------------------+ 
complex_map| 
+--------------------+ 
{WHITE HANGING HE...| 
{WHITE METAL LANT...| 
+--------------------+ 
only showing top 2 rows
#조회 -> 해당 키가 없으면 NULL
tmp_map.selectExpr("complex_map['WHITE METAL LANTERN']").show(2)
+--------------------------------+ 
complex_map[WHITE METAL LANTERN]| 
+--------------------------------+ 
null| 
536365| 
+--------------------------------+ 
only showing top 2 rows

explode

#key, value 분해해서 컬럼으로 변환
tmp_map.selectExpr("explode(complex_map)").show(2)
+--------------------+------+ 
key| value| 
+--------------------+------+ 
WHITE HANGING HEA...|536365| 
WHITE METAL LANTERN|536365| 
+--------------------+------+ 
only showing top 2 rows

JSON 다루기

#json 컬럼 생성
jsonDF = spark.range(1).selectExpr("""'{"myJSONKey":{"myJSONValue":[1,2,3]}}' as jsonString""")
#조회: get_json_object, json_tuple
jsonDF.select(F.get_json_object(F.col('jsonString'), "$.myJSONKey.myJSONValue[1]").alias('column'),\
              F.json_tuple(F.col('jsonString'),'myJSONKey')).show(2)
+------+--------------------+ 
column| c0| 
+------+--------------------+ 
2|{"myJSONValue":[1...| 
+------+--------------------+

to_json

#to_json함수로 StructType -> json 변경
df.selectExpr("(invoiceno, description) as myStruct").select(F.to_json(F.col('myStruct'))).show(2)
+--------------------+ 
to_json(myStruct)| 
+--------------------+ 
{"invoiceno":"536...| 
{"invoiceno":"536...| 
+--------------------+ 
only showing top 2 rows

from_json

  • json문자열을 다시 객체로 변환
  • 파라미터로 반드시 스키마를 지정해야함
parseSchema = T.StructType((
  T.StructField("InvoiceNo", T.StringType(), True),
  T.StructField("Description", T.StringType(), True),
))
#필드명이랑 똑같이 대소문자 구분해야함 -> to_json하면 key값이 되기 때문
df.selectExpr("(InvoiceNo, Description) as myStruct")\
.select(F.to_json(F.col('myStruct')).alias('new_json'))\
.select(F.from_json(F.col("new_json"), parseSchema), F.col('new_json')).show(2)
+--------------------+--------------------+ 
from_json(new_json)| new_json| 
+--------------------+--------------------+ 
{536365, WHITE HA...|
{"InvoiceNo":"536...| 
{536365, WHITE ME...|
{"InvoiceNo":"536...| 
+--------------------+--------------------+ 
only showing top 2 rows

사용자 정의 함수(User Defined Function, UDF)

  • 스파크의 가장 강력한 기능 중 하나
  • 레코드별로 데이터를 처리하는 함수이므로 독특한 포맷이나 도메인에 특화된 언어를 사용하지 않음
  • 여러 언어로 개발할 수 있으므로 매우 강력함
    • 스칼라, 자바, 파이썬으로 udf 개발 가능
    • 근데 언어별로 성능에 영향을 미칠 수 있음

파이썬 UDF 처리 과정



- 과정 1. 함수 직렬화 후 워커에 전달 - 직렬화: 파이썬이 이해할 수 있는 포맷으로 모든 데이터를 변환 2. 스파크에서 파이썬 프로세스 실행 후 데이터 전송 3. 파이썬에서 처리 결과 반환
  • 파이썬으로 udf개발 시 문제점
    • 파이썬으로 데이터를 전달하기 위해 직렬화하는 과정에서 부하 발생
    • 데이터가 파이썬으로 전달되면 스파크에서 워커 메모리를 관리할 수 없음
      • JVM과 파이썬이 동일한 머신에서 메모리 경합을 하면 자원에 제약이 생겨 워커가 비정상적으로 종료될 가능성이 있음
      • 그래서 자바나 스칼라로 UDF를 개발하는 것이 더 좋음
udf_df = spark.range(5).toDF('num')

1. 함수 정의

def power3(double_val):
  return double_val**3
power3(2.0)
Out[90]: 8.0

2. 사용자 정의 함수 등록

power3udf = F.udf(power3)

3. 적용

udf_df.select(power3udf(F.col('num'))).show()
+-----------+ 
power3(num)| 
+-----------+ 
0| 
1| 
8| 
27| 
64| 
+-----------+

SQL함수 등록

  • udf를 스파크 SQL 함수로 등록하면 모든 언어와 sql에서 해당 udf를 사용할 수 있음
    • 스칼라로 개발된 udf를 sql함수로 등록하면 이를 파이썬에서 우회적으로 사용 가능
  • 하지만 dataframe함수 대신 sql표현식으로 사용 가능
#sql함수로 등록
spark.udf.register('power3', power3, T.DoubleType())
Out[102]: <function __main__.power3(double_val)>
  • 스파크는 파이썬의 데이터 타입과는 다른 자체 데이터 타입을 사용하므로 함수를 정의할 때 반환 타입을 지정하는게 좋다.
    • 필수는 아님
udf_df.selectExpr('power3(num)').show()
+-----------+ 
power3(num)| 
+-----------+ 
null| 
null| 
null| 
null| 
null| 
+-----------+
  • 읭? null값이 나오는 이유
    • range메서드가 int 타입의 데이터를 만들었기 때문
    • 파이썬에서 int 타입을 사용해서 연산했다면 float 타입(spark에서는 double 타입과 동일)으로 변환할 수 없으므로 null 반환
#해결 방법: 반환 데이터 타입 변경 
spark.udf.register('power3_int', power3, T.IntegerType())
Out[106]: <function __main__.power3(double_val)>
udf_df.selectExpr('power3_int(num)').show()
+---------------+ 
power3_int(num)| 
+---------------+ 
0| 
1| 
8| 
27| 
64| 
+---------------+
profile
Data Engineer

0개의 댓글