CSV 파일 다루기 — 행 간 연산

Pt J·6일 전
post-thumbnail

CSV 파일 다루기 — 행 간 연산

단순히 개수를 세거나 총합을 구하는 건 비교적 쉽지만
이전 행이나 다음 행과 비교하거나
몇 개 행 단위로 묶어서 연산을 수행할 경우
병렬 처리를 위해 구간을 나누면서 문제가 발생할 수 있다.

몇 개 행 단위로 묶어서 연산을 하는 것은
구간을 나눌 때 기준 행 수의 배수로 나누면 되니
비교적 수월하게 구현할 수 있지만
이전 행과의 비교는 어떻게든 다른 구간과의 비교가 필요하다.

이런 상황에서 어떻게 처리할 수 있는지 알아보기 위해
현재 행의 value 값이 이전 행의 value 값보다 큰지 비교하여
그 결과를 저장하는 예제를 살펴보도록 하겠다.

이번에는 결과를 전송하지 않고
비교 결과 항목이 추가된 새 CSV 파일을 생성해 볼 것이다.

이 실습은 이전 실습에서 이어서 진행한다.
따라서 작업공간 생성 및 구조 확인은 생략한다.

코드 작성

Rust 코드

연산 결과를 새 CSV 파일로 저장할 것이므로
CSV 출력에 필요한 크레이트를 추가로 불러와야 한다.

병렬 처리를 위해 구간을 나누는 함수는 그대로 재사용한다.

맨 앞이 아니라면 이전 청크의 마지막 값을 탐색해야
병렬 처리를 해도 연속적인 결과를 확인할 수 있다.

wtr 를 사용하는 영역을 블록 스코프로 묶었는데
그렇게 하지 않으면 buffer 를 반환할 때 소유권 문제가 생긴다.
블록 스코프로 묶지 않고 drop(wtr); 하는 것도 방법이다.

src/lib.rs

use pyo3::prelude::*;
use memmap2::Mmap;
use std::fs::File;
use std::io::{BufWriter, Write}; // NEW!
use rayon::prelude::*;
use csv::{ReaderBuilder, WriterBuilder, ByteRecord}; // MODIFIED!

/// 청크가 줄 중간에서 잘리지 않도록
/// 파일을 코어 수에 맞춰 최적의 경계로 분리
fn get_split_points(bytes: &[u8], num_chunks: usize) -> Vec<usize> {
    let mut points = vec![0];
    let base_size = bytes.len() / num_chunks;

    for i in 1..num_chunks {
        let target = i * base_size;
        let pos = bytes[target..].iter()
            .position(|&b| b == b'\n')
            .map(|p| target + p + 1)
            .unwrap_or(bytes.len());

        if pos < bytes.len() && !points.contains(&pos) {
            points.push(pos);
        }
    }
  
    points.push(bytes.len());

    points
}

#[pyfunction]
fn process_large_csv(file_path: String) -> PyResult<(f64, usize)> {
    let file = File::open(file_path)?;
    let mmap = unsafe {
        Mmap::map(&file)?
    };
    let bytes = &mmap[..];

    let split_points = get_split_points(bytes, rayon::current_num_threads());

    let result = split_points.windows(2) // (start, end) 쌍으로 사용
        .enumerate() // 인덱스와 요소를 튜플로 받아 사용: (index, (start, end))
        .collect::<Vec<_>>()
        .into_par_iter()
        .map(|(i, window)| {
            let (start, end) = (window[0], window[1]);
            let mut rdr = ReaderBuilder::new()
                .has_headers(i == 0)
                .from_reader(&bytes[start..end]);

            let mut record = ByteRecord::new();
            let (mut l_sum, mut l_count) = (0.0, 0);

            while rdr.read_byte_record(&mut record).unwrap_or(false) {
                // 0:id, 1:value, 2:category
                if record.get(2) == Some(b"A") {
                    if let Some(val_bytes) = record.get(1) {
                        if let Ok(s) = std::str::from_utf8(val_bytes) {
                            if let Ok(val) = s.parse::<f64>() {
                                l_sum += val;
                                l_count += 1;
                            }
                        }
                    }
                }
            }

            (l_sum, l_count)
        })
        .reduce(
            || (0.0, 0),
            |(s1, c1), (s2, c2)| (s1 + s2, c1 + c2)
        );

    Ok(result)
}

// 이상 기존 코드

#[pyfunction]
fn compare_and_write(input_path: String, output_path: String) -> PyResult<usize> {
    let file = File::open(&input_path)?;
    let mmap = unsafe {
        Mmap::map(&file)?
    };
    let bytes = &mmap[..];

    let split_points = get_split_points(bytes, rayon::current_num_threads());

    let processed_chunks: Vec<Vec<u8>> = split_points.windows(2) // (start, end) 쌍으로 사용
        .enumerate() // 인덱스와 요소를 튜플로 받아 사용: (index, (start, end))
        .collect::<Vec<_>>()
        .into_par_iter()
        .map(|(i, window)| {
            let (start, end) = (window[0], window[1]);
            let mut buffer = Vec::new();
            {
                let mut wtr = WriterBuilder::new()
                    .has_headers(false)
                    .from_writer(&mut buffer);

                let mut prev_value: Option<f64> = None;

                // Look-back: 이전 청크의 마지막 값 가져오기
                if i > 0 {
                    let search_start = if start > 1024 { start - 1024 } else { 0 };
                    let mut rdr_prev = ReaderBuilder::new()
                        .has_headers(false)
                        .from_reader(&bytes[search_start..start]); // 이전 청크

                    let mut last_rec = ByteRecord::new();
                    let mut last_val = None;

                    // 마지막 값 찾기
                    while rdr_prev.read_byte_record(&mut last_rec).unwrap_or(false) {
                        // 0:id, 1:value, 2:category
                        if let Ok(s) = std::str::from_utf8(last_rec.get(1).unwrap_or(b"0")) {
                            last_val = s.parse::<f64>().ok();
                        }
                    }
                    prev_value = last_val;
                }

                // 본격적인 작업 시작
          
                let mut rdr = ReaderBuilder::new()
                    .has_headers(false)
                    .from_reader(&bytes[start..end]);
                let mut record = ByteRecord::new();

                while rdr.read_byte_record(&mut record).unwrap_or(false) {
                    // 0:id, 1:value, 2:category
                    let current_value = std::str::from_utf8(record.get(1).unwrap_or(b"0"))
                        .unwrap_or("0").parse::<f64>().unwrap_or(0.0);

                    let is_higher = match prev_value {
                        Some(v) if current_value > v => "1",
                        _ => "0",
                    };

                    record.push_field(is_higher.as_bytes());
                    wtr.write_byte_record(&record).unwrap();

                    prev_value = Some(current_value);
                }

                wtr.flush().unwrap();
            }

            buffer
        })
        .collect();

    let out_file = File::create(output_path)?;
    let mut buffered_writer = BufWriter::with_capacity(1024 * 1024, out_file);
    buffered_writer.write_all(b"id,value,category,is_higher\n")?;
    for chunk in processed_chunks {
        buffered_writer.write_all(&chunk)?;
    }

    Ok(mmap.len())
}

#[pymodule]
fn rust_engine(m: &Bound<'_, PyModule>) -> PyResult<()> {
    m.add_function(wrap_pyfunction!(process_large_csv, m)?)?;
    m.add_function(wrap_pyfunction!(compare_and_write, m)?)?; // NEW!

    Ok(())
}

Python 코드

app/main.py

import pandas as pd
import numpy as np
from fastapi import FastAPI
from fastapi.responses import ORJSONResponse
import rust_engine
import time
import os

class UTF8ORJSONResponse(ORJSONResponse):
    media_type = "application/json; charset=utf-8"

app = FastAPI(default_response_class=UTF8ORJSONResponse)

@app.get("/")
def read_root():
    return {
        "status": "200",
        "info": "서버 가동 중입니다."
    }

@app.get("/setup-csv/{rows}")
def setup_csv(rows: int):
    file_path = "large_data.csv"

    df = pd.DataFrame({
        "id": range(rows),
        "value": np.random.rand(rows),
        "category": np.random.choice(["A", "B", "C", "D"], rows)
    })

    df.to_csv(file_path, index=False)

    return {
        "message": f"{rows:,}줄 CSV 파일 생성 완료"
    }

@app.get("/process-csv")
def process_csv():
    file_path = "large_data.csv"

    if not os.path.exists(file_path):
        return {
            "error": "파일이 없습니다. /setup-csv 라우트를 먼저 호출해주세요."
        }

    start_rust = time.perf_counter()
    rust_sum, rust_count = rust_engine.process_large_csv(file_path)
    end_rust = time.perf_counter()

    rust_duration = end_rust - start_rust

    start_pd = time.perf_counter()
    df = pd.read_csv(file_path)
    pd_sum = df[df["category"] == "A"]["value"].sum()
    end_pd = time.perf_counter()

    pd_duration = end_pd - start_pd

    return {
        "total_rows": rust_count,
        "rust_time": f"{rust_duration:.4f} sec",
        "pandas_time": f"{pd_duration:.4f} sec",
        "speed_up": f"{pd_duration / rust_duration:.1f}x",
        "results": {
            "rust_sum": rust_sum,
            "pandas_sum": pd_sum
        }
    }

# 이상 기존 코드

@app.get("/compare-and-write")
def compare_and_write():
    input_file = "large_data.csv"
    output_file = "processed_data.csv"

    if not os.path.exists(input_file):
        return {
            "error": "파일이 없습니다. /setup-csv 라우트를 먼저 호출해주세요."
        }

    start = time.perf_counter()
    processed_data = rust_engine.compare_and_write(input_file, output_file)
    end = time.perf_counter()

    duration = end - start

    return {
        "output_file": output_file,
        "rust_pure_time": f"{duration:.4f} sec",
        "file_size": f"{os.path.getsize(output_file) / (1024 * 1024):.2f} MB"
    }

빌드 및 실행

Maturin 라이브러리를 통해 Rust 코드를 Python에서 호출 가능한 형태로 컴파일한다.
병렬 처리가 포함된 코드는 성능 최적화를 위해 --release 를 붙여 컴파일한다.
컴파일 후 pip list 명령어를 사용해 보면 Cargo.toml 파일에 작성한 패키지 이름을 확인할 수 있다.

uvicorn 라이브러리를 통해 FastAPI를 실행한다.

~/workspace/csv-processor$ maturin develop --release
~/workspace/csv-processor$ uvicorn app.main:app --reload

curl 명령어 또는 브라우저를 통해 다음과 같은 테스트를 해볼 수 있다.

  • http://127.0.0.1:8000/setup-csv/50000000
  • http://127.0.0.1:8000/compare-and-write
~$ column -s, -t < workspace/csv-processor/large_data.csv | head -n 20
id        value                   category
0         0.40051801434356893     B
1         0.2546372973392159      B
2         0.8411876220186603      A
3         0.22976716598349933     C
4         0.2205761919396625      C
5         0.8969404327142123      C
6         0.24805162492147237     C
7         0.6157544951301185      B
8         0.37118755341262744     D
9         0.20077848283153787     C
10        0.7006307580777321      A
11        0.6289105208867691      C
12        0.5645651105257122      A
13        0.07251456319471494     C
14        0.4878721859338948      B
15        0.41749333189497495     B
16        0.793024114133093       B
17        0.684794684186633       C
18        0.7427233817197105      A
~$ curl -i http://127.0.0.1:8000/compare-and-write           
HTTP/1.1 200 OK
date: Thu, 02 Apr 2026 01:41:21 GMT
server: uvicorn
content-length: 91
content-type: application/json; charset=utf-8

{"output_file":"processed_data.csv","rust_pure_time":"0.7453 sec","file_size":"1528.15 MB"}%
~$ column -s, -t < workspace/csv-processor//processed_data.csv | head -n 20
id        value                   category  is_higher
0         0.40051801434356893     B         0
1         0.2546372973392159      B         0
2         0.8411876220186603      A         1
3         0.22976716598349933     C         0
4         0.2205761919396625      C         0
5         0.8969404327142123      C         1
6         0.24805162492147237     C         0
7         0.6157544951301185      B         1
8         0.37118755341262744     D         0
9         0.20077848283153787     C         0
10        0.7006307580777321      A         1
11        0.6289105208867691      C         0
12        0.5645651105257122      A         0
13        0.07251456319471494     C         0
14        0.4878721859338948      B         1
15        0.41749333189497495     B         0
16        0.793024114133093       B         1
17        0.684794684186633       C         0
18        0.7427233817197105      A         1
profile
Peter J Online Space - since July 2020 | 아무데서나 채용해줬으면 좋겠다

0개의 댓글