CSV 파일 다루기 — 행 단위 독립 연산

Pt J·2026년 4월 1일
post-thumbnail

CSV 파일 다루기 — 행 단위 독립 연산

CSV 파일은 DBMS를 사용할 정도의 규모와 복잡도를 갖지 않은
간단하지만 많은 양의 데이터를 다룰 때 유용하게 사용된다.

CSV 파일의 행 단위 독립 연산을 빠르게 수행하는 방법을 알아보자.

작업공간 생성 및 구조 확인

~/workspace$ mkdir csv-processor && cd csv-processor
~/workspace/csv-processor$ python3 -m venv venv
~/workspace/csv-processor$ source venv/bin/activate
~/workspace/csv-processor$ # 평소에 설치하던 것 외에 추가된 라이브러리를 놓치지 말자
~/workspace/csv-processor$ pip install maturin fastapi uvicorn orjson numpy pandas
~/workspace/csv-processor$ maturin init
~/workspace/csv-processor$ # 선택지 중 기본값인 PyO3 선택
~/workspace/csv-processor$ # Cargo.toml과 src/lib.rs가 자동 생성된다
~/workspace/csv-processor$ # Python 코드는 직접 생성해 주어야 한다
~/workspace/csv-processor$ mkdir app && touch app/main.py
~/workspace/csv-processor$ tree -I venv
.
├── app
│   └── main.py
├── Cargo.toml
├── pyproject.toml
└── src
    └── lib.rs

Cargo.toml 파일을 열어 라이브러리 이름을 수정해 주겠다.

CSV 처리를 위한 Rust 생태계의 표준인 csv 크레이트와
데이터 직렬화를 담당하는 serde 크레이트를 추가한다.

features = ["derive"] 를 명시해 주어야
serde 크레이트의 매크로를 사용할 수 있음을 유의하자.

CSV 파일의 크기가 클 경우 MMAP을 사용하는 게 효율적이다.
여기서는 대용량 CSV 파일을 상정하고 실습을 진행할 것이므로
MMAP을 사용하기 위해 memmap2 크레이트도 사용한다.

CSV 파일 다루는 것 자체는 순차적으로 진행되지만
데이터를 파싱하고 계산하는 과정은 병렬화가 가능하므로 rayon도 사용한다.

Cargo.toml

[package]
name = "csv-processor"
version = "0.1.0"
edition = "2024"

[lib]
name = "rust_engine"
crate-type = ["cdylib"]

[dependencies]
pyo3 = "0.28.0"
rayon = "1.11"
memmap2 = "0.9"
csv = "1.4"
serde = { version = "1.0", features = ["derive"] }

코드 작성

Rust 코드

연습 수준에서는 모든 데이터를 벡터로 수집하여 처리해도 괜찮지만
데이터 양이 많아질 경우에는 메모리 이슈가 발생하므로
데이터를 메모리에 쌓지 않고 읽는 즉시 병렬 파이프라인으로 던지는 방식으로
스트리밍 병렬 처리를 하는 것이 효율적이다.

코어 수에 맞게 청크를 나눌 때 라인 중간에서 끊기면 안 되므로
최적의 경계로 분리하는 기준점을 제시하는 헬퍼 함수를 따로 작성한다.

Rust의 csv 생태계에서는 관습적으로
Reader를 rdr, Writer를 wtr 라는 약어로 사용한다는 건 여담.

src/lib.rs

use pyo3::prelude::*;
use memmap2::Mmap;
use std::fs::File;
use rayon::prelude::*;
use csv::{ReaderBuilder, ByteRecord};

/// 청크가 줄 중간에서 잘리지 않도록 파일을 코어 수에 맞춰 최적의 경계로 분리
fn get_split_points(bytes: &[u8], num_chunks: usize) -> Vec<usize> {
    let mut points = vec![0];
    let base_size = bytes.len() / num_chunks;

    for i in 1..num_chunks {
        let target = i * base_size;
        let pos = bytes[target..].iter()
            .position(|&b| b == b'\n')
            .map(|p| target + p + 1)
            .unwrap_or(bytes.len());

        if pos < bytes.len() && !points.contains(&pos) {
            points.push(pos);
        }
    }
  
    points.push(bytes.len());

    points
}

#[pyfunction]
fn process_large_csv(file_path: String) -> PyResult<(f64, usize)> {
    let file = File::open(file_path)?;
    let mmap = unsafe {
    	Mmap::map(&file)?
    };
    let bytes = &mmap[..];

    let split_points = get_split_points(bytes, rayon::current_num_threads());

    let result = split_points.windows(2) // (start, end) 쌍으로 사용
        .enumerate() // 인덱스와 요소를 튜플로 받아 사용: (index, (start, end))
        .collect::<Vec<_>>()
        .into_par_iter()
        .map(|(i, window)| {
            let (start, end) = (window[0], window[1]);
            let mut rdr = ReaderBuilder::new()
                .has_headers(i == 0)
                .from_reader(&bytes[start..end]);

            let mut record = ByteRecord::new();
            let (mut l_sum, mut l_count) = (0.0, 0);

            while rdr.read_byte_record(&mut record).unwrap_or(false) {
                // 0:id, 1:value, 2:category
                if record.get(2) == Some(b"A") {
                    if let Some(val_bytes) = record.get(1) {
                        if let Ok(s) = std::str::from_utf8(val_bytes) {
                            if let Ok(val) = s.parse::<f64>() {
                                l_sum += val;
                                l_count += 1;
                            }
                        }
                    }
                }
            }

            (l_sum, l_count)
        })
        .reduce(
            || (0.0, 0),
            |(s1, c1), (s2, c2)| (s1 + s2, c1 + c2)
        );

    Ok(result)
}

#[pymodule]
fn rust_engine(m: &Bound<'_, PyModule>) -> PyResult<()> {
    m.add_function(wrap_pyfunction!(process_large_csv, m)?)?;

    Ok(())
}

Python 코드

C언어 기반으로 최적화되어 있는 Pandas도 충분히 빠르지만
우리가 작성한 Rust 코드를 사용하여 데이터를 처리하는 것과
Pandas를 사용하여 데이터를 처리하는 것의 시간을 비교해 보자.

app/main.py

import pandas as pd
import numpy as np
from fastapi import FastAPI
from fastapi.responses import ORJSONResponse
import rust_engine
import time
import os

class UTF8ORJSONResponse(ORJSONResponse):
    media_type = "application/json; charset=utf-8"

app = FastAPI(default_response_class=UTF8ORJSONResponse)

@app.get("/")
def read_root():
    return {
        "status": "200",
        "info": "서버 가동 중입니다."
    }

@app.get("/setup-csv/{rows}")
def setup_csv(rows: int):
    file_path = "large_data.csv"

    df = pd.DataFrame({
        "id": range(rows),
        "value": np.random.rand(rows),
        "category": np.random.choice(["A", "B", "C", "D"], rows)
    })

    df.to_csv(file_path, index=False)

    return {
        "message": f"{rows:,}줄 CSV 파일 생성 완료"
    }

@app.get("/process-csv")
def process_csv():
    file_path = "large_data.csv"

    if not os.path.exists(file_path):
        return {
            "error": "파일이 없습니다. /setup-csv 라우트를 먼저 호출해주세요."
        }

    start_rust = time.perf_counter()
    rust_sum, rust_count = rust_engine.process_large_csv(file_path)
    end_rust = time.perf_counter()

    rust_duration = end_rust - start_rust

    start_pd = time.perf_counter()
    df = pd.read_csv(file_path)
    pd_sum = df[df["category"] == "A"]["value"].sum()
    end_pd = time.perf_counter()

    pd_duration = end_pd - start_pd

    return {
        "total_rows": rust_count,
        "rust_time": f"{rust_duration:.4f} sec",
        "pandas_time": f"{pd_duration:.4f} sec",
        "speed_up": f"{pd_duration / rust_duration:.1f}x",
        "results": {
            "rust_sum": rust_sum,
            "pandas_sum": pd_sum
        }
    }

빌드 및 실행

Maturin 라이브러리를 통해 Rust 코드를 Python에서 호출 가능한 형태로 컴파일한다.
병렬 처리가 포함된 코드는 성능 최적화를 위해 --release 를 붙여 컴파일한다.
컴파일 후 pip list 명령어를 사용해 보면 Cargo.toml 파일에 작성한 패키지 이름을 확인할 수 있다.

uvicorn 라이브러리를 통해 FastAPI를 실행한다.

~/workspace/csv-processor$ maturin develop --release
~/workspace/csv-processor$ uvicorn app.main:app --reload

curl 명령어 또는 브라우저를 통해 다음과 같은 테스트를 해볼 수 있다.

  • http://127.0.0.1:8000/setup-csv/50000000
  • http://127.0.0.1:8000/process-csv
~$ curl -i http://127.0.0.1:8000/setup-csv/50000000
HTTP/1.1 200 OK
date: Wed, 01 Apr 2026 05:11:26 GMT
server: uvicorn
content-length: 52
content-type: application/json; charset=utf-8

{"message":"50,000,000줄 CSV 파일 생성 완료"}%
~$ curl -i http://127.0.0.1:8000/process-csv       
HTTP/1.1 200 OK
date: Wed, 01 Apr 2026 05:12:02 GMT
server: uvicorn
content-length: 166
content-type: application/json; charset=utf-8

{"total_rows":12502433,"rust_time":"0.2692 sec","pandas_time":"4.8297 sec","speed_up":"17.9x","results":{"rust_sum":6252556.523183651,"pandas_sum":6252556.523183713}}%

부동소수점 연산 특성 상 연산 순서에 따라 소숫점 아래 낮은 자리 값은 오차가 있을 수 있다.

profile
Peter J Online Space - since July 2020 | 아무데서나 채용해줬으면 좋겠다

0개의 댓글