1부 - Polars 기본 활용법
2부 - Polars 심화 활용법
들어가기 앞서 아래 실습에 사용한 데이터 및 코드는 저의 Github에서 확인 가능합니다.
polars
라이브러리를 활용해 데이터를 분석하는데 필요한 기본적인 python 메소드들을 전반적으로 알아 보았습니다.polars
의 조금 더 심화적인 내용들을 다뤄볼까 합니다. SQL 쿼리를 활용한 Polars DataFrame 조회 방법부터 데이터 처리에서 중요한 파티셔닝 개념, 그리고 pandas
와 차별되는 polars
의 강력한 기능인 LazyFrame에 대하여 다뤄볼 예정입니다.pl.sql()
을 사용하면 Lazy Evaluation
방식으로 조회가 되기 때문에 결과물을 얻기 위해선 collect()
메소드를 같이 사용해주어야 한다.
- df.sql() +
self
이름으로 조회- pl.sql() +
df
이름으로 조회
import polars as pl
df = pl.read_csv("../dataset/emp.csv")
# 1 - df.sql() + self 이름으로 조회
df.sql("SELECT * FROM self LIMIT 5")
# 2 - pl.sql() + df 이름으로 조회
pl.sql("SELECT * FROM df LIMIT 5").collect()
SQLContext
객체를 통해 현재 작업 세션에 있는 DataFrame들을 더 유연하고 통합적으로 관리하고 조회할 수도 있다.SQLContext
객체를 생성하여 작업을 하게되면 polars DataFrame으로 저장된 객체들을 테이블로 등록한 후 DB처럼 활용할 수 있다.ctx = pl.SQLContext(
register_globals=False, # 해당 메모리에 있는 dataframe을 다 테이블로 등록하는 설정
eager=True # 지연 평가(lazy evaluation) 대신 즉시 실행(eager evaluation)을 사용할지 여부를 설정
)
ctx
객체에 등록하여 관리 및 조회할 수 있다.tp = pl.read_csv("../dataset/trainer_pokemon.csv")
pk = pl.read_csv("../dataset/pokemon.csv")
ctx.register_many(
trainer_pokemon=tp, # 테이블 이름 = DF 이름
pokemon=pk
)
ctx.tables()
---
['pokemon', 'trainer_pokemon']
SQLContext
는 with문을 활용한 자원 관리가 가능하다.with ctx:
pk = ctx.execute("SELECT * FROM pokemon")
display(pk)
with pl.SQLContext(
register_globals=True,
eager=True # collect()를 사용하지 않아도 되는 dataframe 타입으로 바로 읽음
) as ctx:
tdf = ctx.execute("SELECT * FROM tp LIMIT 5")
display(tdf)
sqlalchemy
라이브러리를 connector로 활용하여 연결을 하는데 그 전에 connectorx
라이브러리에 대한 설치도 추가적으로 필요하다.pip install sqlalchemy connectorx
sqlalchemy
로 연결할 수 있는 DBMS는 다 연동이 가능한데 아래 예제에서는 MySQL에 대해서만 다뤄볼 예정입니다.import polars as pl
from sqlalchemy import create_engine
user_id = "velog"
password = "123456"
host = "localhost"
port = "3306"
database = "mysql"
mysql_conn = create_engine(f'mysql://{user_id}:{password}@{host}:{port}/{database}')
query = "SHOW DATABASES"
df = pl.read_database(
query=query,
connection=mysql_conn
)
uri = f'mysql://{user_id}:{password}@{host}:{port}/{database}'
df = pl.read_database_uri(
query=query,
uri=uri
)
df.write_database(
table_name="table_name",
connection=mysql_conn,
if_table_exists="replace" # append, fail
)
partition_by()
의 대상 컬럼에 있는 값을 기준으로 테이블을 분리한 후 각 테이블의 shape과 데이터를 보여준다.df.partition_by('Gender')
battle └── yyyy=2024 └── mm=10 ├── dd=01 │ └── battle.parquet ├── dd=02 │ └── battle.parquet ├── dd=03 │ └── battle.parquet └── dd=04 └── battle.parquet
battle = pl.read_parquet(
source='./battle/',
hive_partitioning=True # 파티셔닝 반영하여 읽기
)
"""
* 개별 파일 컬럼
- 'id', 'player1_id', 'player2_id', 'winner_id', 'battle_date', 'battle_datetime', 'battle_timestamp'
* 파티셔닝 적용 테이블 컬럼
- 'id', 'player1_id', 'player2_id', 'winner_id', 'battle_date', 'battle_datetime', 'battle_timestamp', 'yyyy', 'mm', 'dd'
"""
# battle dataframe을 yyyy, mm, dd 컬럼들을 대상으로 파티셔닝하여 저장하고자 할 때
battle.write_parquet(
file="저장 경로",
partition_by=['yyyy','mm','dd']
)
Apache Spark
와 같은 대규모 데이터 처리 프레임워크에서 사용되는 방식으로 데이터 처리시 속도와 효율성을 크게 향상시킵니다.collect()
명령어가 호출되는 시점에 모든 작업을 최적화하여 실행한 후 결과를 반환합니다.lf = pl.scan_csv("../dataset/emp.csv")
type(lf)
---
polars.lazyframe.frame.LazyFrame
lf = df.lazy()
print(type(df))
print(type(lf))
---
polars.dataframe.frame.DataFrame
polars.lazyframe.frame.LazyFrame
collect()
메소드를 호출하여 DataFrame 형태로 바꿔주어야 한다.df = lf.collect()
lf.collect_schema()
---
Schema([('First Name', String),
('Gender', String),
('Start Date', String),
('Last Login Time', String),
('Salary', Int64),
('Bonus', Float64),
('Senior Management', Boolean),
('Team', String)])
filter
, group_by
, agg
, join
등과 같은 메소드를 동일하게 사용할 수 있다. 단, 결과물을 보기 위해서는 반드시 코드 마지막에 collect()
를 붙여주어야 한다.# filter 예시
condition = (pl.col('Gender') == 'Female') & (pl.col('Bonus') >= 10)
lf.filter(condition)
# sort 예시
lf.sort(
by=['Team', 'Salary'],
descending=[False, True],
nulls_last=True
)
# LazyFrame group_by 예시
lazy_df = lf.group_by('Team').agg(
cnt = pl.count('Team'),
max_sal = pl.max('Salary'),
mean_sal = pl.mean('Salary'),
min_sal = pl.min('Salary'),
mean_bonus = pl.mean('Bonus'),
).sort('mean_sal', descending=True)
print(lazy_df.explain())
---
SORT BY [col("mean_sal")]
AGGREGATE
[col("Team").count().alias("cnt"), col("Salary").max().alias("max_sal"), col("Salary").mean().alias("mean_sal"), col("Salary").min().alias("min_sal"), col("Bonus").mean().alias("mean_bonus")] BY [col("Team")] FROM
simple π 3/3 ["Team", "Salary", "Bonus"]
Csv SCAN [../dataset/emp.csv]
PROJECT 3/8 COLUMNS
lazy_df.profile()
lazy_df.show_graph()
# LazyFrame을 CSV 파일로 저장
lazy_df.sink_csv("lf_csv.csv")
# LazyFrame을 Parquet 파일로 저장
lazy_df.sink_parquet("lf_parquet.parquet")
collect()
메소드를 사용하게 되면 데이터를 한 번에 메모리로 로드하고 모든 연산을 처리한 후 결과를 반환한다. 따라서, 대규모 데이터셋에서는 LazyFrame으로 작업을 하더라도 결과 확인시에는 메모리 사용량이 커질 수 밖에 없고, 메모리 한도를 초과해버리면 세션이 멈추는 상황이 발생하게된다.collect()
메소드에서 streaming=True
옵션을 설정하면 데이터 처리시 메모리에 전체 데이터를 로드하는 대신 스트리밍 방식으로 처리하게 된다. 이를 통해 LazyFrame의 쿼리를 실행할 때 데이터를 작은 청크(chunk) 단위로 처리하여 메모리 사용량을 줄이고, 성능을 최적화할 수 있는 것이다.filter, group_by, aggregation, with_columns, map 등
sort 등 연산을 위해 전체 데이터 확인이 필요한 경우
lf.collect(streaming=True)
이번 시리즈를 통해 Polars의 기본적인 사용법부터 시작해, SQL, 고급 기능인 SQLContext, 데이터 파티셔닝, 그리고 LazyFrame을 활용한 데이터 처리까지 다뤄보았습니다.
눈에 띄는 장점으로는 아래 내용들을 들 수 있을 것 같습니다.
DataFrame
외에도 지연 실행 기능을 제공하는 LazyFrame
을 활용해 데이터 분석에서 높은 효율성을 얻을 수 있습니다.이번 시리즈를 통해 polars를 더 잘 활용할 수 있는 계기가 되었으면 좋겠습니다.