13 Query Optimization
Spark는 코드를 실행하기 전에 Catalyst라는 최적화 엔진이 쿼리 계획을 자동으로 다듬어줌.
비유: 내가 요리 순서를 비효율적으로 짜도, 주방장(Catalyst)이 알아서 더 효율적인 순서로 바꿔서 실행하는 것
df.explain(True)
| 인자 | 설명 |
|---|---|
True | 논리적 계획 + 물리적 계획 모두 출력 |
False / 생략 | 물리적 계획만 출력 |
출력 순서:
== Parsed Logical Plan == ← 내가 쓴 코드 그대로
== Analyzed Logical Plan == ← 컬럼/타입 검증 후
== Optimized Logical Plan == ← Catalyst가 최적화한 버전 ← 여기가 핵심
== Physical Plan == ← 실제 실행될 계획
실무에서는 Optimized Logical Plan과 Physical Plan을 주로 봄.
"내가 짠 코드가 실제로 어떻게 실행되나?" 확인할 때 사용.
filter를 여러 번 체이닝해도 Catalyst가 하나로 합쳐서 실행함.
# 내가 쓴 코드 (filter 8번)
limit_events_df = (df
.filter(col("event_name") != "reviews")
.filter(col("event_name") != "checkout")
.filter(col("event_name") != "register")
...8개...
)
# Catalyst가 실제로 실행하는 것 (filter 1번으로 합쳐짐)
# → Optimized Plan에서 확인 가능
그래서 이렇게 써도 성능 차이 없음:
# 한 번에 쓴 버전 — 가독성은 더 좋음
better_df = (df
.filter(
(col("event_name") != "reviews") &
(col("event_name") != "checkout") &
(col("event_name") != "register") &
...
)
)
실무 팁: 성능은 같으니까 가독성 기준으로 선택하면 됨.
조건이 많으면 한 번에 쓰는 게 더 보기 좋음.
같은 조건을 실수로 여러 번 써도 Catalyst가 중복을 제거하고 한 번만 실행함.
# 이렇게 실수로 써도
stupid_df = (df
.filter(col("event_name") != "finalize")
.filter(col("event_name") != "finalize")
.filter(col("event_name") != "finalize")
.filter(col("event_name") != "finalize")
.filter(col("event_name") != "finalize")
)
# Catalyst가 filter 1번으로 줄여서 실행
비유: "문 잠갔어? 잠갔어? 잠갔어?" 라고 5번 물어봐도 실제로 문 확인은 한 번만 하는 것
조건자(Predicate) = filter 조건
푸시다운(Pushdown) = 가능한 한 데이터 소스에 가까운 쪽으로 밀어 내리는 것
일반적 실행 순서:
파일에서 전체 데이터 읽기 → 메모리에 올림 → filter 적용
푸시다운 후:
파일을 읽을 때 filter를 같이 적용 → 처음부터 필요한 데이터만 읽음
비유: 도서관에서 책 1000권 다 꺼낸 후 골라내기 vs. 처음부터 원하는 책만 꺼내기
데이터 소스가 푸시다운을 지원할 때 자동으로 작동:
| 데이터 소스 | 푸시다운 지원 |
|---|---|
| Parquet | ✅ |
| Delta Lake | ✅ |
| JDBC (DB 연결) | ✅ (DB가 직접 필터링) |
| CSV | ❌ (전체 읽은 후 필터링) |
| JSON | ❌ |
df.filter(col("event_name") == "purchase").explain(True)
Physical Plan에서 PushedFilters가 보이면 푸시다운 작동 중:
== Physical Plan ==
*(1) Filter (isnotnull(event_name) AND (event_name = purchase))
+- *(1) ColumnarToRow
+- FileScan parquet [...]
PushedFilters: [IsNotNull(event_name), EqualTo(event_name, purchase)] ← 이게 보이면 OK
UDF(사용자 정의 함수)를 filter에 쓰면 푸시다운 불가:
from pyspark.sql.functions import udf
@udf("boolean")
def is_purchase(name):
return name == "purchase"
# 이렇게 쓰면 푸시다운 안 됨 — Spark가 UDF 내부를 모르기 때문
df.filter(is_purchase(col("event_name")))
# 이렇게 써야 푸시다운 됨
df.filter(col("event_name") == "purchase")
실무 팁: filter 조건에는 가능하면 내장 함수를 쓰고, UDF는 최후의 수단으로.
# 1. 쿼리 계획 확인
df.filter(...).join(...).groupBy(...).explain(True)
# 체크 포인트:
# - Physical Plan에 PushedFilters 있는가? (푸시다운 작동 여부)
# - Filter가 Join보다 앞에 있는가? (작은 데이터로 join해야 빠름)
# - 불필요한 컬럼을 끝까지 들고 가고 있지 않은가?
# 느린 패턴: join 후 filter
df1.join(df2, "user_id").filter(col("event_name") == "purchase")
# 빠른 패턴: filter 후 join (Catalyst가 자동으로 해주기도 하지만 명시적으로 쓰는 게 안전)
df1.filter(col("event_name") == "purchase").join(df2, "user_id")
# 컬럼 20개짜리 DataFrame을 끝까지 들고 다니지 말고
df.select("user_id", "event_name", "event_timestamp") # 필요한 것만 먼저 추리기
.filter(...)
.join(...)
| 개념 | 설명 | 실무 포인트 |
|---|---|---|
| Catalyst | Spark 내장 쿼리 최적화 엔진 | 자동으로 작동, 의식할 필요 없음 |
explain(True) | 논리/물리 쿼리 계획 출력 | 느린 쿼리 디버깅 시 사용 |
| filter 병합 | 여러 filter → 자동으로 1개로 합침 | 가독성 기준으로 코드 작성해도 됨 |
| 중복 filter 제거 | 같은 조건 여러 번 → 자동으로 1번으로 줄임 | 복잡한 쿼리에서 실수해도 성능 영향 없음 |
| Predicate Pushdown | filter를 데이터 읽는 시점으로 밀어 내림 | Parquet/Delta에서 자동. UDF 쓰면 비활성화 |