데이터베이스 연결 재구조화

Pt J·약 6시간 전
post-thumbnail

데이터베이스 연결 재구조화

이전 실습에서는 데이터베이스 연결 자체에 집중하느라
구조적인 확장에 대해 고려하지 않고 코드를 작성해 보았다.

하지만 모든 기능이 src/lib.rs 파일 하나에 들어가면
프로젝트를 관리하기 어려워진다는 것을 ⟨모듈화⟩에서 언급한 바 있다.

따라서 본격적인 데이터베이스 실습에 들어가기에 앞서
프로젝트를 재구조화하도록 하겠다.

작업공간 조정 및 구조 확인

Tracing 및 로그 아카이빙 로직 관리를 담당할 src/logger.rs 파일과
PostgreSQL 연결 및 Pool 관리를 담당할 src/db.rs 파일을 생성하고
src/lib.rs 파일은 Python과의 인터페이스만 담당하도록 가볍게 유지할 것이다.

필요에 따라 공통 유틸리티 및 에러 처리를 담당할
src/common.rs 파일도 생성할 수 있지만 여기선 생략한다.

이후 각 프로젝트에서 필요한 로직이 있다면 이것을 템플릿삼아
해당 프로젝트를 위한 파일을 생성하여 사용하면 된다.

~/workspace/db-connection$ # 기존 구조 확인
~/workspace/db-connection$ tree -a -I venv -I target -I postgres_data -I logs
.
├── .env
├── .github
│   └── workflows
│       └── CI.yml
├── .gitignore
├── app
│   └── main.py
├── Cargo.lock
├── Cargo.toml
├── docker-compose.yml
├── pyproject.toml
└── src
    └── lib.rs
~/workspace/db-connection$ touch src/logger.rs src/db.rs
~/workspace/db-connection$ tree -a -I venv -I target -I postgres_data -I logs
.
├── .env
├── .github
│   └── workflows
│       └── CI.yml
├── .gitignore
├── app
│   └── main.py
├── Cargo.lock
├── Cargo.toml
├── docker-compose.yml
├── pyproject.toml
└── src
    ├── db.rs
    ├── lib.rs
    └── logger.rs

코드 분리

로그 관리 모듈

기존 코드에서는 server.log prefix가 붙은 로그 파일이
한 시간마다 생성되도록 구현되어 있었는데
Python 로그와 구분하기 위해 rust-engine.log prefix를 사용하고
하루 단위로 로그를 생성하도록 수정하겠다.

기존에는 콘솔에 출력할 것과 파일로 저장할 것에 대한 정보를
tracing_subscriber::registry()with() 로 바로 전달하였는데
코드를 알아보기 쉽게 변수로 따로 작성하도록 하겠다.
추가로, 텍스트 파일에 ANSI 색상 코드가 들어가면 가독성이 떨어지므로
콘솔에서만 색상 코드를 적용하고 파일로 저장 시 생략하도록 하겠다.

src/logger.rs

use tracing_appender::non_blocking::WorkerGuard;
use tracing_subscriber::{fmt, prelude::*, EnvFilter};

pub fn init_tracing() -> WorkerGuard {
    let file_appender = tracing_appender::rolling::daily("logs", "rust-engine.log");
    let (non_blocking, guard) = tracing_appender::non_blocking(file_appender);

    // 로그 수준
    let filter = EnvFilter::try_from_default_env()
        .unwrap_or_else(|_| EnvFilter::new("info"));

    // 콘솔 출력
    let stdout_layer = fmt::layer()
        .with_thread_ids(true)
        .with_thread_names(true)
        .with_ansi(true);

    // 파일 저장
    let file_layer = fmt::layer()
        .with_writer(non_blocking)
        .json()
        .with_thread_ids(true)
        .with_ansi(false);

    tracing_subscriber::registry()
        .with(filter)
        .with(stdout_layer)
        .with(file_layer)
        .init();

    guard
}

데이터베이스 관리 모듈

데이터베이스 작업은 #[instrument] 속성을 붙여 로그를 남기되
연결 함수에는 skip(url) 을 사용하여 보안 이슈가 발생하지 않도록 한다.

src/db.rs

use sqlx::postgres::PgPoolOptions;
use sqlx::{Pool, Postgres};
use std::sync::OnceLock;
use tracing::{info, warn, error, instrument};

pub static DB_POOL: OnceLock<Pool<Postgres>> = OnceLock::new();

#[instrument(skip(url))]
pub async fn connect(url: &str) -> Result<(), sqlx::Error> {
    let pool = PgPoolOptions::new()
        .max_connections(10)
        .connect(url)
        .await
        .map_err(|e| {
            error!("PostgreSQL 18 연결 실패");
            e   
        })?;

    if DB_POOL.set(pool).is_ok() {
        info!("PostgreSQL 18 연결 풀 초기화 완료");
    } else {
        warn!("PostgreSQL 18 연결 풀이 이미 초기화되어 있음");
    }   

    Ok(())
}

#[instrument]
pub async fn close() {
    if let Some(pool) = DB_POOL.get() {
        pool.close().await;

        info!("PostgreSQL 18 연결 안전하게 종료");
    } else {
        warn!("종료할 DB 연결 풀이 존재하지 않음");
    }   
}

#[instrument]
pub fn is_alive() -> bool {
    match DB_POOL.get() {
        Some(pool) => !pool.is_closed(),
        None => false,
    }   
}

인터페이스 모듈

로그 관리 모듈과 데이터베이스 관리 모듈을 불러와 사용한다.
이후에 추가되는 로직들도 이곳에서 Python과 연결될 것이다.

src/lib.rs

mod db;
mod logger;

use pyo3::prelude::*;
use tokio::runtime::Runtime;
use dotenvy::dotenv;
use std::env;
use std::sync::OnceLock;
use tracing::{debug, error, info};
use tracing_appender::non_blocking::WorkerGuard;

static LOG_GUARD: OnceLock<WorkerGuard> = OnceLock::new();

#[pyfunction]
fn init_engine() -> PyResult<String> {
    info!("엔진 초기화 절차 시작");

    debug!("로깅 시작");
    if LOG_GUARD.get().is_none() {
        let guard = logger::init_tracing();
        let _ = LOG_GUARD.set(guard);
    }

    debug!("환경 변수 로드");
    dotenv().ok();
    let url = env::var("DATABASE_URL")
        .map_err(|e| {
            error!("DATABASE_URL 환경 변수를 찾을 수 없음: {}", e);
            pyo3::exceptions::PyRuntimeError::new_err("DATABASE_URL not found in .env")
        })?;
  
    debug!("데이터베이스 연결 시작");
    let rt = Runtime::new().unwrap();
    rt.block_on(async {
        db::connect(&url).await
    })
    .map_err(|e| {
        error!("DB 연결 풀 생성 실패: {}", e);
        pyo3::exceptions::PyRuntimeError::new_err(format!("DB connection failed: {}", e))
    })?;

    Ok("엔진 초기화 및 DB 연결 성공".to_string())
}

#[pyfunction]
fn shutdown_engine() -> PyResult<()> {
    info!("엔진 종료 절차 시작");
  
    let rt = Runtime::new().unwrap();
    rt.block_on(async {
        db::close().await;
    });

    Ok(())
}

#[pyfunction]
fn check_connection() -> PyResult<bool> {
    debug!("DB 연결 상태 확인");

    Ok(db::is_alive())
}

#[pymodule]
fn rust_engine(m: &Bound<'_, PyModule>) -> PyResult<()> {
    m.add_function(wrap_pyfunction!(init_engine, m)?)?;
    m.add_function(wrap_pyfunction!(shutdown_engine, m)?)?;
    m.add_function(wrap_pyfunction!(check_connection, m)?)?;

    Ok(())
}

Python 코드

Rust 엔진의 수정된 함수명에 따라 Python 코드도 수정한다.

app/main.py

from fastapi import FastAPI, HTTPException
from fastapi.responses import ORJSONResponse
from contextlib import asynccontextmanager
import rust_engine
import logging

logger = logging.getLogger("uvicorn.error")

class UTF8ORJSONResponse(ORJSONResponse):
    media_type = "application/json; charset=utf-8"

@asynccontextmanager
async def lifespan(app: FastAPI):
    logger.info("Rust 엔진 초기화 중...")
    try:
        msg = rust_engine.init_engine()
        logger.info(msg)
    except Exception as e:
        logger.info(f"Rust 엔진 초기화 실패: {e}")

    yield # 앱 가동

    # [SHUTDOWN]

    logger.info("서버 종료 감지: 자원 정리 중...")
    try:
        rust_engine.shutdown_engine()
        logger.info("모든 연결 안전하게 종료")
    except Exception as e:
        logger.error(f"종료 중 오류 발생: {e}")

app = FastAPI(default_response_class=UTF8ORJSONResponse, lifespan=lifespan)

@app.get("/")
def read_root():
    return {
        "status": "200",
        "info": "서버 가동 중입니다."
    }

@app.get("/db-status")
def get_db_status():
    logger.info("DB 상태 체크 요청")
    is_alive = rust_engine.check_connection()

    if is_alive:
        logger.info("DB 연결 상태 양호")
        return {
            "status": "online",
            "message": "Rust 엔진이 PostgreSQL을 사용합니다."
        }
    else:
        logger.error("DB 연결 끊김 감지")
        raise HTTPException(
            status_code=500,
            detail="DB 연결이 끊겼거나 초기화되지 않았습니다."
        )

빌드 및 실행

docker 서비스가 내려가 있다면 다시 실행해 준다.

~/workspace/db-connection$ docker compose up -d 

docker 서비스를 내리고 싶을 땐 다음 명령어를 사용하면 된다.

~/workspace/db-connection$ docker compose down

Maturin 라이브러리를 통해 Rust 코드를 다시 컴파일한 후
uvicorn 라이브러리를 통해 FastAPI를 실행하면
이전과 동일하게 실행되는 것을 확인할 수 있다.

~/workspace/db-connection$ maturin develop
~/workspace/db-connection$ uvicorn app.main:app --reload

앞으로 데이터베이스를 다루는 예제에서는
이 프로젝트 디렉토리를 복제 후 수정하여 사용할 것이다.

profile
Peter J Online Space - since July 2020 | 아무데서나 채용해줬으면 좋겠다

0개의 댓글