
단순히 개수를 세거나 총합을 구하는 건 비교적 쉽지만
이전 행이나 다음 행과 비교하거나
몇 개 행 단위로 묶어서 연산을 수행할 경우
병렬 처리를 위해 구간을 나누면서 문제가 발생할 수 있다.
몇 개 행 단위로 묶어서 연산을 하는 것은
구간을 나눌 때 기준 행 수의 배수로 나누면 되니
비교적 수월하게 구현할 수 있지만
이전 행과의 비교는 어떻게든 다른 구간과의 비교가 필요하다.
이런 상황에서 어떻게 처리할 수 있는지 알아보기 위해
현재 행의 value 값이 이전 행의 value 값보다 큰지 비교하여
그 결과를 저장하는 예제를 살펴보도록 하겠다.
이번에는 결과를 전송하지 않고
비교 결과 항목이 추가된 새 CSV 파일을 생성해 볼 것이다.
이 실습은 이전 실습에서 이어서 진행한다.
따라서 작업공간 생성 및 구조 확인은 생략한다.
연산 결과를 새 CSV 파일로 저장할 것이므로
CSV 출력에 필요한 크레이트를 추가로 불러와야 한다.
병렬 처리를 위해 구간을 나누는 함수는 그대로 재사용한다.
맨 앞이 아니라면 이전 청크의 마지막 값을 탐색해야
병렬 처리를 해도 연속적인 결과를 확인할 수 있다.
wtr 를 사용하는 영역을 블록 스코프로 묶었는데
그렇게 하지 않으면 buffer 를 반환할 때 소유권 문제가 생긴다.
블록 스코프로 묶지 않고 drop(wtr); 하는 것도 방법이다.
src/lib.rsuse pyo3::prelude::*; use memmap2::Mmap; use std::fs::File; use std::io::{BufWriter, Write}; // NEW! use rayon::prelude::*; use csv::{ReaderBuilder, WriterBuilder, ByteRecord}; // MODIFIED! /// 청크가 줄 중간에서 잘리지 않도록 /// 파일을 코어 수에 맞춰 최적의 경계로 분리 fn get_split_points(bytes: &[u8], num_chunks: usize) -> Vec<usize> { let mut points = vec![0]; let base_size = bytes.len() / num_chunks; for i in 1..num_chunks { let target = i * base_size; let pos = bytes[target..].iter() .position(|&b| b == b'\n') .map(|p| target + p + 1) .unwrap_or(bytes.len()); if pos < bytes.len() && !points.contains(&pos) { points.push(pos); } } points.push(bytes.len()); points } #[pyfunction] fn process_large_csv(file_path: String) -> PyResult<(f64, usize)> { let file = File::open(file_path)?; let mmap = unsafe { Mmap::map(&file)? }; let bytes = &mmap[..]; let split_points = get_split_points(bytes, rayon::current_num_threads()); let result = split_points.windows(2) // (start, end) 쌍으로 사용 .enumerate() // 인덱스와 요소를 튜플로 받아 사용: (index, (start, end)) .collect::<Vec<_>>() .into_par_iter() .map(|(i, window)| { let (start, end) = (window[0], window[1]); let mut rdr = ReaderBuilder::new() .has_headers(i == 0) .from_reader(&bytes[start..end]); let mut record = ByteRecord::new(); let (mut l_sum, mut l_count) = (0.0, 0); while rdr.read_byte_record(&mut record).unwrap_or(false) { // 0:id, 1:value, 2:category if record.get(2) == Some(b"A") { if let Some(val_bytes) = record.get(1) { if let Ok(s) = std::str::from_utf8(val_bytes) { if let Ok(val) = s.parse::<f64>() { l_sum += val; l_count += 1; } } } } } (l_sum, l_count) }) .reduce( || (0.0, 0), |(s1, c1), (s2, c2)| (s1 + s2, c1 + c2) ); Ok(result) } // 이상 기존 코드 #[pyfunction] fn compare_and_write(input_path: String, output_path: String) -> PyResult<usize> { let file = File::open(&input_path)?; let mmap = unsafe { Mmap::map(&file)? }; let bytes = &mmap[..]; let split_points = get_split_points(bytes, rayon::current_num_threads()); let processed_chunks: Vec<Vec<u8>> = split_points.windows(2) // (start, end) 쌍으로 사용 .enumerate() // 인덱스와 요소를 튜플로 받아 사용: (index, (start, end)) .collect::<Vec<_>>() .into_par_iter() .map(|(i, window)| { let (start, end) = (window[0], window[1]); let mut buffer = Vec::new(); { let mut wtr = WriterBuilder::new() .has_headers(false) .from_writer(&mut buffer); let mut prev_value: Option<f64> = None; // Look-back: 이전 청크의 마지막 값 가져오기 if i > 0 { let search_start = if start > 1024 { start - 1024 } else { 0 }; let mut rdr_prev = ReaderBuilder::new() .has_headers(false) .from_reader(&bytes[search_start..start]); // 이전 청크 let mut last_rec = ByteRecord::new(); let mut last_val = None; // 마지막 값 찾기 while rdr_prev.read_byte_record(&mut last_rec).unwrap_or(false) { // 0:id, 1:value, 2:category if let Ok(s) = std::str::from_utf8(last_rec.get(1).unwrap_or(b"0")) { last_val = s.parse::<f64>().ok(); } } prev_value = last_val; } // 본격적인 작업 시작 let mut rdr = ReaderBuilder::new() .has_headers(false) .from_reader(&bytes[start..end]); let mut record = ByteRecord::new(); while rdr.read_byte_record(&mut record).unwrap_or(false) { // 0:id, 1:value, 2:category let current_value = std::str::from_utf8(record.get(1).unwrap_or(b"0")) .unwrap_or("0").parse::<f64>().unwrap_or(0.0); let is_higher = match prev_value { Some(v) if current_value > v => "1", _ => "0", }; record.push_field(is_higher.as_bytes()); wtr.write_byte_record(&record).unwrap(); prev_value = Some(current_value); } wtr.flush().unwrap(); } buffer }) .collect(); let out_file = File::create(output_path)?; let mut buffered_writer = BufWriter::with_capacity(1024 * 1024, out_file); buffered_writer.write_all(b"id,value,category,is_higher\n")?; for chunk in processed_chunks { buffered_writer.write_all(&chunk)?; } Ok(mmap.len()) } #[pymodule] fn rust_engine(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_function(wrap_pyfunction!(process_large_csv, m)?)?; m.add_function(wrap_pyfunction!(compare_and_write, m)?)?; // NEW! Ok(()) }
app/main.pyimport pandas as pd import numpy as np from fastapi import FastAPI from fastapi.responses import ORJSONResponse import rust_engine import time import os class UTF8ORJSONResponse(ORJSONResponse): media_type = "application/json; charset=utf-8" app = FastAPI(default_response_class=UTF8ORJSONResponse) @app.get("/") def read_root(): return { "status": "200", "info": "서버 가동 중입니다." } @app.get("/setup-csv/{rows}") def setup_csv(rows: int): file_path = "large_data.csv" df = pd.DataFrame({ "id": range(rows), "value": np.random.rand(rows), "category": np.random.choice(["A", "B", "C", "D"], rows) }) df.to_csv(file_path, index=False) return { "message": f"{rows:,}줄 CSV 파일 생성 완료" } @app.get("/process-csv") def process_csv(): file_path = "large_data.csv" if not os.path.exists(file_path): return { "error": "파일이 없습니다. /setup-csv 라우트를 먼저 호출해주세요." } start_rust = time.perf_counter() rust_sum, rust_count = rust_engine.process_large_csv(file_path) end_rust = time.perf_counter() rust_duration = end_rust - start_rust start_pd = time.perf_counter() df = pd.read_csv(file_path) pd_sum = df[df["category"] == "A"]["value"].sum() end_pd = time.perf_counter() pd_duration = end_pd - start_pd return { "total_rows": rust_count, "rust_time": f"{rust_duration:.4f} sec", "pandas_time": f"{pd_duration:.4f} sec", "speed_up": f"{pd_duration / rust_duration:.1f}x", "results": { "rust_sum": rust_sum, "pandas_sum": pd_sum } } # 이상 기존 코드 @app.get("/compare-and-write") def compare_and_write(): input_file = "large_data.csv" output_file = "processed_data.csv" if not os.path.exists(input_file): return { "error": "파일이 없습니다. /setup-csv 라우트를 먼저 호출해주세요." } start = time.perf_counter() processed_data = rust_engine.compare_and_write(input_file, output_file) end = time.perf_counter() duration = end - start return { "output_file": output_file, "rust_pure_time": f"{duration:.4f} sec", "file_size": f"{os.path.getsize(output_file) / (1024 * 1024):.2f} MB" }
Maturin 라이브러리를 통해 Rust 코드를 Python에서 호출 가능한 형태로 컴파일한다.
병렬 처리가 포함된 코드는 성능 최적화를 위해 --release 를 붙여 컴파일한다.
컴파일 후 pip list 명령어를 사용해 보면 Cargo.toml 파일에 작성한 패키지 이름을 확인할 수 있다.
uvicorn 라이브러리를 통해 FastAPI를 실행한다.
~/workspace/csv-processor$ maturin develop --release ~/workspace/csv-processor$ uvicorn app.main:app --reload
curl 명령어 또는 브라우저를 통해 다음과 같은 테스트를 해볼 수 있다.
http://127.0.0.1:8000/setup-csv/50000000http://127.0.0.1:8000/compare-and-write~$ column -s, -t < workspace/csv-processor/large_data.csv | head -n 20 id value category 0 0.40051801434356893 B 1 0.2546372973392159 B 2 0.8411876220186603 A 3 0.22976716598349933 C 4 0.2205761919396625 C 5 0.8969404327142123 C 6 0.24805162492147237 C 7 0.6157544951301185 B 8 0.37118755341262744 D 9 0.20077848283153787 C 10 0.7006307580777321 A 11 0.6289105208867691 C 12 0.5645651105257122 A 13 0.07251456319471494 C 14 0.4878721859338948 B 15 0.41749333189497495 B 16 0.793024114133093 B 17 0.684794684186633 C 18 0.7427233817197105 A~$ curl -i http://127.0.0.1:8000/compare-and-write HTTP/1.1 200 OK date: Thu, 02 Apr 2026 01:41:21 GMT server: uvicorn content-length: 91 content-type: application/json; charset=utf-8 {"output_file":"processed_data.csv","rust_pure_time":"0.7453 sec","file_size":"1528.15 MB"}%~$ column -s, -t < workspace/csv-processor//processed_data.csv | head -n 20 id value category is_higher 0 0.40051801434356893 B 0 1 0.2546372973392159 B 0 2 0.8411876220186603 A 1 3 0.22976716598349933 C 0 4 0.2205761919396625 C 0 5 0.8969404327142123 C 1 6 0.24805162492147237 C 0 7 0.6157544951301185 B 1 8 0.37118755341262744 D 0 9 0.20077848283153787 C 0 10 0.7006307580777321 A 1 11 0.6289105208867691 C 0 12 0.5645651105257122 A 0 13 0.07251456319471494 C 0 14 0.4878721859338948 B 1 15 0.41749333189497495 B 0 16 0.793024114133093 B 1 17 0.684794684186633 C 0 18 0.7427233817197105 A 1