1부 - Polars 기본 활용법
2부 - Polars 심화 활용법
들어가기 앞서 아래 실습에 사용한 데이터 및 코드는 저의 Github에서 확인 가능합니다.
polars 라이브러리를 활용해 데이터를 분석하는데 필요한 기본적인 python 메소드들을 전반적으로 알아 보았습니다.polars의 조금 더 심화적인 내용들을 다뤄볼까 합니다. SQL 쿼리를 활용한 Polars DataFrame 조회 방법부터 데이터 처리에서 중요한 파티셔닝 개념, 그리고 pandas와 차별되는 polars의 강력한 기능인 LazyFrame에 대하여 다뤄볼 예정입니다.pl.sql()을 사용하면 Lazy Evaluation 방식으로 조회가 되기 때문에 결과물을 얻기 위해선 collect() 메소드를 같이 사용해주어야 한다.
- df.sql() +
self이름으로 조회- pl.sql() +
df이름으로 조회
import polars as pl
df = pl.read_csv("../dataset/emp.csv")
# 1 - df.sql() + self 이름으로 조회
df.sql("SELECT * FROM self LIMIT 5")
# 2 - pl.sql() + df 이름으로 조회
pl.sql("SELECT * FROM df LIMIT 5").collect()
SQLContext 객체를 통해 현재 작업 세션에 있는 DataFrame들을 더 유연하고 통합적으로 관리하고 조회할 수도 있다.SQLContext 객체를 생성하여 작업을 하게되면 polars DataFrame으로 저장된 객체들을 테이블로 등록한 후 DB처럼 활용할 수 있다.ctx = pl.SQLContext(
register_globals=False, # 해당 메모리에 있는 dataframe을 다 테이블로 등록하는 설정
eager=True # 지연 평가(lazy evaluation) 대신 즉시 실행(eager evaluation)을 사용할지 여부를 설정
)
ctx 객체에 등록하여 관리 및 조회할 수 있다.tp = pl.read_csv("../dataset/trainer_pokemon.csv")
pk = pl.read_csv("../dataset/pokemon.csv")
ctx.register_many(
trainer_pokemon=tp, # 테이블 이름 = DF 이름
pokemon=pk
)
ctx.tables()
---
['pokemon', 'trainer_pokemon']
SQLContext는 with문을 활용한 자원 관리가 가능하다.with ctx:
pk = ctx.execute("SELECT * FROM pokemon")
display(pk)
with pl.SQLContext(
register_globals=True,
eager=True # collect()를 사용하지 않아도 되는 dataframe 타입으로 바로 읽음
) as ctx:
tdf = ctx.execute("SELECT * FROM tp LIMIT 5")
display(tdf)
sqlalchemy라이브러리를 connector로 활용하여 연결을 하는데 그 전에 connectorx 라이브러리에 대한 설치도 추가적으로 필요하다.pip install sqlalchemy connectorx
sqlalchemy로 연결할 수 있는 DBMS는 다 연동이 가능한데 아래 예제에서는 MySQL에 대해서만 다뤄볼 예정입니다.import polars as pl
from sqlalchemy import create_engine
user_id = "velog"
password = "123456"
host = "localhost"
port = "3306"
database = "mysql"
mysql_conn = create_engine(f'mysql://{user_id}:{password}@{host}:{port}/{database}')
query = "SHOW DATABASES"
df = pl.read_database(
query=query,
connection=mysql_conn
)
uri = f'mysql://{user_id}:{password}@{host}:{port}/{database}'
df = pl.read_database_uri(
query=query,
uri=uri
)
df.write_database(
table_name="table_name",
connection=mysql_conn,
if_table_exists="replace" # append, fail
)
partition_by()의 대상 컬럼에 있는 값을 기준으로 테이블을 분리한 후 각 테이블의 shape과 데이터를 보여준다.df.partition_by('Gender')
battle └── yyyy=2024 └── mm=10 ├── dd=01 │ └── battle.parquet ├── dd=02 │ └── battle.parquet ├── dd=03 │ └── battle.parquet └── dd=04 └── battle.parquet
battle = pl.read_parquet(
source='./battle/',
hive_partitioning=True # 파티셔닝 반영하여 읽기
)
"""
* 개별 파일 컬럼
- 'id', 'player1_id', 'player2_id', 'winner_id', 'battle_date', 'battle_datetime', 'battle_timestamp'
* 파티셔닝 적용 테이블 컬럼
- 'id', 'player1_id', 'player2_id', 'winner_id', 'battle_date', 'battle_datetime', 'battle_timestamp', 'yyyy', 'mm', 'dd'
"""
# battle dataframe을 yyyy, mm, dd 컬럼들을 대상으로 파티셔닝하여 저장하고자 할 때
battle.write_parquet(
file="저장 경로",
partition_by=['yyyy','mm','dd']
)
Apache Spark와 같은 대규모 데이터 처리 프레임워크에서 사용되는 방식으로 데이터 처리시 속도와 효율성을 크게 향상시킵니다.collect() 명령어가 호출되는 시점에 모든 작업을 최적화하여 실행한 후 결과를 반환합니다.lf = pl.scan_csv("../dataset/emp.csv")
type(lf)
---
polars.lazyframe.frame.LazyFrame
lf = df.lazy()
print(type(df))
print(type(lf))
---
polars.dataframe.frame.DataFrame
polars.lazyframe.frame.LazyFrame
collect() 메소드를 호출하여 DataFrame 형태로 바꿔주어야 한다.df = lf.collect()
lf.collect_schema()
---
Schema([('First Name', String),
('Gender', String),
('Start Date', String),
('Last Login Time', String),
('Salary', Int64),
('Bonus', Float64),
('Senior Management', Boolean),
('Team', String)])
filter, group_by, agg, join 등과 같은 메소드를 동일하게 사용할 수 있다. 단, 결과물을 보기 위해서는 반드시 코드 마지막에 collect()를 붙여주어야 한다.# filter 예시
condition = (pl.col('Gender') == 'Female') & (pl.col('Bonus') >= 10)
lf.filter(condition)
# sort 예시
lf.sort(
by=['Team', 'Salary'],
descending=[False, True],
nulls_last=True
)
# LazyFrame group_by 예시
lazy_df = lf.group_by('Team').agg(
cnt = pl.count('Team'),
max_sal = pl.max('Salary'),
mean_sal = pl.mean('Salary'),
min_sal = pl.min('Salary'),
mean_bonus = pl.mean('Bonus'),
).sort('mean_sal', descending=True)
print(lazy_df.explain())
---
SORT BY [col("mean_sal")]
AGGREGATE
[col("Team").count().alias("cnt"), col("Salary").max().alias("max_sal"), col("Salary").mean().alias("mean_sal"), col("Salary").min().alias("min_sal"), col("Bonus").mean().alias("mean_bonus")] BY [col("Team")] FROM
simple π 3/3 ["Team", "Salary", "Bonus"]
Csv SCAN [../dataset/emp.csv]
PROJECT 3/8 COLUMNS
lazy_df.profile()
lazy_df.show_graph()
# LazyFrame을 CSV 파일로 저장
lazy_df.sink_csv("lf_csv.csv")
# LazyFrame을 Parquet 파일로 저장
lazy_df.sink_parquet("lf_parquet.parquet")
collect() 메소드를 사용하게 되면 데이터를 한 번에 메모리로 로드하고 모든 연산을 처리한 후 결과를 반환한다. 따라서, 대규모 데이터셋에서는 LazyFrame으로 작업을 하더라도 결과 확인시에는 메모리 사용량이 커질 수 밖에 없고, 메모리 한도를 초과해버리면 세션이 멈추는 상황이 발생하게된다.collect() 메소드에서 streaming=True 옵션을 설정하면 데이터 처리시 메모리에 전체 데이터를 로드하는 대신 스트리밍 방식으로 처리하게 된다. 이를 통해 LazyFrame의 쿼리를 실행할 때 데이터를 작은 청크(chunk) 단위로 처리하여 메모리 사용량을 줄이고, 성능을 최적화할 수 있는 것이다.filter, group_by, aggregation, with_columns, map 등
sort 등 연산을 위해 전체 데이터 확인이 필요한 경우
lf.collect(streaming=True)
이번 시리즈를 통해 Polars의 기본적인 사용법부터 시작해, SQL, 고급 기능인 SQLContext, 데이터 파티셔닝, 그리고 LazyFrame을 활용한 데이터 처리까지 다뤄보았습니다.
눈에 띄는 장점으로는 아래 내용들을 들 수 있을 것 같습니다.
DataFrame 외에도 지연 실행 기능을 제공하는 LazyFrame을 활용해 데이터 분석에서 높은 효율성을 얻을 수 있습니다.이번 시리즈를 통해 polars를 더 잘 활용할 수 있는 계기가 되었으면 좋겠습니다.