지역별 주류 판매 데이터 분석 - PySpark에서 MySQL 데이터 저장

dpwl·2024년 6월 16일
0

Data Analysis with SQL

목록 보기
109/120

1. PySpark에서 MySQL로 분석할 데이터 저장

# 상세하게 분석할 데이터만 뽑아내기
our_cities = ["MOUNT VERNON", "DEWITT", "WINDSOR HEIGHTS", "CORALVILLE", "DES MOINES"]

df = (
    df
    .filter(F.col("City").isin(our_cities)) # 상위 5개 지역만 필터링
    .filter(F.col("Year") == 2022)  # 최근 정보인 2022년도만 필터링
    )
df1 = (
    df
    .select(
        "Date", "City", "StoreNumber", "Category", "VendorNumber", "ItemNumber", # 분석에 필요한 column만 select
        "StateBottleCost", "StateBottleRetail", "BottlesSold", "SaleDollars"
        )
)
# 메타데이터로 저장
StoreNumber_meta = df.select("StoreNumber", "StoreName").distinct() # 중복값이 많기 때문에 중복 row 제외
Category_meta = df.select("Category", "CategoryName").distinct()
VendorNumber_meta = df.select("VendorNumber", "VendorName").distinct()
ItemNumber_meta = df.select("ItemNumber", "ItemDescription").distinct()
# 메타데이터로 저장한 StoreNumber_meta 확인
StoreNumber_meta.show()

# 결과값:
# +-----------+--------------------+
# |StoreNumber|           StoreName|
# +-----------+--------------------+
# |       9049|TYCOGA VINEYARD &...|
# |       4169|SUPER QUICK 2 / H...|
# |       2849|CVS PHARMACY #101...|
# +-----------+--------------------+
# csv 파일로 저장
df1.coalesce(1).write.option("header","true").format("csv").save("city_data_csv") # 분산되어 있는 데이터를 n개의 파티션으로 모아줌
StoreNumber_meta.coalesce(1).write.option("header","true").format("csv").save("StoreNumber_meta", mode='overwrite')
Category_meta.coalesce(1).write.option("header","true").format("csv").save("Category_meta", mode='overwrite')
VendorNumber_meta.coalesce(1).write.option("header","true").format("csv").save("VendorNumber_meta", mode='overwrite')
ItemNumber_meta.coalesce(1).write.option("header","true").format("csv").save("ItemNumber_meta", mode='overwrite')

PySpark의 연산 속도가 빠른 이유는 데이터들을 각각의 partition에 넣어서 분산 처리를 하기 때문이다.
PySpark를 통해 분산이 되어있는 데이터들을 csv 파일로 저장을 하게되면 분산이 되어있는 데이터마다 각각의 csv 파일로 저장이 된다. 이는 여려개의 csv 파일이 생성이 되는 의미이고, SQL에 해당 csv 파일들을 insert하는 작업이 번거롭게 된다.

따라서 다음과 같이 coalesce 함수를 사용하였다.

coalesce:

  • 분산되어 있던 데이터를 n개의 파티션으로 모아줌
  • 만약 coalesce를 사용하지 않고 저장한다면 여러개의 csv 파일로 저장될 것
  • n=1 설정으로 하나의 파일로 저장
df.coalesce(1).write.option("header","true").format("csv").save("파일_이름")
# 다운로드 받기
from google.colab import files

dir_names = ["city_data_csv", "ItemNumber_meta", "VendorNumber_meta", "StoreNumber_meta", "Category_meta"]

for dir_name in dir_names:
  download_list = os.listdir(f"./{dir_name}")
  for file_name in download_list:
    if file_name[-3:] == 'csv':
      files.download(f"./{dir_name}/" + file_name)
profile
거북선통통통통

0개의 댓글