import numpy as np
tmp = sales_df[sales_df['price_each'].str.strip().replace('', np.nan).fillna('0').astype(float)]
tmp[tmp['price_each'] == 0]
→ KeyError 발생
Boolean Indexing을 사용할 때는 대괄호 [] 안에 들어가는 데이터의 형태가 중요하다.
1. 대괄호 [] 내부의 데이터 타입
> `sales_df[...]` 내부에는 True 또는 False로 이루어진 리스트가 들어가야 한다.
PySpark #pyspark
Apache Spark : 대규모 데이터를 빠르게 처리하도록 설계된 오픈 소스 분산 컴퓨팅 시스템
python workflow에서 병렬 계산으로 대용량 데이터셋을 효율적으로 처리하며, 배치 처리, 실시간 스트리밍, 머신러닝, 데이터 분석, SQL query에 적합하다.
pyspark dataframe은 다른 DF와 유사하지만 PySpark에 최적화되어 있다.
spark.read.csv(file_name, [column_names]).printSchema() : DataFrame의 구조 확인.count() : DataFrame 행 개수 세기.groupBy(), agg() → SQL 유사 집계filter() : SQL의 where처럼 동작select() : SQL의 selectpandas는 단일 compute instance에서 동작하는 반면, PySpark는 여러 인스턴스에 데이터를 분산해 처리 속도와 확장성을 확보한다.
결측치 처리
na.drop() : null 값이 있는 행 삭제.where(col('columnName').isNotNull()) : null 값이 아닌 행만 뽑기.na.fill({'column': value) : null → 특정 값으로 대체column 작업
.withColumn() : 계산/기존 컬럼 기반 새 컬럼 추가df = df.withColumn('age_plus_5', df['age'] + 5)withColumnRenamed() : column명 변경df = df.withCOlumnRenamed('age', 'years')drop() : 불필요한 컬럼 제거UNION 연산 : 구조가 같은 두 DataFrame을 위아래로 쌓아 하나로 만드는 도구이다.
df_union = df1.union(df2)Array와 Map
ArrayType(StringType(), False)MapType(StringType(), StringType()StructType & StructField
StructType : 제목과 데이터 타입의 묶음 → DataFrame 전체의 구조 : 여러 개의 StructField를 리스트 형태로 담고 있다.
StructField
Pyspark는 데이터의 type를 추론(Inference)할 수 있지만, 데이터의 양이 많으면 type을 알아내기 위해 데이터를 다 흝어봐야 하기 때문에 시간이 오래 걸린다. → schema를 미리 정해줌으로써 시간을 단축한다.
RDD(Resilient Distributed Dataset)
PySpark → 병렬화를 통한 대규모 데이터 처리를 수행하는 능력이 뛰어나다.
병렬화 : 데이터를 클러스터의 여러 노드로 나누어 데이터와 연산을 분산시킨다. Spark에서 정의된 연산은 자동으로 분산되어, 대규모 데이터셋을 효율적으로 처리할 수 있다.
작업은 워커 노드에 할당되어 병렬로 데이터를 처리하고, 마지막에 결과를 합친다.
RDD : 클러스터 전반에 분산된 데이터 컬렉션을 표현하는 Spark의 핵심 빌딩 블록
map(), filter()같은 연산으로 새로운 RDD를 만들 수 있다.collect() 같은 액션도 지원한다.DataFrame → SQL 실행
df.createOrReplaceTempviwe("people")