PySpark 6 - Query Optimization

no-glass-otacku·7일 전

MS data school

목록 보기
25/25

13 Query Optimization


📌 핵심 개념: Catalyst 옵티마이저

Spark는 코드를 실행하기 전에 Catalyst라는 최적화 엔진이 쿼리 계획을 자동으로 다듬어줌.

비유: 내가 요리 순서를 비효율적으로 짜도, 주방장(Catalyst)이 알아서 더 효율적인 순서로 바꿔서 실행하는 것


1. explain() — 쿼리 계획 확인

df.explain(True)
인자설명
True논리적 계획 + 물리적 계획 모두 출력
False / 생략물리적 계획만 출력

출력 순서:

== Parsed Logical Plan ==      ← 내가 쓴 코드 그대로
== Analyzed Logical Plan ==    ← 컬럼/타입 검증 후
== Optimized Logical Plan ==   ← Catalyst가 최적화한 버전  ← 여기가 핵심
== Physical Plan ==            ← 실제 실행될 계획

실무에서는 Optimized Logical PlanPhysical Plan을 주로 봄.
"내가 짠 코드가 실제로 어떻게 실행되나?" 확인할 때 사용.


2. 논리적 최적화 (Logical Optimization)

2-1. filter 자동 병합

filter를 여러 번 체이닝해도 Catalyst가 하나로 합쳐서 실행함.

# 내가 쓴 코드 (filter 8번)
limit_events_df = (df
    .filter(col("event_name") != "reviews")
    .filter(col("event_name") != "checkout")
    .filter(col("event_name") != "register")
    ...8...
)

# Catalyst가 실제로 실행하는 것 (filter 1번으로 합쳐짐)
# → Optimized Plan에서 확인 가능

그래서 이렇게 써도 성능 차이 없음:

# 한 번에 쓴 버전 — 가독성은 더 좋음
better_df = (df
    .filter(
        (col("event_name") != "reviews") &
        (col("event_name") != "checkout") &
        (col("event_name") != "register") &
        ...
    )
)

실무 팁: 성능은 같으니까 가독성 기준으로 선택하면 됨.
조건이 많으면 한 번에 쓰는 게 더 보기 좋음.


2-2. 중복 filter 자동 제거

같은 조건을 실수로 여러 번 써도 Catalyst가 중복을 제거하고 한 번만 실행함.

# 이렇게 실수로 써도
stupid_df = (df
    .filter(col("event_name") != "finalize")
    .filter(col("event_name") != "finalize")
    .filter(col("event_name") != "finalize")
    .filter(col("event_name") != "finalize")
    .filter(col("event_name") != "finalize")
)

# Catalyst가 filter 1번으로 줄여서 실행

비유: "문 잠갔어? 잠갔어? 잠갔어?" 라고 5번 물어봐도 실제로 문 확인은 한 번만 하는 것


3. 조건자 푸시다운 (Predicate Pushdown)

조건자(Predicate) = filter 조건
푸시다운(Pushdown) = 가능한 한 데이터 소스에 가까운 쪽으로 밀어 내리는 것

개념

일반적 실행 순서:
파일에서 전체 데이터 읽기 → 메모리에 올림 → filter 적용

푸시다운 후:
파일을 읽을 때 filter를 같이 적용 → 처음부터 필요한 데이터만 읽음

비유: 도서관에서 책 1000권 다 꺼낸 후 골라내기 vs. 처음부터 원하는 책만 꺼내기

언제 작동하나?

데이터 소스가 푸시다운을 지원할 때 자동으로 작동:

데이터 소스푸시다운 지원
Parquet
Delta Lake
JDBC (DB 연결)✅ (DB가 직접 필터링)
CSV❌ (전체 읽은 후 필터링)
JSON

explain()으로 확인하는 법

df.filter(col("event_name") == "purchase").explain(True)

Physical Plan에서 PushedFilters가 보이면 푸시다운 작동 중:

== Physical Plan ==
*(1) Filter (isnotnull(event_name) AND (event_name = purchase))
+- *(1) ColumnarToRow
   +- FileScan parquet [...]
      PushedFilters: [IsNotNull(event_name), EqualTo(event_name, purchase)]  ← 이게 보이면 OK

푸시다운이 작동 안 하는 경우

UDF(사용자 정의 함수)를 filter에 쓰면 푸시다운 불가:

from pyspark.sql.functions import udf

@udf("boolean")
def is_purchase(name):
    return name == "purchase"

# 이렇게 쓰면 푸시다운 안 됨 — Spark가 UDF 내부를 모르기 때문
df.filter(is_purchase(col("event_name")))

# 이렇게 써야 푸시다운 됨
df.filter(col("event_name") == "purchase")

실무 팁: filter 조건에는 가능하면 내장 함수를 쓰고, UDF는 최후의 수단으로.


4. 실무에서 explain() 활용하는 패턴

쿼리가 느릴 때 체크리스트

# 1. 쿼리 계획 확인
df.filter(...).join(...).groupBy(...).explain(True)

# 체크 포인트:
# - Physical Plan에 PushedFilters 있는가? (푸시다운 작동 여부)
# - Filter가 Join보다 앞에 있는가? (작은 데이터로 join해야 빠름)
# - 불필요한 컬럼을 끝까지 들고 가고 있지 않은가?

filter는 최대한 앞에

# 느린 패턴: join 후 filter
df1.join(df2, "user_id").filter(col("event_name") == "purchase")

# 빠른 패턴: filter 후 join (Catalyst가 자동으로 해주기도 하지만 명시적으로 쓰는 게 안전)
df1.filter(col("event_name") == "purchase").join(df2, "user_id")

select로 필요한 컬럼만 미리 추리기

# 컬럼 20개짜리 DataFrame을 끝까지 들고 다니지 말고
df.select("user_id", "event_name", "event_timestamp")  # 필요한 것만 먼저 추리기
  .filter(...)
  .join(...)

5. 빠른 참조

개념설명실무 포인트
CatalystSpark 내장 쿼리 최적화 엔진자동으로 작동, 의식할 필요 없음
explain(True)논리/물리 쿼리 계획 출력느린 쿼리 디버깅 시 사용
filter 병합여러 filter → 자동으로 1개로 합침가독성 기준으로 코드 작성해도 됨
중복 filter 제거같은 조건 여러 번 → 자동으로 1번으로 줄임복잡한 쿼리에서 실수해도 성능 영향 없음
Predicate Pushdownfilter를 데이터 읽는 시점으로 밀어 내림Parquet/Delta에서 자동. UDF 쓰면 비활성화
profile
이제 개발해야지...

0개의 댓글