
지난 예제에는 읽기 전용 데이터를 공유하여
데이터에 동시에 접근해도 문제 없는 상황에서의 병렬 처리를 살펴 보았다.
이번에는 하나의 공유 자원에 동시에 데이터를 작성하려고 할 때
순서가 꼬이지 않게 하는 방법을 살펴보겠다.
~/workspace$ mkdir parallel-sync && cd parallel-sync ~/workspace/parallel-sync$ python3 -m venv venv ~/workspace/parallel-sync$ source venv/bin/activate ~/workspace/parallel-sync$ pip install maturin fastapi uvicorn ~/workspace/parallel-sync$ maturin init ~/workspace/parallel-sync$ # 선택지 중 기본값인 PyO3 선택 ~/workspace/parallel-sync$ # Cargo.toml과 src/lib.rs가 자동 생성된다 ~/workspace/parallel-sync$ # Python 코드는 직접 생성해 주어야 한다 ~/workspace/parallel-sync$ mkdir app && touch app/main.py ~/workspace/parallel-sync$ tree -I venv . ├── app │ └── main.py ├── Cargo.toml ├── pyproject.toml └── src └── lib.rs
Cargo.toml 파일을 열어 라이브러리 이름을 수정해 주겠다.
병렬 처리를 위한 Rayon 크레이트도 추가해 준다.
Cargo.toml[package] name = "parallel-sync" version = "0.1.0" edition = "2024" [lib] name = "rust_engine" crate-type = ["cdylib"] [dependencies] pyo3 = "0.28.0" rayon = "1.11"
Rayon 크레이트를 사용하기 위해 use 를 통해 그것을 불러와야 한다.
그리고 여기서 중요한 건 Arc 와 Mutex 다.
Arc (Atomic Reference Counter)Mutex (Mutual Exclusion)
src/lib.rsuse pyo3::prelude::*; use rayon::prelude::*; use std::sync::{Arc, Mutex}; #[pyfunction] fn compute_with_shared_log(py: Python<'_>, data: Vec<i32>) -> PyResult<(i64, Vec<String>)> { let shared_logs = Arc::new(Mutex::new(Vec::new())); let total_sum: i64 = py.detach(|| { data.par_iter().map(|&x| { let val = x as i64; if val % 1_000_000 == 0 { let logs = Arc::clone(&shared_logs); let mut logs_lock = logs.lock().unwrap(); logs_lock.push(format!("Thread processing value: {}", val)); } val.pow(2) }).sum() }); let final_logs = shared_logs.lock().unwrap().clone(); Ok((total_sum, final_logs)) } #[pymodule] fn rust_engine(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_function(wrap_pyfunction!(compute_with_shared_log, m)?)?; Ok(()) }
이번에도 정수 값을 담는 List 를 속성으로 가진 DataInput 클래스를 만들어 사용할 것이다.
이것은 데이터 검증 라이브러리 pydantic 의 BaseModel 클래스를 상속받아 생성한다.
병렬 처리에 대한 연산은 Rust가 알아서 다 해주기 때문에
Python에서는 추가적으로 해줘야 할 건 없다.
app/main.pyfrom fastapi import FastAPI from pydantic import BaseModel import rust_engine app = FastAPI() class DataInput(BaseModel): numbers: list[int] @app.post("/shared-log-test") def shared_log_test(data: DataInput): result, logs = rust_engine.compute_with_shared_log(data.numbers) return { "result": result, "worker_logs": logs, "log_count": len(logs) }
Maturin 라이브러리를 통해 Rust 코드를 Python에서 호출 가능한 형태로 컴파일한다.
병렬 처리가 포함된 코드는 성능 최적화를 위해 --release 를 붙여 컴파일한다.
컴파일 후 pip list 명령어를 사용해 보면 Cargo.toml 파일에 작성한 패키지 이름을 확인할 수 있다.
uvicorn 라이브러리를 통해 FastAPI를 실행한다.
~/workspace/parallel-sync$ maturin develop --release ~/workspace/parallel-sync$ uvicorn app.main:app --reload
curl 명령어 또는 브라우저를 통해 테스트를 해볼 수 있다.
이 예제의 경우 GET 메서드가 아닌 POST 메서드로 통신하므로
브라우저를 통한 테스트 시 Swagger UI를 사용해야 한다.
FastAPI의 경우 다음과 같은 주소로 Swagger UI가 내장되어 있다.
http://127.0.0.1:8000/docs브라우저상에서는 대용량 데이터를 전달하기 어려우므로
테스트를 위해 클라이언트 파일을 사용한다.
sample-client.pyimport requests import time # 1. 테스트 데이터 생성 (0부터 9,999,999까지의 리스트) print("데이터 생성 중... (10,000,000개)") large_data = list(range(10_000_000)) # 2. 서버 주소 및 데이터 설정 url = "http://127.0.0.1:8000/shared-log-test" payload = {"numbers": large_data} # 3. 시간 측정 시작 print("서버에 요청을 보냅니다...") start_time = time.time() # 4. POST 요청 전송 response = requests.post(url, json=payload) # 5. 결과 출력 end_time = time.time() if response.status_code == 200: print(f"✅ 성공! 결과값: {response.json()['result']}") print("📄로그:"); for log in response.json()['worker_logs']: print(f"> {log}") print(f"⏱️ 총 소요 시간: {end_time - start_time:.4f} 초") else: print(f"❌ 실패: {response.status_code}, {response.text}")