데이터의 양이 정말 방대해지면 데이터 로드하고 통계치 하나 계산하는 데에만 몇 시간씩 이어진다고 함.. EDA하고 시각화하고 인사이트까지 뽑아야 하는데.. 시간이 너무 소요되는 것임
따라서, 대용량의 데이터를 빠르고 효율적으로 처리하는 방법이 필요해짐
과정별로 사용할 툴

분산 클러스터 컴퓨팅 오픈소스 프레임워크
시스템의 성능을 향상시키기 위해, 계산 부하량을 여러 노드에서 병렬 처리하도록 구성하는 방식

Cluster Manager :
Driver Process:
Executor:
즉, Spark의 병렬성은 Partition와 executor의 개수로 결정됨

Lazy Evaluation :
특정 연산 명령이 내려진 즉시 데이터를 수정하지 않고, 원시 데이터에 적용할 Transformation만 실행 (액션 전까지 전체 데이터 흐름을 최적화할 수 있다는 장점)
예를 들어서, 어떤 원시 데이터에 대해 A라는 계산과 B라는 계산과 C라는 계산을 해서, 결과물을 얻고자 함. 이때, A, B, C 계산을 순차적으로 모두 진행하는 것이 아니라, 전체적인 계산 과정을 최적화한 후에 결과물을 얻는다는 것임
Action :
카탈리스트의 논리 계획 4단계

Python 환경에서 Apache Spark를 사용할 수 있게 해주는 API

대용량 정형 데이터 처리를 위한 SQL 인터페이스를 지원함 (SQL 쿼리 사용 가능)
Pandas API를 지원하여 Pandas 문법 사용 가능함
DataFrame 형식으로 데이터를 표현함 (RDBMS의 table과 유사한 2차원 구조로, 사용하기 용이함)
분류, 회귀, 차원 축소 등 다양한 머신러닝 기능 제공함
Kaggle의 Iowa Liquor Sales (아이오와주 주류 판매 데이터)

아래 코드 참고해서,

!mkdir -p ~/.kaggle/
!cp kaggle.json ~/.kaggle/
!chmod 600 ~/.kaggle/kaggle.json
!kaggle datasets download -d wethanielaw/iowa-liquor-sales-20230401
!unzip iowa-liquor-sales-20230401.zip
이 데이터에는 한 점포(store)당 여러 개의 매출 데이터가 존재하는데,
도시별 매출 평균을 집계할 때 점포를 무시하면 안 됨!!!
예를 들어서
한 점포에 여러 개의 매출 데이터가 있을때,
점포를 무시하고 도시별로만 평균을 집계하면
City1의 점포는 3개인데 데이터가 4개라서, 평균을 3이 아니라 4로 나누게 되고,
매출이 더 축소됨!
그래서 일단 점포별 합계(=점포별 총 매출)를 구한 후,
이 데이터에 대해서 평균을 구해야 함
메모장으로 직접 예시 들어봄

df = spark.read.csv(
path='Iowa_Liquor_Sales.csv', header=True, inferSchema=True
)


# 기본적으로 20개의 데이터 노출
df.show()
# 5개의 데이터 노출
df.show(5)
df.write.format('parquet').save(
path= 'data_parquet', # 해당 폴더에 파일을 분산하여 저장함. 폴더가 없는 경우 새로 생성함
header=True
)
df_parquet.show(5)
데이터 개수 세기
df_parquet.count()
데이터 스키마 확인하기
df_parquet.printSchema()
from pyspark.sql import functions as F
컬럼별 결측치 개수 세기
(
df_parquet
.select([
F.count(F.when(F.isnull(c), c)).alias(c) for c in df_parquet.columns
])
).show(5)
# alias = sql의 as와 같이 컬럼명을 지정해주는 역할. 여기선 컬럼명을 그대로 사용
# F.when(조건, 값) = 조건을 만족하면 값을 반환해라
# count 개수 세기
# 즉, columns의 각 컬럼 c에 대해서 null인 데이터의 개수를 셈
df_parquet.filter(F.col('StoreLocation').isNull()).show(5)
when
filter
df = df_parquet.drop('StoreLocation', 'CountyNumber')
지역에 대한 컬럼에 대해서만 null 개수 계산
region_cols = ['Address', 'City', 'ZipCode', 'County']
df.select(
[F.count(F.when(F.isnull(c), c)).alias(c) for c in region_cols]
).show()
결측치가 가장 적은 컬럼 : City !!!
City 컬럼이 null인 행을 제거하기
df = df.filter(F.col('City').isNotNull())
city로만 지역 정보를 표시하자. 즉, Address, ZipCode, County 컬럼은 삭제하기
df = df.drop('Address', 'ZipCode', 'County')
from pyspark.sql import Window as W
(
df
.withColumn(
'CategoryName_cnt',
F.size(
F.collect_set('CategoryName').over(W.partitionBy('Category'))
) # Category별로 고유한 CategoryName 개수를 세어서, CategoryName_cnt 컬럼에 넣어준다.
)
.filter(F.col('CategoryName_cnt') >= 2)
# CategoryName_cnt의 개수가 2 이상, 즉 한 Category에 여러 개의 CategoryName을 가진 경우만을 보여준다.
).show()

한 Category에 여러 개의 CategoryName을 가진 경우가 존재함.
따라서, Category를 기반으로 CategoryName을 채워주려면 특정 기준을 따라야 함.
여기선 Category별로 가장 최근 날짜이면서, null이 아닌 값 중 가장 첫 번째로 등장하는 CategoryName을 덮어씌워주기
df = (
df
.withColumn(
'CategoryName',
F.first(
F.col('CategoryName'), ignorenulls=True
).over(W.partitionBy(F.col('Category')).orderBy(F.col('Date').desc()))
)
)
date가 string으로 되어있어서, 형변환 필요
df = df.withColumn('Date', F.to_date(F.col('Date'), 'MM/dd/yyyy'))


Plotly 설치
%pip install plotly==5.11.0
pyspark dataframe을 pandas dataframe으로 변환
sample_pd = df.filter(F.col('City')=='WAUKEE').toPandas()

Hover Label 꾸미기 + 날짜 범위 슬라이더 추가하기
fig = px.line(WAUKEE_pd,
x='Date',
y='BottlesSold',
title='Bottles Sold amount at WAUKEE'
, text = 'Sale_Dollars') #아래 update_trace에서 text로 사용할 컬럼 지정!!!
fig.update_layout(
hoverlabel_bgcolor='white',
hoverlabel_font_size = 10,
hoverlabel_font_color = 'black',
hoverlabel_font_family = 'Rockwell'
)
# hover label의 내용 작성
# html 스타일
fig.update_traces(hovertemplate = '총 매출: %{text}달러 <br>'
'날짜: %{x} <br>'+
'판매량: %{y}개')
# 날짜 범위 슬라이더 생성
fig.update_layout(xaxis=dict(rangeslider_visible=True))
fig.show()

드롭다운 추가하기
import plotly.graph_objects as go
fig = go.Figure()
# 1번 그래프
fig.add_trace(go.Line(
name="BottlesSold",
x=WAUKEE_pd["Date"],
y=WAUKEE_pd["BottlesSold"]
))
# 2번 그래프
fig.add_trace(go.Line(
name="Sale_Dollars",
x=WAUKEE_pd["Date"],
y=WAUKEE_pd["Sale_Dollars"]
))
fig.update_layout(
updatemenus=[
dict(
type="dropdown",
direction="down",
buttons=list([
dict(label="Both",
method="update",
args=[{"visible": [True, True]},
{"title": "BottlesSold & SaleDollars"}]),
dict(label="BottlesSold",
method="update",
args=[{"visible": [True, False]},
{"title": "BottlesSold",}]),
dict(label="Sale_Dollars",
method="update",
args=[{"visible": [False, True]},
{"title": "Sale_Dollars",}]),
]),
),
]
)
fig.show()

연도별 총 매출 + 총 상점 수
pd1 = (
df
.withColumn('Year', F.year('Date').cast('long'))
.filter(F.col('Year') < 2023) # 2023년은 정보가 일부만 존재하므로 제외
.groupBy('Year')
.agg(
F.sum('Sale_Dollars').cast('long').alias('SaleDollars_sum'), # 총 매출
F.countDistinct(F.col('StoreNumber').cast('string')).alias('Store_cnt') # 총 상점 수
).orderBy('Year') # 연도순
).toPandas()
시각화
import plotly.express as px
fig = px.line(x=pd1['Year'], y=pd1['SaleDollars_sum'], title='매출 추이')
fig.show()
fig = px.line(x=pd1['Year'], y=pd1['Store_cnt'], title='상점 수 추이')
fig.show()
pd2 = (
df
.withColumn("Year", F.year("Date").cast("long")) #년/월/일에서 년도 정보만 추출
.filter(F.col("Year") < 2023) # 2023년은 정보가 일부만 있으므로 제외
.groupBy("Year") # 년도별로 group화
.agg(
F.sum("Sale_Dollars").cast("long").alias("SaleDollars_sum"), # SaleDollars 총합
F.countDistinct(F.col("StoreNumber").cast("string")).alias("Store_cnt") # StoreNumber 고유 갯수
)
.withColumn("SaleDollars_sum_bef", F.lag("SaleDollars_sum").over(W.orderBy("Year")))
# Year를 기준으로 이전 년도 SaleDollars_sum 값을 반환
# 성장률 계산하기
.withColumn(
"SaleDollars_growth_rate",
100*(F.col("SaleDollars_sum")-F.col("SaleDollars_sum_bef"))/F.col("SaleDollars_sum_bef")
)
).toPandas()
시각화
fig = px.line(x=pd2["Year"], y=pd2["SaleDollars_growth_rate"],title="매출 성장률 추이")
# 위 그래프와 동일
fig.show()
pd3 = (
df
.withColumn("Year", F.year("Date").cast("long")) #년/월/일에서 년도 정보만 추출
.filter(F.col("Year") < 2023) # 2023년은 정보가 일부만 있으므로 제외
.groupBy("Year", 'City') # 년도별 + 도시별 group화
.agg(
F.sum("Sale_Dollars").cast("long").alias("SaleDollars_sum"), # SaleDollars 총합
F.countDistinct(F.col("StoreNumber").cast("string")).alias("Store_cnt") # StoreNumber 고유 갯수
)
.withColumn("SaleDollars_sum_bef", F.lag("SaleDollars_sum").over(W.orderBy("Year")))
# Year를 기준으로 이전 년도 SaleDollars_sum 값을 반환
# 성장률 계산하기
.withColumn(
"SaleDollars_growth_rate",
100*(F.col("SaleDollars_sum")-F.col("SaleDollars_sum_bef"))/F.col("SaleDollars_sum_bef")
).orderBy('Year')
).toPandas()
시각화
fig = px.line(
pd3,
x = "Year", y = "SaleDollars_sum", color = 'City', hover_data = ['City'])
fig.update_layout(title_text = "지역 별 매출 추이")
fig.show()
fig = px.line(
pd3,
x = "Year", y = "SaleDollars_growth_rate", color = 'City', hover_data = ['City'])
fig.update_layout(title_text = "지역 별 매출 성장률 추이")
fig.show()
top10_city = (
pd3[pd3.Year == 2022]
.nlargest(n=10, columns='SaleDollars_sum')
)['City'].tolist()
# 2022년의 총 매출 상위 10개인 데이터의 'City' 열만 가져와서 list화
top_10_city_loc = pd3.City.isin(top10_city) # City가 top10 안에 있는 경우, true 반환
fig = px.line(
pd3.loc[top_10_city_loc],
x = "Year", y = "SaleDollars_growth_rate", color = 'City', hover_data = ['City'])
fig.update_layout(title_text = "지역 별 매출 성장률 추이")
fig.show()
pd4 = (
df
.withColumn("Year", F.year("Date").cast("long")) #년/월/일에서 년도 정보만 추출
.filter(F.col("Year") < 2023) # 2023년은 정보가 일부만 있으므로 제외
.groupBy("Year", "City", "StoreNumber") # 년도별, 지역별, 점포별로 group화
.agg(
F.sum("SaleDollars").cast("long").alias("SaleDollars_sum"), # SaleDollars 총합
)
# 연도별, 도시별, 점포별로 group화하여 구한 총 매출에 대해서,
# 연도와 도시별로만 그룹화하여 점포별 평균 매출을 구함
.groupBy("Year", "City")
.agg(
F.avg("SaleDollars_sum").alias("Store_SaleDollars_avg"), # 점포 별 매출 평균
F.sum("SaleDollars_sum").alias("SaleDollars_sum") # 총 점포 매출
)
.orderBy("Year") # 년도로 정렬
).toPandas()
시각화
fig = px.line(
pd4,
x = "Year", y = "Store_SaleDollars_avg", color = 'City', hover_data = ['City'])
fig.update_layout(title_text = "지역 별 점포 평균 매출 추이")
fig.show()
fig = px.scatter(
pd4[pd4.Year == 2022],
x = "SaleDollars_sum", y = "Store_SaleDollars_avg", color = 'City', hover_data = ['City'])
fig.update_layout(title_text = "지역 별 점포 평균 매출 & 총 매출 관계")
# 수평선 그리기
fig.add_hline(y= 400000,
line_color='red',)
# 수직선 그리기
fig.add_vline(x=3500000,
line_color='red')
fig.show()

일정 수준 매출이 보장되는 지역 + 가게의 수익이 좋은 상위 5개의 지역을 추천 예정
즉, 영업이익 = BottlesSold * (StateBottleRetail - StateBottleCost)
udf란?
# 영업 이익을 계산하는 udf 만들기
from pyspark.sql.functions import udf
from pyspark.sql import types as T
def calculate_gross_profit(unit_sales, unit_cost, sales_amt):
gross_profit = sales_amt * (unit_sales - unit_cost)
return gross_profit
calculate_gross_profit_udf = udf(
calculate_gross_profit, # udf로 만들 함수
T.DoubleType() # return type
)
(
df
.withColumn(
"gross_profit",
calculate_gross_profit_udf( # udf 적용
df.StateBottleRetail,
df.StateBottleCost,
df.BottlesSold
)
)
.select("StateBottleRetail", "StateBottleCost", "BottlesSold", "gross_profit")
).show()
pd1 = (
df
.withColumn("Year", F.year("Date").cast("long")) #년/월/일에서 년도 정보만 추출
.filter(F.col("Year") < 2023) # 2023년은 정보가 일부만 있으므로 제외
.withColumn(
"gross_profit",
calculate_gross_profit_udf( # udf 적용
df.StateBottleRetail,
df.StateBottleCost,
df.BottlesSold
)
)
.groupBy("Year", "City", "StoreNumber")
.agg(
F.sum("gross_profit").alias("gross_profit")
)
.groupBy("Year", "City")
.agg(
F.avg("gross_profit").alias("gross_profit_avg")
)
.orderBy("Year")
).toPandas()
시각화
import plotly.express as px
fig = px.line(
pd1,
x = "Year", y = "gross_profit_avg", color = 'City', hover_data = ['City'])
fig.update_layout(title_text = "지역 별 점포 영업이익 추이")
fig.show()
all_cities = pd1['City'].unique().tolist()
our_cities = ["MOUNT VERNON", "DEWITT", "WINDSOR HEIGHTS", "CORALVILLE", "DES MOINES"]
color_dict = {}
for city in all_cities:
if city in our_cities:
color_dict[city] = "red"
else:
color_dict[city] = "gray"
# 우리가 뽑은 5개의 지역만 다른 색으로 보기
fig = px.line(
pd1,
x = "Year", y = "gross_profit_avg", color = 'City',hover_data = ['City'],
color_discrete_map=color_dict # 색상 지정
)
fig.update_layout(title_text = "지역 별 점포 영업이익 추이")
fig.show()

pd2 = (
df
.withColumn("Year", F.year("Date").cast("long")) #년/월/일에서 년도 정보만 추출
.filter(F.col("Year") == 2022) # 2022년도를 기준으로
.withColumn(
"gross_profit",
calculate_gross_profit( # udf 적용
df.StateBottleRetail,
df.StateBottleCost,
df.BottlesSold
)
)
.groupBy("City", "StoreNumber")
.agg(
F.sum("SaleDollars").alias("SaleDollars"),
F.sum("gross_profit").alias("gross_profit")
)
.groupBy("City")
.agg(
F.avg("SaleDollars").alias("SaleDollars_avg"),
F.avg("gross_profit").alias("gross_profit_avg")
)
).toPandas()
시각화
fig = px.scatter(
pd2,
x = "SaleDollars_avg", y = "gross_profit_avg", color = 'City', hover_data = ['City'])
fig.update_layout(title_text = "지역 별 점포 평균 매출 & 평균 영업이익 관계")
fig.show()

🔵 흥미로웠던 점:
pyspark를 처음 배우지만, 사용되는 함수의 개념은 sql과 많이 비슷해서 강의를 통해 언어를 읽고 이해할 수 있었다. 다만 읽는 것과 쓰는 것은 다르기 때문에, 직접 응용을 통해 체득하는 게 중요할 것 같다.
🔵 다음 학습 계획:
MySQL로 지역별 주류 판매 데이터를 분석할 것입니다.