from pyspark.sql import functions as F
from pyspark.sql import types as T
path= '/FileStore/tables/2010_12_01.csv'
df = spark.read.format('csv').option('header', 'true').option('inferSchema', 'true').load(path)
df.printSchema()
root
-- InvoiceNo: string (nullable = true)
-- StockCode: string (nullable = true)
-- Description: string (nullable = true)
-- Quantity: integer (nullable = true)
-- InvoiceDate: string (nullable = true)
-- UnitPrice: double (nullable = true)
-- CustomerID: double (nullable = true)
-- Country: string (nullable = true)
df.createOrReplaceTempView
Out[5]: <bound method DataFrame.createOrReplaceTempView of DataFrame[InvoiceNo: string, StockCode:
string, Description: string, Quantity: int, InvoiceDate: string, UnitPrice: double,
CustomerID: double, Country: string]>
df.show(3)
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
InvoiceNo|StockCode| Description|Quantity| InvoiceDate|UnitPrice|CustomerID| Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
536365| 85123A|WHITE HANGING HEA...| 6|2010-12-01 08:26:00| 2.55| 17850.0|United Kingdom|
536365| 71053| WHITE METAL LANTERN| 6|2010-12-01 08:26:00| 3.39| 17850.0|United Kingdom|
536365| 84406B|CREAM CUPID HEART...| 8|2010-12-01 08:26:00| 2.75| 17850.0|United Kingdom|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
only showing top 3 rows
스파크 데이터 타입으로 변환하기
- lit함수를 적용하여 다른언어의 데이터 타입을 스파크 데이터 타입에 맞게 변환
df.select(F.lit(5), F.lit('five'), F.lit(5.0))
Out[7]: DataFrame[5: int, five: string, 5.0: double]
불리언 데이터 타입 다루기
- 불리언은 모든 필터링 작업의 기반
- 불리언 구문은 and, or, true, false로 구성
- 불리언 구문을 사용해 true 또는 false로 평가되는 논리 문법을 만듦
- 불리언 식에는 일치 조건, 비교 연산 조건을 사용 가능
df.where(F.col('invoiceno') != 536365).select('invoiceno', 'description').show(5, False)
+---------+-----------------------------+
invoiceno|description |
+---------+-----------------------------+
536366 |HAND WARMER UNION JACK |
536366 |HAND WARMER RED POLKA DOT |
536367 |ASSORTED COLOUR BIRD ORNAMENT|
536367 |POPPY'S PLAYHOUSE BEDROOM |
536367 |POPPY'S PLAYHOUSE KITCHEN |
+---------+-----------------------------+
only showing top 5 rows
priceFilter = F.col('unitprice')>600
descripFilter = F.instr(df.Description, 'POSTAGE')>=1
df.where(df.StockCode.isin('DOT')).where(priceFilter|descripFilter).show()
+---------+---------+--------------+--------+-------------------+---------+----------+--------------+
InvoiceNo|StockCode| Description|Quantity| InvoiceDate|UnitPrice|CustomerID| Country|
+---------+---------+--------------+--------+-------------------+---------+----------+--------------+
536544| DOT|DOTCOM POSTAGE| 1|2010-12-01 14:32:00| 569.77| null|United Kingdom|
536592| DOT|DOTCOM POSTAGE| 1|2010-12-01 17:06:00| 607.49| null|United Kingdom|
+---------+---------+--------------+--------+-------------------+---------+----------+--------------+
수치형 데이터 타입 다루기
- pow: 거듭제곱
- round: 반올림
- bound: 내림
- corr: 피어슨 상관계수
- describe: 요약 통계 계산
- count
- mean
- stddev
- min
- max
fabricatedQuantity = F.pow(F.col('Quantity') * F.col('UnitPrice'), 2)+5
df.select('customerid', fabricatedQuantity.alias('realQuantity')).show(2)
+----------+------------------+
customerid| realQuantity|
+----------+------------------+
17850.0|239.08999999999997|
17850.0| 418.7156|
+----------+------------------+
only showing top 2 rows
df.select(F.round(F.lit("2.5")), F.bround(F.lit("2.5")))
Out[11]: DataFrame[round(2.5, 0): double, bround(2.5, 0): double]
df.select(F.corr('Quantity', 'UnitPrice')).show()
+-------------------------+
corr(Quantity, UnitPrice)|
+-------------------------+
-0.04112314436835551|
+-------------------------+
df.describe().show()
+-------+-----------------+------------------+--------------------+------------------+-------------------+------------------+------------------+--------------+
summary| InvoiceNo| StockCode| Description| Quantity| InvoiceDate| UnitPrice| CustomerID| Country|
+-------+-----------------+------------------+--------------------+------------------+-------------------+------------------+------------------+--------------+
count| 3108| 3108| 3098| 3108| 3108| 3108| 1968| 3108|
mean| 536516.684944841|27834.304044117645| null| 8.627413127413128| null| 4.151946589446603|15661.388719512195| null|
stddev|72.89447869788873|17407.897548583845| null|26.371821677029203| null|15.638659854603892|1854.4496996893627| null|
min| 536365| 10002| 4 PURPLE FLOCK D...| -24|2010-12-01 08:26:00| 0.0| 12431.0| Australia|
max| C536548| POST|ZINC WILLIE WINKI...| 600|2010-12-01 17:35:00| 607.49| 18229.0|United Kingdom|
+-------+-----------------+------------------+--------------------+------------------+-------------------+------------------+------------------+--------------+
StatFunctions 패키지
- doc 링크
- 다양한 통곗값을 계산할 때 사용하는 DataFrame 메서드
- stat 속성을 사용해 접근할 수 있음
olName= 'UnitPrice'
quantileProbs = [0.5]
relError=0.05
df.stat.approxQuantile(olName, quantileProbs, relError)
Out[14]: [2.51]
df.stat.corr('Quantity', 'UnitPrice')
Out[15]: -0.04112314436835551
display(df.stat.crosstab('StockCode', 'Quantity').limit(10))
StockCode_Quantity -1 -10 -12 -2 -24 -3 -4 -5 -6 -7 1 10 100 11 12 120 128 13 14 144 15 16 17 18 19 192 2 20 200 21 216 22 23 24 25 252 27 28 288 3 30 32 33 34 36 384 4 40 432 47 48 480 5 50 56 6 60 600 64 7 70 72 8 80 9 96
22578 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
21327 0 0 0 0 0 0 0 0 0 0 2 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
22064 0 0 0 0 0 0 0 0 0 0 1 0 0 0 1 0 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
21080 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1
22219 0 0 0 0 0 0 0 0 0 0 0 0 0 0 3 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
21908 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
22818 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 0
15056BL 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 0
72817 0 0 0 0 0 0 0 0 0 0 1 0 0 0 1 0 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
22545 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
df.select(F.monotonically_increasing_id()).show(5)
+-----------------------------+
monotonically_increasing_id()|
+-----------------------------+
0|
1|
2|
3|
4|
+-----------------------------+
only showing top 5 rows
문자열 데이터 타입 다루기
- 데이터 추출
- 데이터 치환
- 문자열 존재 여부
- 대/소문자 변환 처리
- etc.
display(df.select('Description').limit(10))
Description
WHITE HANGING HEART T-LIGHT HOLDER
WHITE METAL LANTERN
CREAM CUPID HEARTS COAT HANGER
KNITTED UNION FLAG HOT WATER BOTTLE
RED WOOLLY HOTTIE WHITE HEART.
SET 7 BABUSHKA NESTING BOXES
GLASS STAR FROSTED T-LIGHT HOLDER
HAND WARMER UNION JACK
HAND WARMER RED POLKA DOT
ASSORTED COLOUR BIRD ORNAMENT
display(df.select(F.initcap(F.col('Description'))).limit(10))
initcap(Description)
White Hanging Heart T-light Holder
White Metal Lantern
Cream Cupid Hearts Coat Hanger
Knitted Union Flag Hot Water Bottle
Red Woolly Hottie White Heart.
Set 7 Babushka Nesting Boxes
Glass Star Frosted T-light Holder
Hand Warmer Union Jack
Hand Warmer Red Polka Dot
Assorted Colour Bird Ornament
display(df.select(F.lower('Description'), F.upper('Description')).limit(10))
lower(Description) upper(Description)
white hanging heart t-light holder WHITE HANGING HEART T-LIGHT HOLDER
white metal lantern WHITE METAL LANTERN
cream cupid hearts coat hanger CREAM CUPID HEARTS COAT HANGER
knitted union flag hot water bottle KNITTED UNION FLAG HOT WATER BOTTLE
red woolly hottie white heart. RED WOOLLY HOTTIE WHITE HEART.
set 7 babushka nesting boxes SET 7 BABUSHKA NESTING BOXES
glass star frosted t-light holder GLASS STAR FROSTED T-LIGHT HOLDER
hand warmer union jack HAND WARMER UNION JACK
hand warmer red polka dot HAND WARMER RED POLKA DOT
assorted colour bird ornament ASSORTED COLOUR BIRD ORNAMENT
target= ' HELLO '
display(df.select(F.trim(F.lit(target)), F.ltrim(F.lit(target)), F.rtrim(F.lit(target)), F.rpad(F.lit('HELLO'),10, '-'), F.lpad(F.lit('HELLO'),10,'-')).limit(5))
trim( HELLO ) ltrim( HELLO ) rtrim( HELLO ) rpad(HELLO, 10, -) lpad(HELLO, 10, -)
HELLO HELLO HELLO HELLO----- -----HELLO
HELLO HELLO HELLO HELLO----- -----HELLO
HELLO HELLO HELLO HELLO----- -----HELLO
HELLO HELLO HELLO HELLO----- -----HELLO
HELLO HELLO HELLO HELLO----- -----HELLO
정규 표현식
regex_string="BLACK|WHITE|RED|GREEN|BLUE"
display(df.select(F.regexp_replace(F.col('description'), regex_string, 'COLOR').alias('color_clean'),F.col('description')).limit(10))
color_clean description
COLOR HANGING HEART T-LIGHT HOLDER WHITE HANGING HEART T-LIGHT HOLDER
COLOR METAL LANTERN WHITE METAL LANTERN
CREAM CUPID HEARTS COAT HANGER CREAM CUPID HEARTS COAT HANGER
KNITTED UNION FLAG HOT WATER BOTTLE KNITTED UNION FLAG HOT WATER BOTTLE
COLOR WOOLLY HOTTIE COLOR HEART. RED WOOLLY HOTTIE WHITE HEART.
SET 7 BABUSHKA NESTING BOXES SET 7 BABUSHKA NESTING BOXES
GLASS STAR FROSTED T-LIGHT HOLDER GLASS STAR FROSTED T-LIGHT HOLDER
HAND WARMER UNION JACK HAND WARMER UNION JACK
HAND WARMER COLOR POLKA DOT HAND WARMER RED POLKA DOT
ASSORTED COLOUR BIRD ORNAMENT ASSORTED COLOUR BIRD ORNAMENT
display(df.select(F.translate(F.col('description'), 'LEET' , '1337'), F.col('description')).limit(5))
translate(description, LEET, 1337) description
WHI73 HANGING H3AR7 7-1IGH7 HO1D3R WHITE HANGING HEART T-LIGHT HOLDER
WHI73 M37A1 1AN73RN WHITE METAL LANTERN
CR3AM CUPID H3AR7S COA7 HANG3R CREAM CUPID HEARTS COAT HANGER
KNI773D UNION F1AG HO7 WA73R BO7713 KNITTED UNION FLAG HOT WATER BOTTLE
R3D WOO11Y HO77I3 WHI73 H3AR7. RED WOOLLY HOTTIE WHITE HEART.
extract_str = "(BLACK|WHITE|RED|GREEN|BLUE)"
display(df.select(F.regexp_extract(F.col('description'), extract_str, 1).alias('color_clean'), F.col('description')).limit(5))
color_clean description
WHITE WHITE HANGING HEART T-LIGHT HOLDER
WHITE WHITE METAL LANTERN
CREAM CUPID HEARTS COAT HANGER
KNITTED UNION FLAG HOT WATER BOTTLE
RED RED WOOLLY HOTTIE WHITE HEART.
containsBlack = F.instr(F.col('description'), 'BLACK') >=1
containsWhite= F.instr(F.col('description'), 'WHITE') >=1
display(df.withColumn('hasSimpleColor', containsBlack | containsWhite)\
.where('hasSimpleColor').select('description').limit(5))
description
WHITE HANGING HEART T-LIGHT HOLDER
WHITE METAL LANTERN
RED WOOLLY HOTTIE WHITE HEART.
WHITE HANGING HEART T-LIGHT HOLDER
WHITE METAL LANTERN
- 위에는 값을 두 개만 사용해서 간단하지만 값의 개수가 늘어난다면 복잡해질 것임
simpleColors = ['black', 'white','red', 'green', 'blue']
def color_locator(col, color_string):
return F.locate(color_string.upper(), col).cast(T.BooleanType()).alias('is_'+color_string)
selectedCol = [color_locator(df['description'], c) for c in simpleColors]
selectedCol.append(F.expr('*'))
selectedCol
Out[27]: [Column<'CAST(locate(BLACK, description, 1) AS BOOLEAN) AS `is_black`'>,
Column<'CAST(locate(WHITE, description, 1) AS BOOLEAN) AS `is_white`'>,
Column<'CAST(locate(RED, description, 1) AS BOOLEAN) AS `is_red`'>,
Column<'CAST(locate(GREEN, description, 1) AS BOOLEAN) AS `is_green`'>,
Column<'CAST(locate(BLUE, description, 1) AS BOOLEAN) AS `is_blue`'>,
Column<'unresolvedstar()'>]
display(df.select(*selectedCol).where(F.expr('is_white or is_red')).select('description').limit(10))
description
WHITE HANGING HEART T-LIGHT HOLDER
WHITE METAL LANTERN
RED WOOLLY HOTTIE WHITE HEART.
HAND WARMER RED POLKA DOT
RED COAT RACK PARIS FASHION
ALARM CLOCK BAKELIKE RED
SET/2 RED RETROSPOT TEA TOWELS
RED TOADSTOOL LED NIGHT LIGHT
HAND WARMER RED POLKA DOT
WHITE HANGING HEART T-LIGHT HOLDER
날짜와 타임스탬프 데이터 타입 다루기
- 스파크는 특정 날짜 포맷을 명시하지 않아도 자체적으로 식별해서 데이터를 읽을 수 있음
- 스파크는 두 가지 종류의 시간 관련 정보만 집중적으로 관리함
- 달력 형태의 날짜(date)
- 날짜와 시간 정보를 모두 가지는 타임스탬프(timestamp)
- TimeStampType클래스는 초 단위 정밀도까지만 지원
- 밀리세컨드나 마이크로세컨드 단위를 다룬다면 Long 데이터 타입으로 데이터를 변환해서 처리
- 그 이상의 정밀도는 TimeStampType으로 변환될 때 제거됨
- 위 같은 문제를 피하려면 파싱이나 변환 작업이 필요
dateDF= spark.range(10).withColumn('today', F.current_date())\
.withColumn('now', F.current_timestamp())
dateDF.createOrReplaceTempView('dateTable')
dateDF.printSchema()
root
-- id: long (nullable = false)
-- today: date (nullable = false)
-- now: timestamp (nullable = false)
문자열 -> 날짜 변환
dateDF.withColumn('birthday', F.to_date(F.lit('1996-07-10'))).show(1)
+---+----------+--------------------+----------+
id| today| now| birthday|
+---+----------+--------------------+----------+
0|2021-07-19|2021-07-19 21:49:...|1996-07-10|
+---+----------+--------------------+----------+
only showing top 1 row
dateDF.select(F.to_date(F.lit('2021-20-12'))).show(1)
+-------------------+
to_date(2021-20-12)|
+-------------------+
null|
+-------------------+
only showing top 1 row
날짜 포맷 지정
dateformat = 'yyyy-dd-MM'
dateDF.select(F.to_date(F.lit('2021-20-12'), dateformat)).show(1)
+-------------------------------+
to_date(2021-20-12, yyyy-dd-MM)|
+-------------------------------+
2021-12-20|
+-------------------------------+
only showing top 1 row
dateDF.select(F.to_timestamp(F.lit('2021-20-12'), dateformat)).show(1)
+------------------------------------+
to_timestamp(2021-20-12, yyyy-dd-MM)|
+------------------------------------+
2021-12-20 00:00:00|
+------------------------------------+
only showing top 1 row
- to_date는 필요에 따라 날짜 포맷을 지정할 수 있지만,
- to_timestamp는 반드시 날짜 포맷을 지정해야한다는 것이 차이점
날짜 연산
- date_sub
- date_add
- months_between
- datediff
dateDF.select(F.date_sub(F.col('today'),5), F.date_add(F.col('today'),5)).show(1)
+------------------+------------------+
date_sub(today, 5)|date_add(today, 5)|
+------------------+------------------+
2021-07-14| 2021-07-24|
+------------------+------------------+
only showing top 1 row
dateDF.select(F.months_between(F.col('today'), F.lit('2020-07-18'))).show(1)
+---------------------------------------+
months_between(today, 2020-07-18, true)|
+---------------------------------------+
12.03225806|
+---------------------------------------+
only showing top 1 row
dateDF.select(F.datediff(F.col('today'), F.lit('2020-07-18'))).show(1)
+---------------------------+
datediff(today, 2020-07-18)|
+---------------------------+
366|
+---------------------------+
only showing top 1 row
NULL 값 다루기
- 스파크에서는 빈 문자열이나 대체 값 대신 null값을 사용해야 최적화를 수행할 수 있음
- 그래서 빈 데이터를 표현할 땐 항상 null값을 사용하는 것이 좋음
- null값을 허용하지 않는 컬럼을 선언해도 강제성은 없음(해당 컬럼에 null값이 들어갈 수 있음)
- nullable 속성은 스파크 SQL 옵티마이저가 해당 컬럼을 제어하는 동작을 단순하게 도울 뿐
- DataFrame의 하위 패키지인 .na를 사용하는 것이 DataFrame에서 null값을 다루는 기본 방식
- null값을 다루는 두 가지 방법
- 명시적으로 null값 제거
- 전역 또는 컬럼 단위로 null값을 특정 값으로 채워 넣기
coalesce
- 인수로 지정한 여러 컬럼 중 null이 아닌 첫번 째 값 반환
df.select(F.col('description'), F.col('customerid'),\
F.coalesce(F.col('description'), F.col('customerid'))).show(5)
+--------------------+----------+---------------------------------+
description|customerid|coalesce(description, customerid)|
+--------------------+----------+---------------------------------+
WHITE HANGING HEA...| 17850.0| WHITE HANGING HEA...|
WHITE METAL LANTERN| 17850.0| WHITE METAL LANTERN|
CREAM CUPID HEART...| 17850.0| CREAM CUPID HEART...|
KNITTED UNION FLA...| 17850.0| KNITTED UNION FLA...|
RED WOOLLY HOTTIE...| 17850.0| RED WOOLLY HOTTIE...|
+--------------------+----------+---------------------------------+
only showing top 5 rows
drop
- null값을 가진 로우를 제거하는 가장 간단한 함수
- 함수의 인수
- 'any': 로우의 컬럼값 중 하나라도 null값을 가지면 해당 로우 제거
- 'all': 로우의 모든 컬럼 값이 null이거나 NaN이면 해당 로우 제거
df.na.drop()
Out[39]: DataFrame[InvoiceNo: string, StockCode: string, Description: string, Quantity: int,
InvoiceDate: string, UnitPrice: double, CustomerID: double, Country: string]
fill
- fill함수를 사용해 하나 이상의 컬럼을 특정 값으로 채울 수 있다.
df.na.fill('all', subset=['stockcode', 'invoiceno'])
Out[40]: DataFrame[InvoiceNo: string, StockCode: string, Description: string, Quantity: int,
InvoiceDate: string, UnitPrice: double, CustomerID: double, Country: string]
fill_cols_vals ={'stockcode':5, 'description':'no val'}
df.na.fill(fill_cols_vals)
Out[41]: DataFrame[InvoiceNo: string, StockCode: string, Description: string, Quantity: int,
InvoiceDate: string, UnitPrice: double, CustomerID: double, Country: string]
replace
- 조건에 따라 다른 값으로 대체하기 위한 함수
- 변경하고자 하는 값과 원래의 값의 데이터 타입이 같아야함
df.na.replace([''], ['unknown'],'description')
Out[42]: DataFrame[InvoiceNo: string, StockCode: string, Description: string, Quantity: int,
InvoiceDate: string, UnitPrice: double, CustomerID: double, Country: string]
정렬하기
- null값이 표시되는 기준을 지정할 수 있음
- asc_nulls_first / last
- desc_nulls_first / last
df.orderBy(F.asc_nulls_first('description')).show(5)
+---------+---------+-----------+--------+-------------------+---------+----------+--------------+
InvoiceNo|StockCode|Description|Quantity| InvoiceDate|UnitPrice|CustomerID| Country|
+---------+---------+-----------+--------+-------------------+---------+----------+--------------+
536414| 22139| null| 56|2010-12-01 11:52:00| 0.0| null|United Kingdom|
536550| 85044| null| 1|2010-12-01 14:34:00| 0.0| null|United Kingdom|
536545| 21134| null| 1|2010-12-01 14:32:00| 0.0| null|United Kingdom|
536546| 22145| null| 1|2010-12-01 14:33:00| 0.0| null|United Kingdom|
536547| 37509| null| 1|2010-12-01 14:33:00| 0.0| null|United Kingdom|
+---------+---------+-----------+--------+-------------------+---------+----------+--------------+
only showing top 5 rows
df.orderBy(F.asc_nulls_last('description')).show(5)
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
InvoiceNo|StockCode| Description|Quantity| InvoiceDate|UnitPrice|CustomerID| Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
536522| 72800B| 4 PURPLE FLOCK D...| 2|2010-12-01 12:49:00| 2.55| 15012.0|United Kingdom|
536409| 22900| SET 2 TEA TOWELS...| 1|2010-12-01 11:45:00| 2.95| 17908.0|United Kingdom|
536412| 22900| SET 2 TEA TOWELS...| 2|2010-12-01 11:49:00| 2.95| 17920.0|United Kingdom|
536412| 22900| SET 2 TEA TOWELS...| 2|2010-12-01 11:49:00| 2.95| 17920.0|United Kingdom|
536415| 22900| SET 2 TEA TOWELS...| 3|2010-12-01 11:57:00| 2.95| 12838.0|United Kingdom|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
only showing top 5 rows
복합 데이터 타입 다루기
- 구조체(struct)
- 배열(array)
- 맵(map)
구조체
complexDF = df.select(F.struct("description", "invoiceno").alias('complex'))
complexDF.createOrReplaceTempView('complexDF')
complexDF.printSchema()
root
-- complex: struct (nullable = false)
|-- description: string (nullable = true)
|-- invoiceno: string (nullable = true)
complexDF.select("complex.description").show(2)
+--------------------+
description|
+--------------------+
WHITE HANGING HEA...|
WHITE METAL LANTERN|
+--------------------+
only showing top 2 rows
complexDF.select(F.col('complex').getField('description')).show(2)
+--------------------+
complex.description|
+--------------------+
WHITE HANGING HEA...|
WHITE METAL LANTERN|
+--------------------+
only showing top 2 rows
배열
df.select(F.split(F.col('description'),' ')).show(2)
+-------------------------+
split(description, , -1)|
+-------------------------+
[WHITE, HANGING, ...|
[WHITE, METAL, LA...|
+-------------------------+
only showing top 2 rows
df.select(F.split(F.col('description'),' ').alias('array_col')).selectExpr('array_col[0]').show(2)
+------------+
array_col[0]|
+------------+
WHITE| WHITE|
+------------+
only showing top 2 rows
tmp = df.select(F.split(F.col('description'),' ').alias('array_col'))
size
tmp.select(F.size(F.col('array_col'))).show(2)
+---------------+
size(array_col)|
+---------------+
5|
3|
+---------------+
only showing top 2 rows
array_contains
tmp.select(F.array_contains(F.col('array_col'), 'WHITE')).show(2)
+--------------------------------+
array_contains(array_col, WHITE)|
+--------------------------------+
true|
true|
+--------------------------------+
only showing top 2 rows
explode
tmp.withColumn('exploded', F.explode(F.col('array_col'))).show(10)
+--------------------+--------+
array_col|
exploded|
+--------------------+--------+
[WHITE, HANGING, ...| WHITE|
[WHITE, HANGING, ...| HANGING|
[WHITE, HANGING, ...| HEART|
[WHITE, HANGING, ...| T-LIGHT|
[WHITE, HANGING, ...| HOLDER|
[WHITE, METAL, LA...| WHITE|
[WHITE, METAL, LA...| METAL|
[WHITE, METAL, LA...| LANTERN|
[CREAM, CUPID, HE...| CREAM|
[CREAM, CUPID, HE...| CUPID|
+--------------------+--------+
only showing top 10 rows
맵
tmp_map = df.select(F.create_map(F.col('description'), F.col('invoiceno')).alias('complex_map'))
tmp_map.show(2)
+--------------------+
complex_map|
+--------------------+
{WHITE HANGING HE...|
{WHITE METAL LANT...|
+--------------------+
only showing top 2 rows
tmp_map.selectExpr("complex_map['WHITE METAL LANTERN']").show(2)
+--------------------------------+
complex_map[WHITE METAL LANTERN]|
+--------------------------------+
null|
536365|
+--------------------------------+
only showing top 2 rows
explode
tmp_map.selectExpr("explode(complex_map)").show(2)
+--------------------+------+
key| value|
+--------------------+------+
WHITE HANGING HEA...|536365|
WHITE METAL LANTERN|536365|
+--------------------+------+
only showing top 2 rows
JSON 다루기
jsonDF = spark.range(1).selectExpr("""'{"myJSONKey":{"myJSONValue":[1,2,3]}}' as jsonString""")
jsonDF.select(F.get_json_object(F.col('jsonString'), "$.myJSONKey.myJSONValue[1]").alias('column'),\
F.json_tuple(F.col('jsonString'),'myJSONKey')).show(2)
+------+--------------------+
column| c0|
+------+--------------------+
2|{"myJSONValue":[1...|
+------+--------------------+
to_json
df.selectExpr("(invoiceno, description) as myStruct").select(F.to_json(F.col('myStruct'))).show(2)
+--------------------+
to_json(myStruct)|
+--------------------+
{"invoiceno":"536...|
{"invoiceno":"536...|
+--------------------+
only showing top 2 rows
from_json
- json문자열을 다시 객체로 변환
- 파라미터로 반드시 스키마를 지정해야함
parseSchema = T.StructType((
T.StructField("InvoiceNo", T.StringType(), True),
T.StructField("Description", T.StringType(), True),
))
df.selectExpr("(InvoiceNo, Description) as myStruct")\
.select(F.to_json(F.col('myStruct')).alias('new_json'))\
.select(F.from_json(F.col("new_json"), parseSchema), F.col('new_json')).show(2)
+--------------------+--------------------+
from_json(new_json)| new_json|
+--------------------+--------------------+
{536365, WHITE HA...|
{"InvoiceNo":"536...|
{536365, WHITE ME...|
{"InvoiceNo":"536...|
+--------------------+--------------------+
only showing top 2 rows
사용자 정의 함수(User Defined Function, UDF)
- 스파크의 가장 강력한 기능 중 하나
- 레코드별로 데이터를 처리하는 함수이므로 독특한 포맷이나 도메인에 특화된 언어를 사용하지 않음
- 여러 언어로 개발할 수 있으므로 매우 강력함
- 스칼라, 자바, 파이썬으로 udf 개발 가능
- 근데 언어별로 성능에 영향을 미칠 수 있음
파이썬 UDF 처리 과정
- 과정
1. 함수 직렬화 후 워커에 전달
- 직렬화: 파이썬이 이해할 수 있는 포맷으로 모든 데이터를 변환
2. 스파크에서 파이썬 프로세스 실행 후 데이터 전송
3. 파이썬에서 처리 결과 반환
- 파이썬으로 udf개발 시 문제점
- 파이썬으로 데이터를 전달하기 위해 직렬화하는 과정에서 부하 발생
- 데이터가 파이썬으로 전달되면 스파크에서 워커 메모리를 관리할 수 없음
- JVM과 파이썬이 동일한 머신에서 메모리 경합을 하면 자원에 제약이 생겨 워커가 비정상적으로 종료될 가능성이 있음
- 그래서 자바나 스칼라로 UDF를 개발하는 것이 더 좋음
udf_df = spark.range(5).toDF('num')
1. 함수 정의
def power3(double_val):
return double_val**3
power3(2.0)
Out[90]: 8.0
2. 사용자 정의 함수 등록
power3udf = F.udf(power3)
3. 적용
udf_df.select(power3udf(F.col('num'))).show()
+-----------+
power3(num)|
+-----------+
0|
1|
8|
27|
64|
+-----------+
SQL함수 등록
- udf를 스파크 SQL 함수로 등록하면 모든 언어와 sql에서 해당 udf를 사용할 수 있음
- 스칼라로 개발된 udf를 sql함수로 등록하면 이를 파이썬에서 우회적으로 사용 가능
- 하지만 dataframe함수 대신 sql표현식으로 사용 가능
spark.udf.register('power3', power3, T.DoubleType())
Out[102]: <function __main__.power3(double_val)>
- 스파크는 파이썬의 데이터 타입과는 다른 자체 데이터 타입을 사용하므로 함수를 정의할 때 반환 타입을 지정하는게 좋다.
udf_df.selectExpr('power3(num)').show()
+-----------+
power3(num)|
+-----------+
null|
null|
null|
null|
null|
+-----------+
- 읭? null값이 나오는 이유
- range메서드가 int 타입의 데이터를 만들었기 때문
- 파이썬에서 int 타입을 사용해서 연산했다면 float 타입(spark에서는 double 타입과 동일)으로 변환할 수 없으므로 null 반환
spark.udf.register('power3_int', power3, T.IntegerType())
Out[106]: <function __main__.power3(double_val)>
udf_df.selectExpr('power3_int(num)').show()
+---------------+
power3_int(num)|
+---------------+
0|
1|
8|
27|
64|
+---------------+