PySpark 환경을 설정하려면 아래의 명령어를 차례로 실행합니다:
!apt-get install openjdk-8-jdk-headless -y
!wget https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz
!tar -xf spark-3.0.0-bin-hadoop3.2.tgz
!pip install findspark
!pip install kaggle --upgrade
import os
import findspark
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop3.2"
findspark.init()
위 코드는 환경 변수에 경로를 설정한 뒤 findspark로 PySpark 환경을 초기화합니다.
아래는 SparkSession 객체를 생성하는 코드입니다:
from pyspark.sql import SparkSession
spark = (
SparkSession
.builder
.appName("pyspark_test")
.master("local[*]")
.getOrCreate()
)
local[*]은 로컬 머신의 모든 코어를 사용하도록 설정합니다.Kaggle 데이터를 다운로드하려면 아래 단계를 따르세요:
kaggle.json) 파일을 다운로드합니다.!mkdir -p ~/.kaggle
!cp kaggle.json ~/.kaggle/
!chmod 600 ~/.kaggle/kaggle.json
!kaggle datasets download -d ethanalex/iowa-liquor-sales-20230401
!unzip iowa-liquor-sales-20230401.zip
CSV 파일을 Spark DataFrame으로 불러옵니다:
df = spark.read.csv(
path="Iowa_Liquor_Sales.csv",
header=True,
inferSchema=True
)
또한 Pandas DataFrame으로 변환하려면 아래 코드를 실행합니다:
import pandas as pd
temp = pd.read_csv("Iowa_Liquor_Sales.csv")
아래 코드는 CSV와 Parquet 파일의 데이터 처리 시간을 비교합니다:
import time
%%time
df.count()
parquet_df = spark.read.parquet("data_parquet")
%%time
parquet_df.count()
df.write.format("parquet").save(
path="data_parquet",
header=True
)
아래 함수는 데이터 컬럼명에서 허용되지 않는 문자를 제거합니다:
def replace_word(col_name):
replace_words = {
" ": "_",
",": "",
"-": "_"
}
for k in replace_words:
col_name = col_name.replace(k, replace_words[k])
return col_name
for col in df.columns:
df = df.withColumnRenamed(col, replace_word(col))
import os
download_list = os.listdir("./data_parquet")
for file_name in download_list:
if file_name[-3:] != 'crc':
files.download("./data_parquet/" + file_name)