날짜별 정보를 담은 데이터베이스 (2) Rust gRPC 엔진

Pt J·약 4시간 전
post-thumbnail

날짜별 정보를 담은 데이터베이스 (2) Rust gRPC 엔진

설정 파일

우선 Rust 프로젝트를 생성한다.

~/workspace/community-board-log$ cargo new log-engine --bin
~/workspace/community-board-log$ tree
.
├── database
│   └── scripts
│       └── init.sql
├── docker-compose.yml
├── log-engine
│   ├── Cargo.toml
│   └── src
│       └── main.rs
└── proto
    └── log.proto

6 directories, 5 files

의존성 파일을 작성한다.

log-engine/Cargo.toml

[package]
name = "log-engine"
version = "0.1.0"
edition = "2024"

[dependencies]
# gRPC 및 비동기 런타임
tonic = "0.14"
prost = "0.14"
tonic-prost = "0.14"
tokio = { version = "1.52", features = ["macros", "rt-multi-thread", "signal", "sync"] }

# 데이터베이스 (컴파일 타임 쿼리 검증)
sqlx = { version = "0.8", features = ["runtime-tokio-rustls", "postgres", "chrono", "ipnetwork"] }

# 설정 및 로깅
dotenvy = "0.15"
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] }

# 시간 및 네트워크 타입
chrono = "0.4"

[build-dependencies]
tonic-build = "0.14"
tonic-prost-build = "0.14"

proto/log.proto 파일에 정의한 gRPC 서비스 명세를
Rust 코드로 자동 변환하는 코드를 작성한다.

log-engine/build.rs

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // proto 파일이 변경될 때만 다시 빌드하도록 Cargo에 지시합니다.
    println!("cargo:rerun-if-changed=proto/log.proto");
   
    tonic_prost_build::compile_protos("../proto/log.proto")?;
    Ok(())
}

코드 작성

수천만 건의 데이터를 다룰 때 API 요청마다 매번 DB에 INSERT 쿼리를 날리는 것은
커넥션 풀 고갈과 I/O 병목을 유발할 수 있다.
따라서 gRPC 서비스는 요청을 메모리 큐에 넣고 즉시 응답하며
백그라운드 워커가 큐의 데이터를 모아 주기적으로 DB에 한 번에 밀어 넣는
비동기 배치 아키텍처를 구현할 것이다.

~/workspace/community-board-log$ cd log-engine
~/workspace/community-board-log/log-engine$ touch src/service.rs
~/workspace/community-board-log/log-engine$ touch src/worker.rs
~/workspace/community-board-log/log-engine$ tree
.
├── Cargo.toml
├── build.rs
└── src
    ├── main.rs
    ├── service.rs
    └── worker.rs

2 directories, 5 files

service.rs

gRPC 서비스 인터페이스에서는
Python에서 들어오는 요청을 검증하고
큐에 밀어 넣는 역할만 수행하여 응답 지연을 최소화한다.

log-engine/src/service.rs

use crate::log_proto::{ActionLogRequest, ActionLogResponse, log_service_server::LogService};
use tokio::sync::mpsc;
use tonic::{Request, Response, Status};

/// 채널 송신부를 소유하는 서비스 구현체 구조체
pub struct LogServiceImpl {
    tx: mpsc::Sender<ActionLogRequest>,
}

impl LogServiceImpl {
    pub fn new(tx: mpsc::Sender<ActionLogRequest>) -> Self {
        Self { tx }
    }
}

#[tonic::async_trait]
impl LogService for LogServiceImpl {
    /// 클라이언트로부터 단일 로그 기록을 받는다.
    async fn record_log(
        &self,
        request: Request<ActionLogRequest>,
    ) -> Result<Response<ActionLogResponse>, Status> {
        let req = request.into_inner();

        if let Err(_) = self.tx.try_send(req) {
            tracing::error!("로그 큐가 가득 찼습니다. 요청을 거부합니다.");
            return Err(Status::resource_exhausted("로그 큐가 가득 찼습니다."));
        }

        Ok(Response::new(ActionLogResponse { success: true }))
    }
}

worker.rs

백그라운드 워커에서는
일정 개수가 모이거나 일정 시간이 지나면
PostgreSQL의 UNNEST 를 사용하여 데이터를 한 번에 밀어 넣는다.

log-engine/src/worker.rs

use crate::log_proto::ActionLogRequest;
use sqlx::PgPool;
use std::time::Duration;
use tokio::sync::mpsc;

/// 채널에서 로그를 읽어와 버퍼링한 뒤
/// 조건에 맞으면 DB에 Bulk Insert하는 함수 호출
pub async fn run_worker(
    mut rx: mpsc::Receiver<ActionLogRequest>,
    pool: PgPool,
) {
    let mut buffer = Vec::with_capacity(1000);
    let mut interval = tokio::time::interval(Duration::from_secs(1));

    loop {
        tokio::select! {
            // 1초마다 버퍼가 비어 있지 않다면 보내기
            _ = interval.tick() => {
                if !buffer.is_empty() {
                    flush_to_db(&pool, &mut buffer).await;
                }
            }
            // 새 로그 수신
            msg = rx.recv() => {
                match msg {
                    Some(log) => {
                        buffer.push(log);
                        // 버퍼가 가득 차면 즉시 보내기
                        if buffer.len() >= 1000 {
                            flush_to_db(&pool, &mut buffer).await;
                        }
                    }
                    None => {
                        tracing::info!("채널 닫힘. 남은 로그 전송 중...");
                        if !buffer.is_empty() {
                            flush_to_db(&pool, &mut buffer).await;
                        }
                        break;
                    }
                }
            }
        }
    }
}

/// 버퍼의 데이터를 DB에 배열 형태로 Bulk Insert 수행
async fn flush_to_db(
    pool: &PgPool,
    buffer: &mut Vec<ActionLogRequest>
) {
    let len = buffer.len();

    // 컬럼별로 데이터를 모아 한 번의 쿼리로 전달
    let user_ids: Vec<String> = buffer.iter().map(|l| l.user_id.clone()).collect();
    let action_types: Vec<String> = buffer.iter().map(|l| l.action_type.clone()).collect();
    let target_ids: Vec<Option<String>> = buffer.iter().map(|l| l.target_id.clone()).collect();
    let ip_addresses: Vec<String> = buffer.iter().map(|l| l.ip_address.clone()).collect();

    let query = "
        INSERT INTO action_logs (user_id, action_type, target_id, ip_address)
        SELECT * FROM UNNEST($1::text[], $2::text[], $3::text[], $4::inet[])
    ";

    match sqlx::query(query)
        .bind(&user_ids)
        .bind(&action_types)
        .bind(&target_ids)
        .bind(&ip_addresses)
        .execute(pool)
        .await
    {
        Ok(_) => {
            tracing::info!("로그 {}개 성공적으로 삽입 완료", len);
        },
        Err(e) => {
            tracing::error!("로그 삽입 실패: {}", e);
            // 실무에서는 실패한 로그를 Dead Letter Queue에 저장하지만
            // 우리 실습에서는 생략한다.
        },
    }

    buffer.clear();
}

main.rs

시스템의 메인 스레드를 작성한다.

log-engine/src/main.rs

use sqlx::postgres::PgPoolOptions;
use std::env;
use std::net::SocketAddr;
use tokio::sync::mpsc;
use tonic::transport::Server;
use tracing_subscriber::FmtSubscriber;

mod service;
mod worker;

pub mod log_proto {
    tonic::include_proto!("log");
}

use log_proto::log_service_server::LogServiceServer;
use service::LogServiceImpl;

#[tokio::main]
async fn main() -> Result<(), Box<dyn  std::error::Error>> {
    // 환경변수를 사용하여 로그 초기화
    dotenvy::from_path("../.env").ok();
    let subscriber = FmtSubscriber::builder()
        .with_max_level(tracing::Level::INFO)
        .json()
        .finish();
    tracing::subscriber::set_global_default(subscriber)?;

    tracing::info!("로그 엔진 서버 실행...");

    // DB 커넥션 풀
    let db_url = env::var("DATABASE_URL").expect("DATABASE_URL이 설정되어 있지 않습니다.");
    let pool = PgPoolOptions::new()
        .max_connections(10)
        .connect(&db_url)
        .await?;
  
    // 비동기 큐 생성
    let (tx, rx) = mpsc::channel(10000);

    // 백그라운드 워커 스레드
    let worker_handle = tokio::spawn(async move {
        worker::run_worker(rx, pool).await;
    });

    // gRPC 서버 설정 빛 실행
    let addr: SocketAddr = env::var("GRPC_HOST")
        .unwrap_or_else(|_| "192.127.0.1:50051".to_string())
        .parse()?;

    let log_service = LogServiceImpl::new(tx);

    tracing::info!("gRPC 서버 가동 중: {}", addr);

    Server::builder()
        .add_service(LogServiceServer::new(log_service))
        .serve_with_shutdown(addr, shutdown_signal())
        .await?;
  
    tracing::info!("gRPC 서버 종료. 남은 로그를 기록할 때까지 대기...");

    let _ =  worker_handle.await;
    tracing::info!("모든 자원이 정리되었습니다. 서버를 안전하게 종료합니다.");

    Ok(())
}

async fn shutdown_signal() {
    tokio::signal::ctrl_c()
        .await
        .expect("Ctrl+C 시그널 핸들러 설치 실패");
    tracing::info!("종료 시그널 감지");
}
🤖 AI AGENT | 설계 포인트 요약
  • 성능 최적화 (UNNEST): 여러 개의 INSERT INTO ... VALUES (...) 를 만드는 것보다 UNNEST 를 사용하여 배열 매개변수를 테이블처럼 취급하는 방식이 PostgreSQL 엔진 입장에서 파싱 오버헤드가 적고 처리 속도가 가장 빠릅니다.
  • 안전한 종료 (Graceful Shutdown): 메인 스레드에 Ctrl+C 가 들어오면 gRPC 서버가 즉시 내려갑니다. 이로 인해 LogServiceImpl 가 소멸하면서 내부의 txDrop 되고, 워커의 rx.recv()None 을 반환하여 남아있는 버퍼를 안전하게 비우고(flush_to_db) 프로그램을 종료합니다. 로그 유실을 원천 차단하는 현대적이고 안정적인 방식입니다.

빌드 및 실행

~/workspace/community-board-log$ docker compose up -d
~/workspace/community-board-log$ cd log-engine
~/workspace/community-board-log/log-engine$ cargo build
~/workspace/community-board-log/log-engine$ 
   Compiling log-engine v0.1.0 (/Users/edenjint3927/workspace/community-board-log/log-engine)
    Finished `dev` profile [unoptimized + debuginfo] target(s) in 1.04s
     Running `target/debug/log-engine`
{"timestamp":"2026-05-19T22:24:38.765910Z","level":"INFO","fields":{"message":"로그 엔진 서버 실행..."},"target":"log_engine"}
{"timestamp":"2026-05-19T22:24:38.828272Z","level":"INFO","fields":{"message":"gRPC 서버 가동 중: 127.0.0.1:50051"},"target":"log_engine"}

다른 터미널을 열고

~/workspace/community-board-log$ grpcurl -plaintext \
  -import-path ./proto -proto log.proto \
  -d '{"user_id": "user_123", "action_type": "VIEW_POST", "target_id": "post_456", "ip_address": "192.168.0.1"}' \
  127.0.0.1:50051 \
  log.LogService/RecordLog
{
  "success": true
}

그 때의 로그

{"timestamp":"2026-05-19T22:24:38.765910Z","level":"INFO","fields":{"message":"로그 엔진 서버 실행..."},"target":"log_engine"}
{"timestamp":"2026-05-19T22:24:38.828272Z","level":"INFO","fields":{"message":"gRPC 서버 가동 중: 127.0.0.1:50051"},"target":"log_engine"}
{"timestamp":"2026-05-19T22:37:41.840728Z","level":"INFO","fields":{"message":"로그 1개 성공적으로 삽입 완료"},"target":"log_engine::worker"}
🤖 AI AGENT | 같은 계층에 두지 않고 docker를 실행하고 들어오는 이유

log-engine/ 디렉토리는 온전히 Rust 소스 코드(src/, Cargo.toml, build.rs 등)만을 관리하는 공간이어야 합니다.

Rust 애플리케이션 입장에서 PostgreSQL 데이터베이스가 Docker로 돌아가든, AWS RDS에 있든, 로컬 환경에 직접 설치되어 있든 알 필요가 없습니다.

데이터베이스 초기화 스크립트(init.sql)나 컨테이너 오케스트레이션 설정(docker-compose.yml)이 Rust 프로젝트 안에 섞여 있으면, "비즈니스 로직(코드)"과 "인프라(환경)"가 강하게 결합되어 응집도가 떨어집니다.

profile
Peter J Online Space - since July 2020 | 아무데서나 채용해줬으면 좋겠다

0개의 댓글