
우선 Rust 프로젝트를 생성한다.
~/workspace/community-board-log$ cargo new log-engine --bin ~/workspace/community-board-log$ tree . ├── database │ └── scripts │ └── init.sql ├── docker-compose.yml ├── log-engine │ ├── Cargo.toml │ └── src │ └── main.rs └── proto └── log.proto 6 directories, 5 files
의존성 파일을 작성한다.
log-engine/Cargo.toml[package] name = "log-engine" version = "0.1.0" edition = "2024" [dependencies] # gRPC 및 비동기 런타임 tonic = "0.14" prost = "0.14" tonic-prost = "0.14" tokio = { version = "1.52", features = ["macros", "rt-multi-thread", "signal", "sync"] } # 데이터베이스 (컴파일 타임 쿼리 검증) sqlx = { version = "0.8", features = ["runtime-tokio-rustls", "postgres", "chrono", "ipnetwork"] } # 설정 및 로깅 dotenvy = "0.15" tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] } # 시간 및 네트워크 타입 chrono = "0.4" [build-dependencies] tonic-build = "0.14" tonic-prost-build = "0.14"
proto/log.proto 파일에 정의한 gRPC 서비스 명세를
Rust 코드로 자동 변환하는 코드를 작성한다.
log-engine/build.rsfn main() -> Result<(), Box<dyn std::error::Error>> { // proto 파일이 변경될 때만 다시 빌드하도록 Cargo에 지시합니다. println!("cargo:rerun-if-changed=proto/log.proto"); tonic_prost_build::compile_protos("../proto/log.proto")?; Ok(()) }
수천만 건의 데이터를 다룰 때 API 요청마다 매번 DB에 INSERT 쿼리를 날리는 것은
커넥션 풀 고갈과 I/O 병목을 유발할 수 있다.
따라서 gRPC 서비스는 요청을 메모리 큐에 넣고 즉시 응답하며
백그라운드 워커가 큐의 데이터를 모아 주기적으로 DB에 한 번에 밀어 넣는
비동기 배치 아키텍처를 구현할 것이다.
~/workspace/community-board-log$ cd log-engine ~/workspace/community-board-log/log-engine$ touch src/service.rs ~/workspace/community-board-log/log-engine$ touch src/worker.rs ~/workspace/community-board-log/log-engine$ tree . ├── Cargo.toml ├── build.rs └── src ├── main.rs ├── service.rs └── worker.rs 2 directories, 5 files
service.rsgRPC 서비스 인터페이스에서는
Python에서 들어오는 요청을 검증하고
큐에 밀어 넣는 역할만 수행하여 응답 지연을 최소화한다.
log-engine/src/service.rsuse crate::log_proto::{ActionLogRequest, ActionLogResponse, log_service_server::LogService}; use tokio::sync::mpsc; use tonic::{Request, Response, Status}; /// 채널 송신부를 소유하는 서비스 구현체 구조체 pub struct LogServiceImpl { tx: mpsc::Sender<ActionLogRequest>, } impl LogServiceImpl { pub fn new(tx: mpsc::Sender<ActionLogRequest>) -> Self { Self { tx } } } #[tonic::async_trait] impl LogService for LogServiceImpl { /// 클라이언트로부터 단일 로그 기록을 받는다. async fn record_log( &self, request: Request<ActionLogRequest>, ) -> Result<Response<ActionLogResponse>, Status> { let req = request.into_inner(); if let Err(_) = self.tx.try_send(req) { tracing::error!("로그 큐가 가득 찼습니다. 요청을 거부합니다."); return Err(Status::resource_exhausted("로그 큐가 가득 찼습니다.")); } Ok(Response::new(ActionLogResponse { success: true })) } }
worker.rs백그라운드 워커에서는
일정 개수가 모이거나 일정 시간이 지나면
PostgreSQL의 UNNEST 를 사용하여 데이터를 한 번에 밀어 넣는다.
log-engine/src/worker.rsuse crate::log_proto::ActionLogRequest; use sqlx::PgPool; use std::time::Duration; use tokio::sync::mpsc; /// 채널에서 로그를 읽어와 버퍼링한 뒤 /// 조건에 맞으면 DB에 Bulk Insert하는 함수 호출 pub async fn run_worker( mut rx: mpsc::Receiver<ActionLogRequest>, pool: PgPool, ) { let mut buffer = Vec::with_capacity(1000); let mut interval = tokio::time::interval(Duration::from_secs(1)); loop { tokio::select! { // 1초마다 버퍼가 비어 있지 않다면 보내기 _ = interval.tick() => { if !buffer.is_empty() { flush_to_db(&pool, &mut buffer).await; } } // 새 로그 수신 msg = rx.recv() => { match msg { Some(log) => { buffer.push(log); // 버퍼가 가득 차면 즉시 보내기 if buffer.len() >= 1000 { flush_to_db(&pool, &mut buffer).await; } } None => { tracing::info!("채널 닫힘. 남은 로그 전송 중..."); if !buffer.is_empty() { flush_to_db(&pool, &mut buffer).await; } break; } } } } } } /// 버퍼의 데이터를 DB에 배열 형태로 Bulk Insert 수행 async fn flush_to_db( pool: &PgPool, buffer: &mut Vec<ActionLogRequest> ) { let len = buffer.len(); // 컬럼별로 데이터를 모아 한 번의 쿼리로 전달 let user_ids: Vec<String> = buffer.iter().map(|l| l.user_id.clone()).collect(); let action_types: Vec<String> = buffer.iter().map(|l| l.action_type.clone()).collect(); let target_ids: Vec<Option<String>> = buffer.iter().map(|l| l.target_id.clone()).collect(); let ip_addresses: Vec<String> = buffer.iter().map(|l| l.ip_address.clone()).collect(); let query = " INSERT INTO action_logs (user_id, action_type, target_id, ip_address) SELECT * FROM UNNEST($1::text[], $2::text[], $3::text[], $4::inet[]) "; match sqlx::query(query) .bind(&user_ids) .bind(&action_types) .bind(&target_ids) .bind(&ip_addresses) .execute(pool) .await { Ok(_) => { tracing::info!("로그 {}개 성공적으로 삽입 완료", len); }, Err(e) => { tracing::error!("로그 삽입 실패: {}", e); // 실무에서는 실패한 로그를 Dead Letter Queue에 저장하지만 // 우리 실습에서는 생략한다. }, } buffer.clear(); }
main.rs시스템의 메인 스레드를 작성한다.
log-engine/src/main.rsuse sqlx::postgres::PgPoolOptions; use std::env; use std::net::SocketAddr; use tokio::sync::mpsc; use tonic::transport::Server; use tracing_subscriber::FmtSubscriber; mod service; mod worker; pub mod log_proto { tonic::include_proto!("log"); } use log_proto::log_service_server::LogServiceServer; use service::LogServiceImpl; #[tokio::main] async fn main() -> Result<(), Box<dyn std::error::Error>> { // 환경변수를 사용하여 로그 초기화 dotenvy::from_path("../.env").ok(); let subscriber = FmtSubscriber::builder() .with_max_level(tracing::Level::INFO) .json() .finish(); tracing::subscriber::set_global_default(subscriber)?; tracing::info!("로그 엔진 서버 실행..."); // DB 커넥션 풀 let db_url = env::var("DATABASE_URL").expect("DATABASE_URL이 설정되어 있지 않습니다."); let pool = PgPoolOptions::new() .max_connections(10) .connect(&db_url) .await?; // 비동기 큐 생성 let (tx, rx) = mpsc::channel(10000); // 백그라운드 워커 스레드 let worker_handle = tokio::spawn(async move { worker::run_worker(rx, pool).await; }); // gRPC 서버 설정 빛 실행 let addr: SocketAddr = env::var("GRPC_HOST") .unwrap_or_else(|_| "192.127.0.1:50051".to_string()) .parse()?; let log_service = LogServiceImpl::new(tx); tracing::info!("gRPC 서버 가동 중: {}", addr); Server::builder() .add_service(LogServiceServer::new(log_service)) .serve_with_shutdown(addr, shutdown_signal()) .await?; tracing::info!("gRPC 서버 종료. 남은 로그를 기록할 때까지 대기..."); let _ = worker_handle.await; tracing::info!("모든 자원이 정리되었습니다. 서버를 안전하게 종료합니다."); Ok(()) } async fn shutdown_signal() { tokio::signal::ctrl_c() .await .expect("Ctrl+C 시그널 핸들러 설치 실패"); tracing::info!("종료 시그널 감지"); }
🤖 AI AGENT | 설계 포인트 요약
- 성능 최적화 (UNNEST): 여러 개의
INSERT INTO ... VALUES (...)를 만드는 것보다UNNEST를 사용하여 배열 매개변수를 테이블처럼 취급하는 방식이 PostgreSQL 엔진 입장에서 파싱 오버헤드가 적고 처리 속도가 가장 빠릅니다.- 안전한 종료 (Graceful Shutdown): 메인 스레드에
Ctrl+C가 들어오면 gRPC 서버가 즉시 내려갑니다. 이로 인해LogServiceImpl가 소멸하면서 내부의tx가Drop되고, 워커의rx.recv()가None을 반환하여 남아있는 버퍼를 안전하게 비우고(flush_to_db) 프로그램을 종료합니다. 로그 유실을 원천 차단하는 현대적이고 안정적인 방식입니다.
~/workspace/community-board-log$ docker compose up -d ~/workspace/community-board-log$ cd log-engine ~/workspace/community-board-log/log-engine$ cargo build ~/workspace/community-board-log/log-engine$ Compiling log-engine v0.1.0 (/Users/edenjint3927/workspace/community-board-log/log-engine) Finished `dev` profile [unoptimized + debuginfo] target(s) in 1.04s Running `target/debug/log-engine` {"timestamp":"2026-05-19T22:24:38.765910Z","level":"INFO","fields":{"message":"로그 엔진 서버 실행..."},"target":"log_engine"} {"timestamp":"2026-05-19T22:24:38.828272Z","level":"INFO","fields":{"message":"gRPC 서버 가동 중: 127.0.0.1:50051"},"target":"log_engine"}
다른 터미널을 열고
~/workspace/community-board-log$ grpcurl -plaintext \ -import-path ./proto -proto log.proto \ -d '{"user_id": "user_123", "action_type": "VIEW_POST", "target_id": "post_456", "ip_address": "192.168.0.1"}' \ 127.0.0.1:50051 \ log.LogService/RecordLog { "success": true }
그 때의 로그
{"timestamp":"2026-05-19T22:24:38.765910Z","level":"INFO","fields":{"message":"로그 엔진 서버 실행..."},"target":"log_engine"} {"timestamp":"2026-05-19T22:24:38.828272Z","level":"INFO","fields":{"message":"gRPC 서버 가동 중: 127.0.0.1:50051"},"target":"log_engine"} {"timestamp":"2026-05-19T22:37:41.840728Z","level":"INFO","fields":{"message":"로그 1개 성공적으로 삽입 완료"},"target":"log_engine::worker"}
🤖 AI AGENT | 같은 계층에 두지 않고 docker를 실행하고 들어오는 이유
log-engine/디렉토리는 온전히 Rust 소스 코드(src/,Cargo.toml,build.rs등)만을 관리하는 공간이어야 합니다.Rust 애플리케이션 입장에서 PostgreSQL 데이터베이스가 Docker로 돌아가든, AWS RDS에 있든, 로컬 환경에 직접 설치되어 있든 알 필요가 없습니다.
데이터베이스 초기화 스크립트(
init.sql)나 컨테이너 오케스트레이션 설정(docker-compose.yml)이 Rust 프로젝트 안에 섞여 있으면, "비즈니스 로직(코드)"과 "인프라(환경)"가 강하게 결합되어 응집도가 떨어집니다.