PySpark로 데이터 읽어오기

Ryan·2025년 1월 18일

SQL/Python 분석

목록 보기
85/94

PySpark 환경 설정 및 데이터 처리

1. PySpark 환경 세팅하기

PySpark 환경을 설정하려면 아래의 명령어를 차례로 실행합니다:

!apt-get install openjdk-8-jdk-headless -y
!wget https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz
!tar -xf spark-3.0.0-bin-hadoop3.2.tgz
!pip install findspark
!pip install kaggle --upgrade

사용된 명령어 설명

  • apt-get: 시스템에서 패키지 설치, 검색, 업데이트 등을 수행합니다.
  • wget: 웹 상의 파일을 다운로드 받을 때 사용하는 명령어입니다.
  • tar: 여러 파일을 묶거나 압축을 풀 때 사용합니다.
  • pip: 파이썬에서 외부 라이브러리를 설치하거나 관리할 때 사용됩니다.

경로 설정

import os
import findspark

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop3.2"

findspark.init()

위 코드는 환경 변수에 경로를 설정한 뒤 findspark로 PySpark 환경을 초기화합니다.


2. PySpark 시작하기

아래는 SparkSession 객체를 생성하는 코드입니다:

from pyspark.sql import SparkSession

spark = (
    SparkSession
    .builder
    .appName("pyspark_test")
    .master("local[*]")
    .getOrCreate()
)
  • appName: 애플리케이션 이름 설정
  • master: 클러스터 설정. local[*]은 로컬 머신의 모든 코어를 사용하도록 설정합니다.

3. Kaggle 데이터 다운로드

Kaggle 데이터를 다운로드하려면 아래 단계를 따르세요:

  1. Kaggle 계정에서 API Token(kaggle.json) 파일을 다운로드합니다.
  2. 아래 명령어를 실행하여 다운로드한 데이터를 가져옵니다:
!mkdir -p ~/.kaggle
!cp kaggle.json ~/.kaggle/
!chmod 600 ~/.kaggle/kaggle.json
!kaggle datasets download -d ethanalex/iowa-liquor-sales-20230401
!unzip iowa-liquor-sales-20230401.zip

사용된 명령어 설명

  • mkdir: 디렉토리 생성 명령어
  • cp: 파일 복사
  • chmod: 파일 권한 변경
  • unzip: 압축 해제 명령어

4. Spark DataFrame으로 데이터 읽어오기

CSV 파일을 Spark DataFrame으로 불러옵니다:

df = spark.read.csv(
    path="Iowa_Liquor_Sales.csv",
    header=True,
    inferSchema=True
)
  • header: CSV 파일 첫 행을 컬럼 이름으로 사용 여부
  • inferSchema: 데이터 타입을 자동으로 추론 여부

또한 Pandas DataFrame으로 변환하려면 아래 코드를 실행합니다:

import pandas as pd

temp = pd.read_csv("Iowa_Liquor_Sales.csv")

5. 파일 포맷 비교 및 데이터 저장

CSV와 Parquet 성능 비교

아래 코드는 CSV와 Parquet 파일의 데이터 처리 시간을 비교합니다:

import time

%%time
df.count()

parquet_df = spark.read.parquet("data_parquet")
%%time
parquet_df.count()
  • count: 실제 데이터를 읽어오는 Action 연산
  • Spark는 Lazy 처리 방식으로 Transformation만 수행할 경우 실행되지 않습니다.

Parquet 파일 저장

df.write.format("parquet").save(
    path="data_parquet",
    header=True
)

6. 데이터 클리닝 및 최적화

컬럼명 클리닝

아래 함수는 데이터 컬럼명에서 허용되지 않는 문자를 제거합니다:

def replace_word(col_name):
    replace_words = {
        " ": "_",
        ",": "",
        "-": "_"
    }
    for k in replace_words:
        col_name = col_name.replace(k, replace_words[k])
    return col_name

for col in df.columns:
    df = df.withColumnRenamed(col, replace_word(col))

결과 다운로드

import os

download_list = os.listdir("./data_parquet")
for file_name in download_list:
    if file_name[-3:] != 'crc':
        files.download("./data_parquet/" + file_name)

0개의 댓글