컬럼 선택, 추가, 수정, 행 필터링, 정렬까지 DataFrame을 다루는 핵심 문법 정리
DataFrame에서 컬럼을 가리킬 때 세 가지 방법이 있다. 결과는 모두 동일하다.
from pyspark.sql.functions import col
events_df.device # 점(.) 표기법
events_df["device"] # 딕셔너리 표기법
col("device") # col() 함수 (가장 많이 씀)
언제 col()을 써야 하냐면:
# 단순 컬럼 이름만 쓸 땐 문자열도 됨
events_df.select("user_id", "device")
# 연산, 메서드 체이닝, 중첩 구조 접근할 땐 col() 필요
col("ecommerce.purchase_revenue_in_usd") * 100 # 연산
col("event_timestamp").desc() # 메서드
col("geo.city").alias("city") # 중첩 구조
col()로 가져온 컬럼에 다양한 연산과 메서드를 붙일 수 있다.
# 수학 연산
col("price") * 100
col("quantity") + col("extra_quantity")
# 비교 연산 (Python에서 == 는 ===와 다름에 주의)
col("device") == "Android"
col("price") >= 200
# 메서드
col("event_timestamp").desc() # 내림차순 정렬용
col("price").cast("int") # 타입 변환
col("email").isNotNull() # NULL 아닌 것
col("email").isNull() # NULL인 것
col("device").isin("iOS", "Android") # 특정 값 포함 여부
col("ecommerce.purchase_revenue_in_usd") * 100 # 중첩 구조 접근
실제 사용 예시:
rev_df = (events_df
.filter(col("ecommerce.purchase_revenue_in_usd").isNotNull())
.withColumn("purchase_revenue",
(col("ecommerce.purchase_revenue_in_usd") * 100).cast("int"))
.withColumn("avg_purchase_revenue",
col("ecommerce.purchase_revenue_in_usd") / col("ecommerce.total_item_quantity"))
.sort(col("avg_purchase_revenue").desc())
)
display(rev_df)
# 문자열로 컬럼 이름만 지정
devices_df = events_df.select("user_id", "device")
# col()로 중첩 구조 접근 + alias로 이름 변경
from pyspark.sql.functions import col
locations_df = events_df.select(
"user_id",
col("geo.city").alias("city"), # geo 구조체 안의 city를 꺼내서 "city"로 이름 변경
col("geo.state").alias("state")
)
# SQL 문법을 문자열로 그대로 쓸 수 있음
apple_df = events_df.selectExpr(
"user_id",
"device in ('macOS', 'iOS') as apple_user" # SQL IN 절 → true/false 컬럼 생성
)
display(apple_df)
select()와 차이:
select() → col() 객체나 컬럼 이름 문자열 사용
selectExpr() → SQL 표현식 문자열 그대로 사용 (더 직관적인 경우 있음)
# 문자열로 여러 컬럼 한번에 제거
anonymous_df = events_df.drop("user_id", "geo", "device")
# col()로도 가능
no_sales_df = events_df.drop(col("ecommerce"))
# 새 컬럼 추가 (mobile 컬럼이 없으면 추가)
mobile_df = events_df.withColumn(
"mobile",
col("device").isin("iOS", "Android") # iOS나 Android면 true, 아니면 false
)
# 기존 컬럼 덮어쓰기 (같은 이름으로 지정하면 교체됨)
purchase_quantity_df = events_df.withColumn(
"purchase_quantity",
col("ecommerce.total_item_quantity").cast("int") # double → int로 타입 변환
)
purchase_quantity_df.printSchema()
withColumn 규칙:
같은 이름 지정 → 기존 컬럼 덮어씀
다른 이름 지정 → 새 컬럼 추가됨
# "geo" 컬럼 이름을 "location"으로 변경
location_df = events_df.withColumnRenamed("geo", "location")
filter()와 where()는 완전히 동일하다.
# 문자열 SQL 표현식으로 필터링
purchases_df = events_df.filter("ecommerce.total_item_quantity > 0")
# col()로 필터링
revenue_df = events_df.filter(col("ecommerce.purchase_revenue_in_usd").isNotNull())
# 여러 조건 동시에 (AND는 &, OR는 |, 각 조건은 괄호로 묶어야 함)
android_df = events_df.filter(
(col("traffic_source") != "direct") & (col("device") == "Android")
)
주의: Python에서 AND/OR 쓸 때
# 틀린 방법 (and, or 쓰면 에러)
.filter(col("a") == 1 and col("b") == 2) # ❌
# 맞는 방법 (&, | 사용 + 각 조건 괄호 필수)
.filter((col("a") == 1) & (col("b") == 2)) # ✅ AND
.filter((col("a") == 1) | (col("b") == 2)) # ✅ OR
# 모든 컬럼 기준으로 완전히 중복된 행 제거
display(events_df.distinct())
# 특정 컬럼 기준으로 중복 제거 (dropDuplicates만 가능)
distinct_users_df = events_df.dropDuplicates(["user_id"])
# → user_id가 같은 행이 여러 개면 첫 번째만 남김
distinct()와 dropDuplicates() 차이:
distinct() → 모든 컬럼 기준, 완전히 같은 행만 제거
dropDuplicates(["컬럼"]) → 지정한 컬럼 기준으로 중복 제거 (다른 컬럼 값은 달라도 됨)
limit_df = events_df.limit(100)
# 오름차순 (기본값)
increase_df = events_df.sort("event_timestamp")
# 내림차순 (.desc() 붙이기)
decrease_df = events_df.sort(col("event_timestamp").desc())
# 여러 컬럼 기준 정렬 (리스트로)
multi_df = events_df.orderBy(["user_first_touch_timestamp", "event_timestamp"])
# 여러 컬럼, 각각 다른 방향으로
mixed_df = events_df.sort(
col("user_first_touch_timestamp").desc(),
col("event_timestamp") # 이건 오름차순
)
| 메서드 | 용도 | 예시 |
|---|---|---|
select("a", "b") | 컬럼 선택 | .select("user_id", "device") |
select(col("a").alias("b")) | 컬럼 선택 + 이름 변경 | col("geo.city").alias("city") |
selectExpr("a in (...) as b") | SQL 표현식으로 선택 | "device in ('iOS') as apple" |
drop("a", "b") | 컬럼 제거 | .drop("user_id", "geo") |
withColumn("새이름", 식) | 컬럼 추가/교체 | .withColumn("mobile", col("device").isin(...)) |
withColumnRenamed("기존", "새이름") | 컬럼 이름 변경 | .withColumnRenamed("geo", "location") |
| 메서드 | 용도 | 예시 |
|---|---|---|
filter("조건") | 문자열 조건 필터링 | .filter("price > 0") |
filter(col("a").isNotNull()) | col 조건 필터링 | .filter(col("email").isNotNull()) |
filter((조건1) & (조건2)) | AND 조건 | (col("a")==1) & (col("b")==2) |
filter((조건1) \| (조건2)) | OR 조건 | (col("a")==1) \| (col("b")==2) |
distinct() | 전체 중복 제거 | .distinct() |
dropDuplicates(["컬럼"]) | 특정 컬럼 기준 중복 제거 | .dropDuplicates(["user_id"]) |
limit(n) | 상위 n개만 | .limit(100) |
sort("컬럼") | 오름차순 정렬 | .sort("price") |
sort(col("컬럼").desc()) | 내림차순 정렬 | .sort(col("price").desc()) |