# Create a DataFrame object using the SparkSession object spark created earlier
df = spark.read.csv(
path='Iowa_Liquor_Sales.csv',
header=True,
inferSchema=True
)
# View the DataFrame's schema at a glance (column names, data types, nullability)
df.printSchema()
# Output:
# root
# |-- Invoice/Item Number: string (nullable = true)
# |-- Date: string (nullable = true)
# |-- Store Number: integer (nullable = true)
# |-- Vendor Name: string (nullable = true)
# |-- Item Number: string (nullable = true)
# |-- Item Description: string (nullable = true)
# |-- Pack: integer (nullable = true)
# |-- Bottle Volume (ml): integer (nullable = true)
# |-- State Bottle Cost: double (nullable = true)
# |-- State Bottle Retail: double (nullable = true)
# |-- Bottles Sold: integer (nullable = true)
# |-- Sale (Dollars): double (nullable = true)
# |-- Volume Sold (Liters): double (nullable = true)
# |-- Volume Sold (Gallons): double (nullable = true)
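# Note that inferSchema left Date as a string. A minimal sketch of casting it to
# a real date type, assuming the file uses the MM/dd/yyyy format (verify against
# your copy of the data); df_dated is a hypothetical name:
from pyspark.sql import functions as F
df_dated = df.withColumn("Date", F.to_date(F.col("Date"), "MM/dd/yyyy"))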
df.columns
# Output:
# ['Invoice/Item Number',
# 'Date',
# 'Store Number',
# 'Vendor Name',
# 'Item Number',
# 'Item Description',
# 'Pack',
# 'Bottle Volume (ml)',
# 'State Bottle Cost',
# 'State Bottle Retail',
# 'Bottles Sold',
# 'Sale (Dollars)',
# 'Volume Sold (Liters)',
# 'Volume Sold (Gallons)']
# Remove characters that cannot appear in parquet column names
def replace_word(col_name):
    replace_words = {
        " ": "",
        "(": "_",
        ")": ""
    }
    for word in replace_words:
        col_name = col_name.replace(word, replace_words[word])
    return col_name

for col in df.columns:
    df = df.withColumnRenamed(col, replace_word(col))
df.columns
# Output:
# ['Invoice/ItemNumber',
# 'Date',
# 'StoreNumber',
# 'VendorName',
# 'ItemNumber',
# 'ItemDescription',
# 'Pack',
# 'BottleVolume_ml',
# 'StateBottleCost',
# 'StateBottleRetail',
# 'BottlesSold',
# 'Sale_Dollars',
# 'VolumeSold_Liters',
# 'VolumeSold_Gallons']
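# The same renaming could also be done in a single pass with toDF, an
# equivalent alternative to the withColumnRenamed loop above:
# df = df.toDF(*[replace_word(c) for c in df.columns])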
# Parquet embeds the schema in the file itself, so no header option is needed
df.write.format("parquet").save("data_parquet")
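# Quick sanity check: read the result back; the schema is preserved because
# parquet stores it in the file footer. df_check is a hypothetical name:
df_check = spark.read.parquet("data_parquet")
df_check.printSchema()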
1) csv
Example:
Film,Genre,RottenTomatoesRatings,AudienceRatings,Budget(million),Yearofrelease
(500) Days of Summer,Comedy,87,81,8,2009
"10,000 B.C.",Adventure,9,44,105,2008
12 Rounds,Action,30,52,20,2009
127 Hours,Adventure,93,84,18,2010
17 Again,Comedy,55,70,20,2009
2012,Action,39,63,200,2009
27 Dresses,Comedy,40,71,30,2008
30 Days of Night,Horror,50,57,32,2007
30 Minutes or Less,Comedy,43,48,28,2011
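A minimal sketch of reading this file with Spark, assuming it is saved as movies.csv (a hypothetical name). The default quote character (") is what keeps "10,000 B.C." from being split at its comma:
df_movies = spark.read.csv(
    path='movies.csv',   # hypothetical file name for the sample rows above
    header=True,
    inferSchema=True
)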
2) json
Example:
{"field name": value}
{
"name": "Shara",
"age": 45,
"weight": 53.4
}
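A sketch of reading this record, assuming it is saved as person.json (a hypothetical name). Note that spark.read.json expects one JSON object per line (JSON Lines) by default, so a pretty-printed multi-line file like the one above needs multiLine=True:
df_person = spark.read.json('person.json', multiLine=True)
df_person.printSchema()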
3) JDBC
Connection example:
df = spark.read.jdbc(
    "jdbc:mysql://localhost:3306/db_name",
    "table",
    properties={
        "user": "root",
        "password": "root",
        "driver": "com.mysql.cj.jdbc.Driver"
    }
)
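For this to run, the MySQL connector jar has to be on Spark's classpath, for example via spark.jars.packages when the session is built. Writing goes through the same interface; a minimal sketch, reusing the hypothetical db_name and table names from above:
df.write.jdbc(
    "jdbc:mysql://localhost:3306/db_name",
    "table",
    mode="append",   # append rows instead of failing if the table exists
    properties={
        "user": "root",
        "password": "root",
        "driver": "com.mysql.cj.jdbc.Driver"
    }
)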
Avro, Parquet, ORC
1) Avro: a row-oriented binary format that stores its schema alongside the data
2) Parquet: a column-oriented format, well suited to analytical scans that touch only a few columns
3) ORC: another column-oriented format, originally from the Hive ecosystem
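A minimal sketch of writing the same DataFrame in these formats; ORC support is built into Spark, while Avro requires the external spark-avro package (org.apache.spark:spark-avro) to be on the classpath:
df.write.orc("data_orc")                    # built-in ORC writer
df.write.format("avro").save("data_avro")   # needs the spark-avro package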
Example: defining each column's name, type, and whether it is the primary key
CREATE TABLE movies (
    id INTEGER PRIMARY KEY,
    title TEXT,
    director TEXT,
    year INTEGER,
    length_minutes INTEGER
);