A Database Holding Date-Based Information (3): FastAPI Gateway

Pt J · about 9 hours ago

Configuration Files

Create the directory and project files for the Python gateway.

~/workspace/community-board-log$ mkdir log-gateway && cd log-gateway
~/workspace/community-board-log/log-gateway$ uv init
~/workspace/community-board-log/log-gateway$ uv add fastapi uvicorn pydantic grpcio grpcio-tools

The dependency file is generated automatically;
only the description needs to be edited by hand.

log-gateway/pyproject.toml

[project]
name = "log-gateway"
version = "0.1.0"
description = "FastAPI to gRPC Gateway for Community Board Action Logs"
readme = "README.md"
requires-python = ">=3.12"
dependencies = [
    "fastapi>=0.136.1",
    "grpcio>=1.80.0",
    "grpcio-tools>=1.80.0",
    "pydantic>=2.13.4",
    "uvicorn>=0.47.0",
]

Run the following command to generate the Python files
from the gRPC service specification defined in proto/log.proto.

~/workspace/community-board-log/log-gateway$ uv run python -m grpc_tools.protoc \
    -I ../proto \
    --python_out=. \
    --pyi_out=. \
    --grpc_python_out=. \
    ../proto/log.proto
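
The same generation step can also be driven from Python, which may be handy in a build script. The following is a minimal sketch, assuming `grpc_tools.protoc.main` is called with the same flags as the shell command above; the helper names `build_protoc_args` and `generate` are hypothetical and not part of the original project.

```python
from pathlib import Path


def build_protoc_args(proto_dir: Path, out_dir: Path, proto_file: str) -> list[str]:
    """Build the argument list that mirrors the shell command above."""
    return [
        "grpc_tools.protoc",  # stands in for argv[0], as on the command line
        f"-I{proto_dir}",
        f"--python_out={out_dir}",
        f"--pyi_out={out_dir}",
        f"--grpc_python_out={out_dir}",
        str(proto_dir / proto_file),
    ]


def generate(proto_dir: Path, out_dir: Path, proto_file: str = "log.proto") -> int:
    """Run protoc and return its exit code (0 means success)."""
    # Imported lazily so build_protoc_args can be used without grpcio-tools installed.
    from grpc_tools import protoc

    return protoc.main(build_protoc_args(proto_dir, out_dir, proto_file))
```

Calling `generate(Path("../proto"), Path("."))` from the log-gateway directory (for example via `uv run python`) should be equivalent to the command above, provided grpcio-tools is installed.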

The automatically generated files are as follows.

log-gateway/log_pb2.py

# -*- coding: utf-8 -*-
# Generated by the protocol buffer compiler.  DO NOT EDIT!
# NO CHECKED-IN PROTOBUF GENCODE
# source: log.proto
# Protobuf Python Version: 6.31.1
"""Generated protocol buffer code."""
from google.protobuf import descriptor as _descriptor
from google.protobuf import descriptor_pool as _descriptor_pool
from google.protobuf import runtime_version as _runtime_version
from google.protobuf import symbol_database as _symbol_database
from google.protobuf.internal import builder as _builder
_runtime_version.ValidateProtobufRuntimeVersion(
    _runtime_version.Domain.PUBLIC,
    6,
    31,
    1,
    '',
    'log.proto'
)
# @@protoc_insertion_point(imports)

_sym_db = _symbol_database.Default()




DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\tlog.proto\x12\x03log\"r\n\x10\x41\x63tionLogRequest\x12\x0f\n\x07user_id\x18\x01 \x01(\t\x12\x13\n\x0b\x61\x63tion_type\x18\x02 \x01(\t\x12\x16\n\ttarget_id\x18\x03 \x01(\tH\x00\x88\x01\x01\x12\x12\n\nip_address\x18\x04 \x01(\tB\x0c\n\n_target_id\"$\n\x11\x41\x63tionLogResponse\x12\x0f\n\x07success\x18\x01 \x01(\x08\x32H\n\nLogService\x12:\n\tRecordLog\x12\x15.log.ActionLogRequest\x1a\x16.log.ActionLogResponseb\x06proto3')

_globals = globals()
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals)
_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'log_pb2', _globals)
if not _descriptor._USE_C_DESCRIPTORS:
  DESCRIPTOR._loaded_options = None
  _globals['_ACTIONLOGREQUEST']._serialized_start=18
  _globals['_ACTIONLOGREQUEST']._serialized_end=132
  _globals['_ACTIONLOGRESPONSE']._serialized_start=134
  _globals['_ACTIONLOGRESPONSE']._serialized_end=170
  _globals['_LOGSERVICE']._serialized_start=172
  _globals['_LOGSERVICE']._serialized_end=244
# @@protoc_insertion_point(module_scope)

log-gateway/log_pb2.pyi

from google.protobuf import descriptor as _descriptor
from google.protobuf import message as _message
from typing import ClassVar as _ClassVar, Optional as _Optional

DESCRIPTOR: _descriptor.FileDescriptor

class ActionLogRequest(_message.Message):
    __slots__ = ("user_id", "action_type", "target_id", "ip_address")
    USER_ID_FIELD_NUMBER: _ClassVar[int]
    ACTION_TYPE_FIELD_NUMBER: _ClassVar[int]
    TARGET_ID_FIELD_NUMBER: _ClassVar[int]
    IP_ADDRESS_FIELD_NUMBER: _ClassVar[int]
    user_id: str
    action_type: str
    target_id: str
    ip_address: str
    def __init__(self, user_id: _Optional[str] = ..., action_type: _Optional[str] = ..., target_id: _Optional[str] = ..., ip_address: _Optional[str] = ...) -> None: ...

class ActionLogResponse(_message.Message):
    __slots__ = ("success",)
    SUCCESS_FIELD_NUMBER: _ClassVar[int]
    success: bool
    def __init__(self, success: bool = ...) -> None: ...

log-gateway/log_pb2_grpc.py

# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
"""Client and server classes corresponding to protobuf-defined services."""
import grpc
import warnings

import log_pb2 as log__pb2

GRPC_GENERATED_VERSION = '1.80.0'
GRPC_VERSION = grpc.__version__
_version_not_supported = False

try:
    from grpc._utilities import first_version_is_lower
    _version_not_supported = first_version_is_lower(GRPC_VERSION, GRPC_GENERATED_VERSION)
except ImportError:
    _version_not_supported = True

if _version_not_supported:
    raise RuntimeError(
        f'The grpc package installed is at version {GRPC_VERSION},'
        + ' but the generated code in log_pb2_grpc.py depends on'
        + f' grpcio>={GRPC_GENERATED_VERSION}.'
        + f' Please upgrade your grpc module to grpcio>={GRPC_GENERATED_VERSION}'
        + f' or downgrade your generated code using grpcio-tools<={GRPC_VERSION}.'
    )


class LogServiceStub(object):
    """Log collection service definition
    """

    def __init__(self, channel):
        """Constructor.

        Args:
            channel: A grpc.Channel.
        """
        self.RecordLog = channel.unary_unary(
                '/log.LogService/RecordLog',
                request_serializer=log__pb2.ActionLogRequest.SerializeToString,
                response_deserializer=log__pb2.ActionLogResponse.FromString,
                _registered_method=True)


class LogServiceServicer(object):
    """Log collection service definition
    """

    def RecordLog(self, request, context):
        """Record a single log entry (designed so that it can easily be extended to streaming later)
        """
        context.set_code(grpc.StatusCode.UNIMPLEMENTED)
        context.set_details('Method not implemented!')
        raise NotImplementedError('Method not implemented!')


def add_LogServiceServicer_to_server(servicer, server):
    rpc_method_handlers = {
            'RecordLog': grpc.unary_unary_rpc_method_handler(
                    servicer.RecordLog,
                    request_deserializer=log__pb2.ActionLogRequest.FromString,
                    response_serializer=log__pb2.ActionLogResponse.SerializeToString,
            ),
    }
    generic_handler = grpc.method_handlers_generic_handler(
            'log.LogService', rpc_method_handlers)
    server.add_generic_rpc_handlers((generic_handler,))
    server.add_registered_method_handlers('log.LogService', rpc_method_handlers)


 # This class is part of an EXPERIMENTAL API.
class LogService(object):
    """Log collection service definition
    """

    @staticmethod
    def RecordLog(request,
            target,
            options=(),
            channel_credentials=None,
            call_credentials=None,
            insecure=False,
            compression=None,
            wait_for_ready=None,
            timeout=None,
            metadata=None):
        return grpc.experimental.unary_unary(
            request,
            target,
            '/log.LogService/RecordLog',
            log__pb2.ActionLogRequest.SerializeToString,
            log__pb2.ActionLogResponse.FromString,
            options,
            channel_credentials,
            insecure,
            call_credentials,
            compression,
            wait_for_ready,
            timeout,
            metadata,
            _registered_method=True)

Writing the Code

main.py

log-gateway/main.py

import logging
from contextlib import asynccontextmanager
from typing import Optional

import grpc
from fastapi import FastAPI, HTTPException, Request
from pydantic import BaseModel, Field

import log_pb2
import log_pb2_grpc

logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)

grpc_channel = None
grpc_stub = None

@asynccontextmanager
async def lifespan(app: FastAPI):
    global grpc_channel, grpc_stub
    grpc_channel = grpc.aio.insecure_channel('127.0.0.1:50051')
    grpc_stub = log_pb2_grpc.LogServiceStub(grpc_channel)
    logger.info("gRPC channel opened")

    yield

    await grpc_channel.close()
    logger.info("gRPC channel closed")

app = FastAPI(lifespan=lifespan, title="Log Gateway API")

class ActionLogRequest(BaseModel):
    user_id: str = Field(..., max_length=50)
    action_type: str = Field(..., max_length=30)
    target_id: Optional[str] = Field(None, max_length=50)

@app.post("/api/v1/logs", status_code=202)
async def record_action_log(log_data: ActionLogRequest, request: Request):
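    # X-Forwarded-For may hold a comma-separated proxy chain; the first entry is the original client.
    forwarded_for = request.headers.get('X-Forwarded-For')
    if forwarded_for:
        client_ip = forwarded_for.split(',')[0].strip()
    else:
        # request.client can be None (e.g. in some test setups), so guard the attribute access.
        client_ip = request.client.host if request.client else ""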

    grpc_request = log_pb2.ActionLogRequest(
        user_id=log_data.user_id,
        action_type=log_data.action_type,
        target_id=log_data.target_id if log_data.target_id else "",
        ip_address=client_ip
    )

    try:
        response = await grpc_stub.RecordLog(grpc_request)
        if response.success:
            return {
                "status": "accepted",
                "message": "Log successfully queued"
            }
        else:
            raise HTTPException(
                status_code=500,
                detail="Log engine rejected"
            )
    except grpc.RpcError as e:
        logger.error(f"gRPC communication error: {e.details()} (code: {e.code()})")
        raise HTTPException(
            status_code=503,
            detail="Log engine is currently unavailable"
        )
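
The handler above answers every gRPC failure with 503. If finer-grained responses are wanted, one option is a small lookup from gRPC status names to HTTP status codes. The sketch below is only an illustration: the mapping table and the `http_status_for` helper are assumptions rather than part of the gateway, and the 503 fallback is chosen to keep the current behaviour for anything unmapped.

```python
# Hypothetical mapping, keyed by the name of a grpc.StatusCode member
# (for example, e.code().name inside the except block).
GRPC_TO_HTTP = {
    "INVALID_ARGUMENT": 400,
    "UNAUTHENTICATED": 401,
    "PERMISSION_DENIED": 403,
    "NOT_FOUND": 404,
    "RESOURCE_EXHAUSTED": 429,
    "UNIMPLEMENTED": 501,
    "UNAVAILABLE": 503,
    "DEADLINE_EXCEEDED": 504,
}


def http_status_for(grpc_code_name: str, default: int = 503) -> int:
    """Return the HTTP status for a gRPC status name, falling back to `default`."""
    return GRPC_TO_HTTP.get(grpc_code_name, default)
```

Inside the `except grpc.RpcError as e:` branch this could be used as `status_code=http_status_for(e.code().name)`, since `grpc.StatusCode` is an enum whose members expose `.name`.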

Running and Testing

~/workspace/community-board-log/log-gateway$ uv run uvicorn main:app --reload --port 8000

Open a new terminal and send a test request.

~$ curl -iX POST http://127.0.0.1:8000/api/v1/logs \
  -H "Content-Type: application/json" \
  -d '{
    "user_id": "uv_user_1",
    "action_type": "LOGIN",
    "target_id": "none"
  }'
HTTP/1.1 202 Accepted
date: Wed, 20 May 2026 00:56:17 GMT
server: uvicorn
content-length: 57
content-type: application/json

{"status":"accepted","message":"Log successfully queued"}

The server log at that point:

INFO:     Started reloader process [35670] using StatReload
INFO:     Started server process [35672]
INFO:     Waiting for application startup.
2026-05-20 09:54:35,155 - INFO - gRPC channel opened
INFO:     Application startup complete.
INFO:     127.0.0.1:49322 - "POST /api/v1/logs HTTP/1.1" 202 Accepted

Checking the DB: it contains both the row inserted during the Rust engine test and the row just inserted through the gateway.

~$ docker exec -it log_db psql -U admin -d log_db -c "SELECT * FROM action_logs;"
 id |  user_id  | action_type | target_id | ip_address  |          created_at           
----+-----------+-------------+-----------+-------------+-------------------------------
  1 | user_123  | VIEW_POST   | post_456  | 192.168.0.1 | 2026-05-20 07:37:41.836401+09
  2 | uv_user_1 | LOGIN       | none      | 127.0.0.1   | 2026-05-20 09:56:18.660744+09
(2 rows)