Polars(2)

손호준·2023년 11월 15일

Lazy API

lazy API를 사용하면 각 쿼리를 한줄 씩 실행하지 않고 전체 쿼리를 엔드 투 엔드로 처리한다. Polars를 최대한 활용하기 위해 lazy API를 사용하는 것이 중요하다.

  • lazy API는 Polars에 자동 쿼리 최적화를 적용 하게한다.(with query optimizer)
  • lazy API는 스트리밍을 통해 메모리 데이터셋 보다 큰 작업을 가능하게 한다.
  • lazy API는 데이터 전처리 전에 스키마 에러를 잡을 수 있다.

이제 파일이나 DataFrame에 어떻게 lazy API를 사용하는지 살펴보자.

이상적인 경우에, query optimizer가 파일에서 읽는 데이터의 양을 줄이는데 도움이 되므로 바로 lazy API를 사용한다.
아래 예시에서는 csv 데이터로부터 lazy쿼리를 만들고, 변형한다. pl.scan_csv쿼리를 시작하면 lazy API를 사용한다.

#파이썬 예시
q1 = (
    pl.scan_csv(f"docs/data/reddit.csv")
    .with_columns(pl.col("name").str.to_uppercase())
    .filter(pl.col("comment_karma") > 0)
)

pl.scan_ 함수는 CSV, IPC, Parquet, JSON 같은 타입들에 사용가능하다.

이 쿼리에서는 csv파일을 읽고, name 열을 대문자로 변환하고, comment_karma열에 필터를 적용하는데..
이 시점에서 lazy 쿼리는 실행되지 않는다!

lazy API에 접근하는 또다른 방법은, 이미 메모리에 생성된 DataFrame.lazy를 호출하는 것이다.

q3 = pl.DataFrame({"foo": ["a", "b", "c"], "bar": [0, 1, 2]}).lazy()

.lazy를 호출함으로써 DataFramelazyFrame으로 변환한다.

최적화

lazy API를 쓴다면, Polars는 쿼리에 최적화를 실행할 것이다. 그 중 일부는 미리 실행되고, 일부는 구체화된 데이터가 들어오는 시점에 맞춰 결정된다.

최적화설명실행 횟수
Predicate pushdown가능한 한 빨리/스캔 수준에서 필터를 적용1회
Projection pushdown스캔 수준에서 필요한 열만 선택1회
Slice pushdown스캔 수준에서 필요한 슬라이스만 로드, sliced된 결과에 대해 구체화하지x (eg. join.head(10)1회
Common subplan elimination쿼리 계획의 여러 하위 트리에서 사용되는 캐시 하위 트리 및 파일 검색1회
Simplify expressions지속적인 폴딩, 값비싼 작업을 더 빠른 대안으로 교체하는 등의 다양한 최적화정해진 시점 까지
Join ordering메모리 부족을 줄이기 위해 먼저 실행해야 하는 join 분기를 예측1회
Type coercion필요한 최소 메모리에서 실행되도록 유형을 강제 변환정해진 시점 까지
Cardinality estimation전략적으로 최적의 그룹을 결정하기 위해 카디널리티 추정0/n회; 쿼리에따라 다름

스키마

DataFrame 이나 LazyFrame의 스키마는 그것들의 열 이름과 데이터 타입을 설정한다. .schema로 스키마를 볼 수 있다.

q3 = pl.DataFrame({"foo": ["a","b","c"], "bar": [0,1,2]}).lazy()
print(q3.schema)
OrderedDict([('foo', Utf8), ('bar', Int64)])

스키마는 lazy API에서 중요한 역할을 한다.

lazy API의 이점 중 하나는, 데이터가 전처리되기 전에 Polars가 스키마를 체크하게 한다. 이 체크는 lazy 쿼리가 실행될 때 발생한다.

pl.DataFrame({"foo": ["a", "b", "c"], "bar": [0, 1, 2]}).lazy().with_columns(
    pl.col("bar").round(0)
)

.round표현식은 부동소수점 dtype이 있는 열에서만 유효하다. .round를 정수형 열에서 호출하는 것은, 쿼리를 collect와 함께 evaluate할 때InvalidOperationError에러를 발생시킨다는 것을 의미한다. 이 스키마 체크는 collect를 호출하고 데이터가 전처리되기 전에 발생한다.

반면, 이 쿼리를 Eager 모드에서 실행하면 모든 이전 단계에서 데이터가 처리된 후에만 오류를 발견할 수 있다. lazy 쿼리를 실행하면 Polars는 파이프라인에서 실제로 데이터를 처리하는 시간이 많이 걸리는 단계 전에 모든 발생 가능한 InvalidOperationError 을 확인한다.

lazy API는 스키마를 반드시 알아야 한다.

lazy API에서 Polars query optimizer는 쿼리 계획의 모든 단계에서 스키마를 추론할 수 있어야 한다. 즉, 스키마를 미리 알 수 없는 작업은 lazy API와 함께 사용할 수 없다. 스키마를 미리 알 수 없는 대표적인 작업으로 .pivot이 있다. .pivot에서는 새로운 열 이름을 여러 열 중 하나의 데이터로부터 가져온다. 열 이름들을 미리 알 수 없으므로 .pivot은 lazy API에서 사용할 수 없다.

Lazy API에서 사용할 수 없는 작업 처리

파이프라인에 lazy API에서 사용할 수 없는 작업이 포함되어 있는 경우 일반적으로 다음을 수행하는 것이 가장 좋다.

  • 해당 시점(non-lazy 작업 전)까지 lazy 모드로 파이프라인을 실행한다.
  • 파이프라인을 .collect와 함께 실행해, DataFrame을 구체화 한다
  • DataFrame에 non-lazy 작업을 한다.
  • 출력을 다시 LazyFrame 로 변환(.lazy)하고 lazy 모드로 계속한다.

아래는 lazy API를 사용할 수 없는 작업 처리의 예시이다.

  • create a simple DataFrame
  • convert it to a LazyFrame with .lazy
  • do a transformation using .with_columns
  • execute the query before the pivot with .collect to get a DataFrame
  • do the .pivot on the DataFrame
  • convert back in lazy mode
  • do a .filter
  • finish by executing the query with .collect to get a DataFrame
lazy_eager_query = (
    pl.DataFrame(
        {
            "id": ["a", "b", "c"],
            "month": ["jan", "feb", "mar"],
            "values": [0, 1, 2],
        }
    )
    .lazy()
    .with_columns((2 * pl.col("values")).alias("double_values"))
    .collect()
    .pivot(
        index="id", columns="month", values="double_values", aggregate_function="first"
    )
    .lazy()
    .filter(pl.col("mar").is_null())
    .collect()
)
print(lazy_eager_query)
shape: (2, 4)
┌─────┬──────┬──────┬──────┐
│ id  ┆ jan  ┆ feb  ┆ mar  │
│ --- ┆ ---  ┆ ---  ┆ ---  │
│ str ┆ i64  ┆ i64  ┆ i64  │
╞═════╪══════╪══════╪══════╡
│ a   ┆ 0    ┆ null ┆ null │
│ b   ┆ null ┆ 2    ┆ null │
└─────┴──────┴──────┴──────┘
profile
Rustacean🦀

0개의 댓글