PySpark Dataframe

Ryan·2025년 1월 18일

SQL/Python 분석

목록 보기
86/94

PySpark를 활용한 결측치 처리 및 데이터 정리

1. 결측치 확인하기

PySpark에서 결측치를 확인하려면 아래의 코드를 사용할 수 있습니다:

from pyspark.sql import functions as F

df.select(
    [F.count(F.when(F.isnull(c), c)).alias(c) for c in df.columns]
).show()

주요 함수 설명

  • select: 특정 컬럼만 선택하여 연산을 수행합니다.
  • count: 데이터프레임의 행 개수를 계산합니다.
  • when: 조건문을 작성하여 특정 조건을 만족하는 데이터를 반환합니다.
  • isnull: 해당 값이 null인 경우 True를 반환합니다.
  • alias: 연산 결과에 새롭게 이름을 지정합니다.

2. 사용하지 않는 컬럼 제거하기

결측치가 많은 컬럼을 제거하려면 아래 코드를 참고하세요:

# StoreLocation과 CountyNumber에서 null 값이 많을 경우
# StoreLocation이 없어도 Address와 City 정보는 존재합니다.
df = df.drop("StoreLocation", "CountyNumber")

주요 함수 설명

  • drop: 명시된 컬럼을 제외한 데이터프레임을 반환합니다.

3. 사용할 수 없는 행 제거하기

결측치가 포함된 행을 제거하려면 아래 코드를 활용합니다:

# CountyNumber가 null이고, County가 존재하지 않는 행 제거
df = df.filter(
    (F.col("CountyNumber").isNotNull()) |
    (F.col("County").isNotNull())
)

주요 함수 설명

  • filter: 조건에 따라 데이터를 필터링합니다.
  • col: 특정 컬럼을 참조합니다.
  • isNotNull: 해당 값이 null이 아닌 경우 True를 반환합니다.

4. 결측치를 다른 값으로 대체하기

결측치를 평균값, 중간값 등으로 채우는 방식은 아래와 같습니다:

# 특정 컬럼의 결측치를 평균값으로 채우기
mean_value = df.select(F.avg("ColumnName")).first()[0]
df = df.fillna({"ColumnName": mean_value})

주요 함수 설명

  • fillna: 결측치를 특정 값으로 대체합니다.
  • avg: 컬럼의 평균값을 계산합니다.
  • first: 첫 번째 값을 반환합니다.

5. 시간 데이터 처리하기

시간 데이터를 변환하려면 다음 코드를 사용합니다:

# Date 컬럼을 MM/dd/yyyy 형식으로 변환
df = df.withColumn("Date", F.to_date(F.col("Date"), 'MM/dd/yyyy'))

주요 함수 설명

  • withColumn: 기존 컬럼을 업데이트하거나 새로운 컬럼을 추가합니다.
  • to_date: 문자열을 날짜 형식으로 변환합니다.

6. 카테고리 데이터 정리하기

카테고리 데이터에서 특정 id에 해당하는 데이터를 필터링하거나 결측치를 확인할 수 있습니다:

# Category ID가 1022200인 경우, CategoryName이 null인지 확인
df.filter(
    (F.col("Category") == 1022200) &
    (F.col("CategoryName").isNull())
).show()

7. 데이터프레임 최적화

컬럼명을 정리하고 데이터를 Parquet 형식으로 저장하려면 아래의 코드를 사용합니다:

# 컬럼명 클리닝
def clean_column_name(col_name):
    return col_name.replace(" ", "_").replace(",", "").replace("-", "_")

for col in df.columns:
    df = df.withColumnRenamed(col, clean_column_name(col))

# Parquet 형식으로 저장
df.write.format("parquet").save("data_parquet")

정리

PySpark를 활용하면 대용량 데이터에서 결측치 처리, 컬럼 및 행 제거, 데이터 변환 등 다양한 작업을 효율적으로 수행할 수 있습니다.

0개의 댓글