[Polars 총정리 2부_심화] Polars로 데이터 분석을 더 빠르게!

NewNewDaddy·2024년 12월 26일
1

데이터 분석

목록 보기
5/6
post-thumbnail

1부 - Polars 기본 활용법

2부 - Polars 심화 활용법

🔹 0. INTRO

들어가기 앞서 아래 실습에 사용한 데이터 및 코드는 저의 Github에서 확인 가능합니다.

  • 앞서 1부에서는 polars 라이브러리를 활용해 데이터를 분석하는데 필요한 기본적인 python 메소드들을 전반적으로 알아 보았습니다.
  • 2부에서는 polars의 조금 더 심화적인 내용들을 다뤄볼까 합니다. SQL 쿼리를 활용한 Polars DataFrame 조회 방법부터 데이터 처리에서 중요한 파티셔닝 개념, 그리고 pandas와 차별되는 polars의 강력한 기능인 LazyFrame에 대하여 다뤄볼 예정입니다.

🔹 1. SQL

▪ 1-1) SQL로 데이터 조회하기

  • polars dataframe을 SQL 쿼리로 조회할 수 있다.
  • 아래와 같이 두 가지 방법으로 조회가 가능한데 pl.sql()을 사용하면 Lazy Evaluation 방식으로 조회가 되기 때문에 결과물을 얻기 위해선 collect() 메소드를 같이 사용해주어야 한다.
    1. df.sql() + self 이름으로 조회
    2. pl.sql() + df 이름으로 조회
import polars as pl

df = pl.read_csv("../dataset/emp.csv")

# 1 - df.sql() + self 이름으로 조회
df.sql("SELECT * FROM self LIMIT 5")

# 2 - pl.sql() + df 이름으로 조회
pl.sql("SELECT * FROM df LIMIT 5").collect()

▪ 1-2) SQLContext 객체 생성

  • 위와 같이 SQL 쿼리로 DataFrame을 원하는 형태로 조회하여 사용할 수도 있고, polars에 있는 SQLContext 객체를 통해 현재 작업 세션에 있는 DataFrame들을 더 유연하고 통합적으로 관리하고 조회할 수도 있다.
  • SQLContext 객체를 생성하여 작업을 하게되면 polars DataFrame으로 저장된 객체들을 테이블로 등록한 후 DB처럼 활용할 수 있다.
ctx = pl.SQLContext(
    register_globals=False, # 해당 메모리에 있는 dataframe을 다 테이블로 등록하는 설정
    eager=True # 지연 평가(lazy evaluation) 대신 즉시 실행(eager evaluation)을 사용할지 여부를 설정
)

▪ 1-3) SQLContext 객체에 테이블 등록

  • 현재 작업 세션 메모리에 저장되어 있는 DataFrame을 위에서 생성한 ctx 객체에 등록하여 관리 및 조회할 수 있다.
tp = pl.read_csv("../dataset/trainer_pokemon.csv")
pk = pl.read_csv("../dataset/pokemon.csv")

ctx.register_many(
    trainer_pokemon=tp, # 테이블 이름 = DF 이름
    pokemon=pk
)

▪ 1-4) 객체에 등록된 테이블 확인

ctx.tables()

---
['pokemon', 'trainer_pokemon']

▪ 1-5) 객체에 등록된 테이블 조회

  • SQLContext는 with문을 활용한 자원 관리가 가능하다.
with ctx:
    pk = ctx.execute("SELECT * FROM pokemon")
    display(pk)

▪ 1-6) 통합 코드

with pl.SQLContext(
    register_globals=True,
    eager=True # collect()를 사용하지 않아도 되는 dataframe 타입으로 바로 읽음
    ) as ctx:
    tdf = ctx.execute("SELECT * FROM tp LIMIT 5")
    display(tdf)

🔹 2. DATABASE와 연동

  • Polars Database Docs
  • polars 역시 pandas와 비슷하게 Database와 연동할 수 있는 메소드를 제공해주고 있다. 기본적으로 sqlalchemy라이브러리를 connector로 활용하여 연결을 하는데 그 전에 connectorx 라이브러리에 대한 설치도 추가적으로 필요하다.

    pip install sqlalchemy connectorx

  • sqlalchemy로 연결할 수 있는 DBMS는 다 연동이 가능한데 아래 예제에서는 MySQL에 대해서만 다뤄볼 예정입니다.

▪ 2-1) 테이블 읽기 - 1

import polars as pl
from sqlalchemy import create_engine

user_id = "velog"
password = "123456"
host = "localhost"
port = "3306"
database = "mysql"

mysql_conn = create_engine(f'mysql://{user_id}:{password}@{host}:{port}/{database}')

query = "SHOW DATABASES"

df = pl.read_database(
    query=query,
    connection=mysql_conn
    )

▪ 2-2) 테이블 읽기 - 2

uri = f'mysql://{user_id}:{password}@{host}:{port}/{database}'

df = pl.read_database_uri(
	query=query,
    uri=uri
    )

▪ 2-3) 테이블 쓰기

  • 연결된 DBMS의 테이블로 dataframe을 저장합니다.
df.write_database(
    table_name="table_name",
    connection=mysql_conn,
    if_table_exists="replace" # append, fail
    )

🔹 3. PARTITIONING

▪ 2-1) 파티션 나누어 출력

  • partition_by()의 대상 컬럼에 있는 값을 기준으로 테이블을 분리한 후 각 테이블의 shape과 데이터를 보여준다.
df.partition_by('Gender')

▪ 2-2) 파티션 반영하여 READ

  • 데이터가 아래와 같이 연도(yyyy), 월(mm), 일(dd) 디렉토리 구조로 저장되어 있다. 이 경우 데이터는 연도, 월, 일 기준으로 파티셔닝되어 저장된 형태라고 할 수 있는데 이러한 디렉토리 구조에서는 모든 파일을 개별적으로 읽을 필요 없이, 파티션 구조를 활용하여 상위 디렉토리만 지정해 데이터를 바로 읽어들일 수 있다.
  • 파티션의 기준이 되는 yyyy, mm, dd는 테이블의 컬럼으로 들어가게 된다.
    battle
      └── yyyy=2024
           └── mm=10
               ├── dd=01
               │   └── battle.parquet
               ├── dd=02
               │   └── battle.parquet
               ├── dd=03
               │   └── battle.parquet
               └── dd=04
                   └── battle.parquet
battle = pl.read_parquet(
    source='./battle/',
    hive_partitioning=True # 파티셔닝 반영하여 읽기
    )
    
"""
* 개별 파일 컬럼
	- 'id', 'player1_id', 'player2_id', 'winner_id', 'battle_date', 'battle_datetime', 'battle_timestamp'
    
* 파티셔닝 적용 테이블 컬럼
	- 'id', 'player1_id', 'player2_id', 'winner_id', 'battle_date', 'battle_datetime', 'battle_timestamp', 'yyyy', 'mm', 'dd'
"""

▪ 2-3) 파티션 반영하여 WRITE

  • dataframe의 특정 컬럼들을 대상으로 파티션 적용하여 저장하는 것도 가능하다.
# battle dataframe을 yyyy, mm, dd 컬럼들을 대상으로 파티셔닝하여 저장하고자 할 때

battle.write_parquet(
    file="저장 경로",
    partition_by=['yyyy','mm','dd']
    )

🔹 4. LazyFrame 기본

  • LazyFrame은 Polars의 핵심 기능으로, 지연 실행(Lazy Evaluation)을 통해 작업 명령을 즉시 실행하지 않고, 실행 계획을 최적화한 뒤 한 번에 처리합니다. 이는 Apache Spark와 같은 대규모 데이터 처리 프레임워크에서 사용되는 방식으로 데이터 처리시 속도와 효율성을 크게 향상시킵니다.
  • LazyFrame은 해당 테이블을 대상으로 하는 여러 작업 명령을 메모리 상에서 기록하고, collect() 명령어가 호출되는 시점에 모든 작업을 최적화하여 실행한 후 결과를 반환합니다.

▪ 3-1) LazyFrame으로 데이터 읽기

  • 일반적으로 쓰는 read 명령어가 아닌, scan 명령어를 통해 파일을 읽게되면 LazyFrame 타입으로 저장된다.
lf = pl.scan_csv("../dataset/emp.csv")

type(lf)

---
polars.lazyframe.frame.LazyFrame

▪ 3-2) DataFrame → LazyFrame

lf = df.lazy()

print(type(df))
print(type(lf))

---
polars.dataframe.frame.DataFrame
polars.lazyframe.frame.LazyFrame

▪ 3-3) LazyFrame → DataFrame

  • LazyFrame의 실제 결과를 조회하기 위해서는 collect() 메소드를 호출하여 DataFrame 형태로 바꿔주어야 한다.
df = lf.collect()

▪ 3-4) LazyFrame 스키마 확인

lf.collect_schema()

---
Schema([('First Name', String),
        ('Gender', String),
        ('Start Date', String),
        ('Last Login Time', String),
        ('Salary', Int64),
        ('Bonus', Float64),
        ('Senior Management', Boolean),
        ('Team', String)])

▪ 3-5) LazyFrame 테이블 가공

  • LazyFrame 테이블 역시 일반적인 DataFrame 다루듯 filter, group_by, agg, join 등과 같은 메소드를 동일하게 사용할 수 있다. 단, 결과물을 보기 위해서는 반드시 코드 마지막에 collect()를 붙여주어야 한다.
# filter 예시
condition = (pl.col('Gender') == 'Female') & (pl.col('Bonus') >= 10)
lf.filter(condition)

# sort 예시
lf.sort(
    by=['Team', 'Salary'], 
    descending=[False, True],
    nulls_last=True
    )

# LazyFrame group_by 예시
lazy_df = lf.group_by('Team').agg(
    cnt = pl.count('Team'),
    max_sal = pl.max('Salary'),
    mean_sal = pl.mean('Salary'),
    min_sal = pl.min('Salary'),
    mean_bonus = pl.mean('Bonus'),
    ).sort('mean_sal', descending=True)

🔹 5. LazyFrame 활용

▪ 4-1) explain

  • LazyFrame의 쿼리 실행 계획을 텍스트 형태로 출력하여 최적화된 실행 계획을 상세히 보여주는 함수
print(lazy_df.explain())

---
SORT BY [col("mean_sal")]
  AGGREGATE
  	[col("Team").count().alias("cnt"), col("Salary").max().alias("max_sal"), col("Salary").mean().alias("mean_sal"), col("Salary").min().alias("min_sal"), col("Bonus").mean().alias("mean_bonus")] BY [col("Team")] FROM
    simple π 3/3 ["Team", "Salary", "Bonus"]
      Csv SCAN [../dataset/emp.csv]
      PROJECT 3/8 COLUMNS

▪ 4-2) profile

  • 코드 실행 결과 테이블과 각 작업별 실행시간(마이크로초) 정보를 보여주는 테이블을 출력
lazy_df.profile()

▪ 4-3) show_graph

  • LazyFrame의 실행 계획을 그래프로 시각화하여 보여주는 함수
lazy_df.show_graph()

▪ 4-4) LazyFrame 파일로 저장

  • LazyFrame의 결과를 특정 파일 형태로 직접 저장하는 기능으로, DataFrame 형태가 아니어도 파일 저장이 가능하다.
  • csv, parquet을 비롯해 pyarrow, delta, iceberg 등의 형태로도 저장 가능하며, 각 메소드별로 적용할 수 있는 파라미터가 아주 다양하다.
# LazyFrame을 CSV 파일로 저장
lazy_df.sink_csv("lf_csv.csv")

# LazyFrame을 Parquet 파일로 저장
lazy_df.sink_parquet("lf_parquet.parquet")

▪ 4-5) collect(streaming=True)

  • collect() 메소드를 사용하게 되면 데이터를 한 번에 메모리로 로드하고 모든 연산을 처리한 후 결과를 반환한다. 따라서, 대규모 데이터셋에서는 LazyFrame으로 작업을 하더라도 결과 확인시에는 메모리 사용량이 커질 수 밖에 없고, 메모리 한도를 초과해버리면 세션이 멈추는 상황이 발생하게된다.
  • collect() 메소드에서 streaming=True 옵션을 설정하면 데이터 처리시 메모리에 전체 데이터를 로드하는 대신 스트리밍 방식으로 처리하게 된다. 이를 통해 LazyFrame의 쿼리를 실행할 때 데이터를 작은 청크(chunk) 단위로 처리하여 메모리 사용량을 줄이고, 성능을 최적화할 수 있는 것이다.
  • 단, 모든 연산에 대하여 해당 옵션이 적용되는 것은 아니며 스트리밍 불가능한 연산이 포함된 경우, Polars는 스트리밍을 중단하고 기본 방식으로 데이터를 처리한다.
    • ✅ 스트리밍 가능 연산

      filter, group_by, aggregation, with_columns, map 등

    • ❗ 스트리밍 불가능 연산

      sort 등 연산을 위해 전체 데이터 확인이 필요한 경우

lf.collect(streaming=True)

🔹 6. OUTRO

  • 이번 시리즈를 통해 Polars의 기본적인 사용법부터 시작해, SQL, 고급 기능인 SQLContext, 데이터 파티셔닝, 그리고 LazyFrame을 활용한 데이터 처리까지 다뤄보았습니다.

  • 눈에 띄는 장점으로는 아래 내용들을 들 수 있을 것 같습니다.

    • 속도: Rust로 구현된 Polars는 병렬 처리를 통해 대규모 데이터셋에서도 뛰어난 성능을 발휘합니다.
    • 학습 용이성: Pandas와 유사한 문법을 대부분 지원하며, SQL 스타일의 쿼리도 가능하므로 Pandas에 익숙한 사용자라면 러닝 커브 없이 바로 활용할 수 있습니다.
    • 다양성: 기본 DataFrame 외에도 지연 실행 기능을 제공하는 LazyFrame을 활용해 데이터 분석에서 높은 효율성을 얻을 수 있습니다.
    • 유연성: 파티셔닝과 스트리밍 처리 기능을 통해 데이터 워크플로우를 최적화할 수 있습니다.

    이번 시리즈를 통해 polars를 더 잘 활용할 수 있는 계기가 되었으면 좋겠습니다.

profile
데이터 엔지니어의 작업공간 / #PYTHON #CLOUD #SPARK #AWS #GCP #NCLOUD

0개의 댓글