PySpark에서 결측치를 확인하려면 아래의 코드를 사용할 수 있습니다:
from pyspark.sql import functions as F
df.select(
[F.count(F.when(F.isnull(c), c)).alias(c) for c in df.columns]
).show()
null인 경우 True를 반환합니다.결측치가 많은 컬럼을 제거하려면 아래 코드를 참고하세요:
# StoreLocation과 CountyNumber에서 null 값이 많을 경우
# StoreLocation이 없어도 Address와 City 정보는 존재합니다.
df = df.drop("StoreLocation", "CountyNumber")
결측치가 포함된 행을 제거하려면 아래 코드를 활용합니다:
# CountyNumber가 null이고, County가 존재하지 않는 행 제거
df = df.filter(
(F.col("CountyNumber").isNotNull()) |
(F.col("County").isNotNull())
)
null이 아닌 경우 True를 반환합니다.결측치를 평균값, 중간값 등으로 채우는 방식은 아래와 같습니다:
# 특정 컬럼의 결측치를 평균값으로 채우기
mean_value = df.select(F.avg("ColumnName")).first()[0]
df = df.fillna({"ColumnName": mean_value})
시간 데이터를 변환하려면 다음 코드를 사용합니다:
# Date 컬럼을 MM/dd/yyyy 형식으로 변환
df = df.withColumn("Date", F.to_date(F.col("Date"), 'MM/dd/yyyy'))
카테고리 데이터에서 특정 id에 해당하는 데이터를 필터링하거나 결측치를 확인할 수 있습니다:
# Category ID가 1022200인 경우, CategoryName이 null인지 확인
df.filter(
(F.col("Category") == 1022200) &
(F.col("CategoryName").isNull())
).show()
컬럼명을 정리하고 데이터를 Parquet 형식으로 저장하려면 아래의 코드를 사용합니다:
# 컬럼명 클리닝
def clean_column_name(col_name):
return col_name.replace(" ", "_").replace(",", "").replace("-", "_")
for col in df.columns:
df = df.withColumnRenamed(col, clean_column_name(col))
# Parquet 형식으로 저장
df.write.format("parquet").save("data_parquet")
PySpark를 활용하면 대용량 데이터에서 결측치 처리, 컬럼 및 행 제거, 데이터 변환 등 다양한 작업을 효율적으로 수행할 수 있습니다.