이번에 패스트 캠퍼스 강의를 들으며 처음으로 Dask라는 라이브러리를 알게 되었다.
Dask는 Python에서 병렬 처리를 가능하게 하는 라이브러리로, 대규모 데이터셋을 사용하는 복잡한 계산을 빠르게 수행해야 할 때 유용하다. Apache Spark의 pyspark도 많이 사용하지만, pandas에 익숙해져 있는 상태로는 사용하기 불편하다는 문제점이 있고 Dask는 이 문제에 대한 해결책이 될 수 있다. Dask는 NumPy, Pandas와 같은 Python의 기술 스택과 밀접하게 통합되어 있어, 대규모 데이터셋을 사용하는 경우에도 친숙한 인터페이스를 제공한다는 장점이 있다.
Dask의 주요 구성은 크게 두 가지로, 동적 작업 스케줄링과 데이터 컬렉션이 있다.
1. 동적 작업 스케줄링 : 복잡한 작업을 작은 작업으로 나누고, 이를 동시에 실행하기 위한 스케줄링을 담당
2. 데이터 컬렉션 : Dask Array, Dask DataFrame, Dask Bag 등 자료 구조를 제공하여 대규모 데이터셋을 효율적으로 처리
코드를 통해 보면 더 쉽게 알 수 있다.
def create_datasets(nrows: int, ncols: int) -> tuple[pd.DataFrame, pd.DataFrame]:
main_data = {f"col_{i}": np.random.rand(nrows) for i in range(ncols)}
ref_data = {f"col_{i}": np.random.rand(nrows//10) for i in range(ncols)}
main_df = pd.DataFrame(main_data)
ref_df = pd.DataFrame(ref_data)
return main_df, ref_df
위 코드로 임의의 데이터셋을 생성해 준다.
def pandas_operations(main_df: pd.DataFrame, ref_df: pd.DataFrame) -> tuple[float, int]:
start_time_agg = time.time()
grouped = main_df.groupby("col_0").mean()
end_time_agg = time.time()
start_time_join = time.time()
joined= main_df.merge(ref_df, on="col_0", how="left")
end_time_join= time.time()
return end_time_agg - start_time_agg, end_time_join - start_time_join
def dask_operations(
main_df: pd.DataFrame, ref_df: pd.DataFrame, npartitions: int) -> tuple[float,float]:
dmain_df = dd.from_pandas(main_df, npartitions=npartitions)
dref_df = dd.from_pandas(ref_df, npartitions=npartitions)
start_time_agg = time.time()
grouped_task = dmain_df.groupby("col_0").mean()
grouped = grouped_task.compute() # compute를 넣어주어야 일을 시킴
end_time_agg = time.time()
grouped_task.visualize("grouped.svg")
start_time_join = time.time()
joined_task = dmain_df.merge(dref_df, on="col_0", how="left")
joined = joined_task.compute()
end_time_join= time.time()
joined_task.visualize("joined.svg")
return end_time_agg - start_time_agg, end_time_join - start_time_join
위 함수는 pandas 프레임워크를 활용해 데이터셋의 평균을 구하는 연산 과정이고, 아래 함수는 dask 프레임워크를 활용해 같은 연산을 병렬로 진행하는 것이다.
연산이 걸리는 시간은 다음과 같이 출력되었다.
Pandas 집계 시간: 7.414660930633545 초
Pandas 조인 시간: 10.029629230499268 초
Dask 집계 시간: 27.535321712493896 초
Dask 조인 시간: 7.579635143280029 초
Dask로 걸리는 시간이 더 오래 걸린 이유는 평균 계산은 덧셈, 나눗셈 등의 단순 연산만 존재하기 때문에 오히려 쪼개면 더 오래 걸리기 때문인 것 같다.
근데 시간이 짧게 걸린다고 하고 예시는 더 오래 걸려서 더 짧게 걸리는 간단한 예제가 무엇이 있을까 찾아보았고, 보다 큰 대규모 데이터셋을 처리하는 상황을 접해볼 수 있었다.
아래는 2001년부터 현재까지의 시카고 주의 범죄 관련 데이터이다.
https://catalog.data.gov/dataset/crimes-2001-to-present-398a4
import dask.dataframe as dd
from datetime import datetime
import time
start_time = datetime.now()
st = time.time()
print("start_time : {}".format(start_time))
ddf = dd.read_csv("crime.csv", dtype=str, on_bad_lines='skip') # Updated for future-proofing based on warning
print(ddf.head())
ddf = ddf.dropna()
# Correctly process 'Location' to extract 'lat' and 'lon'
ddf['Location'] = ddf['Location'].str.replace(r"[()]", "", regex=True).str.split(',') # Use regex to remove parentheses
# Assign 'lat' and 'lon' from the split 'Location'
ddf['lat'] = ddf['Location'].map(lambda x: x[0] if x else None).astype('float64')
ddf['lon'] = ddf['Location'].map(lambda x: x[1] if x else None).astype('float64')
print(ddf.head(30))
# Group by 'Date' and aggregate 'lat'
agg_ddf = ddf.groupby(['Date']).agg({"lat": ['mean', 'sum', 'count']})
print(agg_ddf.head(30))
# Count rows to check dataframe size
print(ddf.count().compute())
end_time = datetime.now()
et = time.time()
print("running time : {} seconds".format(et - st))
print("end_time : {}".format(end_time))
얘는 dask를 쓴 것이고,
import pandas as pd
from datetime import datetime
start_time = datetime.now()
print("start_time : {}".format(start_time))
# Read CSV using Pandas
df = pd.read_csv("crime.csv", dtype=str, na_filter=True)
print(df.head())
# Drop rows with any NA values
df = df.dropna()
# Process 'Location' to extract 'lat' and 'lon'
df['Location'] = df['Location'].str.replace(r"[()]", "", regex=True).str.split(',')
df['lat'] = df['Location'].apply(lambda x: float(x[0]) if x else None)
df['lon'] = df['Location'].apply(lambda x: float(x[1]) if x else None)
print(df.head(30))
# Group by 'Date' and aggregate 'lat'
agg_df = df.groupby('Date')['lat'].agg(['mean', 'sum', 'count'])
print(agg_df.head(30))
# Count rows to check dataframe size
print(df.count())
end_time = datetime.now()
print("running time : {} seconds".format((end_time - start_time).total_seconds()))
print("end_time : {}".format(end_time))
얘는 dask를 안 쓴건데
놀랍게도 dask를 썼을 때는 230초, dask를 쓰지 않았을 때는 123초가 걸린 것을 확인할 수 있었다.
아마 데이터셋 양 자체의 문제는 둘째치고 연산 자체가 너무 단순해서 그런 것이 아닌가 생각이 든다. 나중에 다시 시도할 일이 많겠지... 한 가지 확실한 것은 pandas 프레임워크와 비슷해서 쓰긴 정말 쉬운 듯하다.