대규모 주류 판매 데이터 분석 - PySpark로 데이터 읽어오기

dpwl·2024년 6월 14일
0

Data Analysis with SQL

목록 보기
101/120

1. PySpark로 데이터 읽어오기

# 아까 생성한 spark_session 객체인 spark를 이용해 데이터프레임 객체 생성하기
df = spark.read.csv(
    path='Iowa_Liquor_Sales.csv',
    header=True,
    inferSchema=True
    )
  • csv 포맷 말고도 JSON, Parquet, Avro, ORC, JDBC 등 다양한 파일 형식 읽기를 지원함
  • header: 열 이름이 데이터 내에 포함되어 있으면 True
  • inferschema: 스키마 자동 설정 (=True)
# 데이터프레임 객체가 가지고 있는 스키마 한눈에 보기 (컬럼명, 데이터 타입, 결측치 여부)
df.printSchema()

# 결과값:
# root
#  |-- Invoice/Item Number: string (nullable = true)
#  |-- Date: string (nullable = true)
#  |-- Store Number: integer (nullable = true)
#  |-- Vendor Name: string (nullable = true)
#  |-- Item Number: string (nullable = true)
#  |-- Item Description: string (nullable = true)
#  |-- Pack: integer (nullable = true)
#  |-- Bottle Volume (ml): integer (nullable = true)
#  |-- State Bottle Cost: double (nullable = true)
#  |-- State Bottle Retail: double (nullable = true)
#  |-- Bottles Sold: integer (nullable = true)
#  |-- Sale (Dollars): double (nullable = true)
#  |-- Volume Sold (Liters): double (nullable = true)
#  |-- Volume Sold (Gallons): double (nullable = true)
df.columns

# 결과값:
# ['Invoice/Item Number',
#  'Date',
#  'Store Number',
#  'Vendor Name',
#  'Item Number',
#  'Item Description',
#  'Pack',
#  'Bottle Volume (ml)',
#  'State Bottle Cost',
#  'State Bottle Retail',
#  'Bottles Sold',
#  'Sale (Dollars)',
#  'Volume Sold (Liters)',
#  'Volume Sold (Gallons)']
# parquet file 컬럼명으로 들어갈 수 없는 문자열 제거해주기
def replace_word(col_name):
  replace_words = {
      " ": "",
      "(": "_",
      ")": ""
  }
  for word in replace_words:
    col_name = col_name.replace(word, replace_words[word])
  return col_name
for col in df.columns:
  df = df.withColumnRenamed(col, replace_word(col))
df.columns

# 결과값:
# ['Invoice/ItemNumber',
#  'Date',
#  'StoreNumber',
#  'VendorName',
#  'ItemNumber',
#  'ItemDescription',
#  'Pack',
#  'BottleVolume_ml',
#  'StateBottleCost',
#  'StateBottleRetail',
#  'BottlesSold',
#  'Sale_Dollars',
#  'VolumeSold_Liters',
#  'VolumeSold_Gallons']
df.write.format("parquet").save(
    path = "data_parquet",
    header=True)

1.1 PySpark가 지원하는 다양한 파일 형식

1) csv

  • 단순한 값 목록
  • 각 항목은 쉼표 또는 다른 구분 문자로 나타냄
  • 첫 번째 헤더 필드 행은 선택

예시:

Film,Genre,RottenTomatoesRatings,AudienceRatings,Budget(million),Yearofrelease
(500) Days of Summer,Comedy,87,81,8,2009
"10,000 B.C.",Adventure,9,44,105,2008
12 Rounds,Action,30,52,20,2009
127 Hours,Adventure,93,84,18,2010
17 Again,Comedy,55,70,20,2009
2012,Action,39,63,200,2009
27 Dresses,Comedy,40,71,30,2008
30 Days of Night,Horror,50,57,32,2007
30 Minutes or Less,Comedy,43,48,28,2011

2) json

  • 사람이 읽을 수 있는 텍스트를 사용하여 데이터를 저장하고 전송
  • 데이터 공유를 위한 개방형 표준 파일 형식

예시:

{"데이터이름": 값}
{
    "name": "Shara",
    "age": 45,
    "weight": 53.4
}

3) JDBC

  • 자바에서 데이터베이스에 접속할 수 있도록 하는 자바 API
  • 즉, DB의 데이터를 불러와 데이터프레임으로 처리

연결 예시:

df = spark.read.jdbc(
    "jdbc:mysql://localhost:3306/db_name",
    "table",
    properties={
      "user": "root",
      "password": "root",
      "driver":"com.mysql.cj.jdbc.Driver"
      }
    )

1.2 빅데이터를 위한 파일 포맷

Avro, Parquet, ORC

  • 기계가 읽을수 있는 바이너리 포맷
  • 여러개의 디스크로 나뉘어질수 있으며 이 특징으로 인해 확장성과 동시처리가 가능

1) Avro

  • row (행) 기반으로 저장
  • 따라서, 모든 필드에 접근해야 할때 유용 (큰 분량을 쓰는데 유리)
  • 가장 compatible한 플랫폼: Kafka, Druid

2) Parquet

  • Column (열) 기반 저장
  • 따라서, 특정 필드 (열)에 자주 접근해야할 때 유용 (큰 분량을 읽어서 분석해야하는 환경에 최적화)
  • 가장 compatible한 플랫폼: Impala, Spark

3) ORC

  • Column (열) 기반 저장
  • 따라서, 특정 필드 (열)에 자주 접근해야할 때 유용 (큰 분량을 읽어서 분석해야하는 환경에 최적화)
  • 가장 compatible한 플랫폼: Hive, Presto

1.3 스키마란?

  • 데이터 베이스에서 데이터가 구성되는 방식과 서로 다른 엔티티(entities)간의 관계를 설명

예시: 이름, 타입, primary 키 여부를 설정

CREATE TABLE movies (
    id INTEGER PRIMARY KEY,
    title TEXT,
    director TEXT,
    year INTEGER,
    length_minutes INTEGER
);
profile
거북선통통통통

0개의 댓글