
이 실습은 이전 실습에서 이어서 진행한다.
따라서 작업공간 생성 및 구조 확인은 생략한다.
이전 실습에서는 Python의 Numpy 배열을 읽어서 사용하는 것만 다뤘다.
하지만 때로는 새로 생성하거나 수정하는 일도 있을 수 있다.
기존에 사용했던 PyReadonlyArray1 는
쓰기에 대한 고려를 하지 않아 빠르게 작업을 처리할 수 있지만
생성 및 수정 연산을 수행할 수 없다.
따라서 PyArray1 을 사용해야 한다.
원본 Numpy 배열을 복사하지 않고 곱셈 연산을 하는 코드를 추가한다.
&Bound 를 사용하여 Python 객체에 직접 접근할 수 있다.
컴파일러가 메모리 안전성을 보장할 수 없는
로우 레벨 작업을 할 때 사용되는 unsafe 블록 내부에서
Python 메모리 버퍼에 직접 가변 슬라이스로 접근한다.
호출 시점에 GIL(Global Interpreter Lock)이 보장되므로
Rust의 안전 가이드라인 안에서 Python 메모리를 직접 건드릴 수 있다.
as_slice_mut() 메서드를 사용하기 위해서는
PyArrayMethods 도 같이 불러와야 한다.
src/lib.rsuse pyo3::prelude::*; use numpy::{PyArray1, PyReadonlyArray1, PyArrayMethods}; // MODIFIED! use rayon::prelude::*; #[pyfunction] fn zero_copy_sum(array: PyReadonlyArray1<f64>) -> f64 { let slice = array.as_slice().expect("Numpy 슬라이스를 가져오는 데 실패했습니다."); slice.par_iter().sum() } // 이상 기존 코드 #[pyfunction] fn zero_copy_multiply(array: &Bound<'_, PyArray1<f64>>, factor: f64) { unsafe { let slice = array.as_slice_mut().expect("가변 슬라이스를 가져오는 데 실패했습니다."); slice.par_iter_mut().for_each(|x| *x *= factor); } } #[pymodule] fn rust_engine(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_function(wrap_pyfunction!(zero_copy_sum, m)?)?; m.add_function(wrap_pyfunction!(zero_copy_multiply, m)?)?; // NEW! Ok(()) }
app/main.pyimport numpy as np from fastapi import FastAPI from fastapi.responses import ORJSONResponse import rust_engine import time class UTF8ORJSONResponse(ORJSONResponse): media_type = "application/json; charset=utf-8" app = FastAPI(default_response_class=UTF8ORJSONResponse) @app.get("/") def read_root(): return { "status": "200", "info": "서버 가동 중입니다." } @app.get("/compute/{size}") def compute(size: int): data = np.random.rand(size) start = time.perf_counter() result = rust_engine.zero_copy_sum(data) end = time.perf_counter() rust_duration = end - start return { "size": len(data), "result": result, "rust_pure_time": f"Rust 연산에 걸린 시간: {rust_duration:.4f} sec" } # 이상 기존 코드 @app.get("/multiply/{size}/{factor}") def multiply_inplace(size: int, factor: float): data = np.random.rand(size) start = time.perf_counter() result = rust_engine.zero_copy_multiply(data, factor) end = time.perf_counter() rust_duration = end - start return { "size": len(data), "factor": factor, "result_sample": data[:3].tolist(), "rust_pure_time": f"Rust 연산에 걸린 시간: {rust_duration:.4f} sec" }
Maturin 라이브러리를 통해 Rust 코드를 Python에서 호출 가능한 형태로 컴파일한다.
병렬 처리가 포함된 코드는 성능 최적화를 위해 --release 를 붙여 컴파일한다.
컴파일 후 pip list 명령어를 사용해 보면 Cargo.toml 파일에 작성한 패키지 이름을 확인할 수 있다.
uvicorn 라이브러리를 통해 FastAPI를 실행한다.
~/workspace/zero-copy$ maturin develop --release ~/workspace/zero-copy$ uvicorn app.main:app --reload
curl 명령어 또는 브라우저를 통해 다음과 같은 테스트를 해볼 수 있다.
http://127.0.0.1:8000/multiply/1000000000/0.5~$ curl -i http://127.0.0.1:8000/compute/1000000000 HTTP/1.1 200 OK date: Fri, 27 Mar 2026 00:54:42 GMT server: uvicorn content-length: 107 content-type: application/json; charset=utf-8 {"size":1000000000,"result":499981729.21686363,"rust_pure_time":"Rust 연산에 걸린 시간: 0.0686 sec"}%~$ curl -i http://127.0.0.1:8000/multiply/1000000000/0.5 HTTP/1.1 200 OK date: Fri, 27 Mar 2026 00:55:05 GMT server: uvicorn content-length: 169 content-type: application/json; charset=utf-8 {"size":1000000000,"factor":0.5,"result_sample":[0.48982013585054657,0.42512184525374297,0.2544803303002875],"rust_pure_time":"Rust 연산에 걸린 시간: 0.0833 sec"}%값을 수정할 때는 CPU 캐시와 실제 RAM 사이의 데이터를 동기화하는 오버헤드가 발생하여
같은 크기의 데이터에 대해 읽기 연산보다 조금 더 오래 걸리는 걸 확인할 수 있다.