대용량 텍스트 검색 및 전송 최적화 엔진 (5) Elasticsearch 기반 풀텍스트 검색 (下)

Pt J·3일 전
post-thumbnail

대용량 텍스트 검색 및 전송 최적화 엔진 (5) Elasticsearch 기반 풀텍스트 검색 (下)

이제 본격적으로 대용량 텍스트를 빠르게 벌크 색인하는 API와
Nori 분석기를 연동해 하이라이팅을 제공하는 검색 API를 구축할 차례다.

색인 파이프라인

Python

작성한 API를 모아둘 디렉토리 routers 를 생성하고
Weak ETag를 활용해 캐싱 히트 시 304를 먼저 리턴하는 최적화 로직을 사용하여
검색 API를 작성한다.

gateway/app/routers/search.py

import logging
import hashlib
from fastapi import APIRouter, HTTPException, Request, Response
from elasticsearch import AsyncElasticsearch
from elasticsearch.helpers import async_bulk

import fast_text_engine
from ..config import settings
from ..schemas import (
    IndexRequest,
    IndexResponse,
    SearchResponse,
    SearchHit,
    IndexStatsResponse,
)

logger = logging.getLogger("gateway.search")
router = APIRouter(prefix="/api", tags=["search"])

async def get_index_stats(es_client: AsyncElasticsearch, index_name: str) -> IndexStatsResponse:
    """Elasticsearch 인덱스의 현재 문서 수와 세그먼트 수를 조회합니다."""
    try:
        stats = await es_client.indices.stats(index=index_name)
        index_stats = stats["indices"][index_name]["total"]
        doc_count = index_stats["docs"]["count"]
        segment_count = index_stats["segments"]["count"]

        return IndexStatsResponse(document_count=doc_count, segment_count=segment_count)
    except Exception as e:
        logger.warning(f"인덱스 통계 조회 실패: {e}")
        return IndexStatsResponse(document_count=0, segment_count=0)

def generate_weak_etag(stats: IndexStatsResponse, query: str) -> str:
    """인덱스 통계 정보와 검색 쿼리 문자열을 결합하여 Weak ETag를 생성합니다.

    형식: W/"<doc_count>-<segment_count>-<query_hash>"
    """
    query_hash = hashlib.md5(query.strip().encode("utf-8")).hexdigest()[:8]
    return f'W/"{stats.document_count}-{stats.segment_count}-{query_hash}"'

@router.post("/index")
async def index_data(request: Request, body: IndexRequest) -> IndexResponse:
    """Rust FFI 엔진을 호출하여 대용량 더미 데이터를 생성한 뒤
    Elasticsearch Bulk API를 활용해 고속 색인합니다.
    """
    es_client: AsyncElasticsearch = request.app.state.es_client
    index_name = settings.ELASTICSEARCH_INDEX

    # Rust 엔진을 통해 대용량 더미 데이터 10만 건 생성
    try:
        text_list = fast_text_engine.generate_dummy_data(body.lines)
        logger.info(f"Rust FFI를 통해 {len(text_list)}건의 더미 텍스트를 생성했습니다.")
    except Exception as e:
        logger.error(f"더미 데이터 생성 실패: {e}")
        raise HTTPException(status_code=500, detail="Rust FFI 연산 중 오류가 발생했습니다.")

    # Bulk API 액션 정의
    actions = [
        {
            "_index": index_name,
            "_id": f"doc-{i}",
            "_source": {
                "line_num": i + 1,
                "text": line
            }
        } for i, line in enumerate(text_list)
    ]

    # 비동기 Bulk 색인
    try:
        success, failed = await async_bulk(es_client, actions)
        logger.info(f"Bulk 색인 성공: {success} 건, 실패: {len(failed) if isinstance(failed, list) else failed} 건")

        await es_client.indices.refresh(index=index_name)

        return IndexResponse(status="success", indexed_count=success)
    except Exception as e:
        logger.error(f"색인 중 에러 발생: {e}")
        raise HTTPException(status_code=500, detail="색인 작업 중 에러가 발생했습니다.")

@router.get("/search", response_model=None)
async def search_data(request: Request, response: Response, q: str, from_idx: int = 0, size: int = 10) -> SearchResponse | Response:
    """Nori 분석기를 사용해 본문을 풀텍스트 검색하고 하이라이팅을 제공합니다.
    검색 질의 전에 Weak ETag를 먼저 검사하여 캐시 히트 시 304를 반환합니다.
    """
    es_client: AsyncElasticsearch = request.app.state.es_client
    index_name = settings.ELASTICSEARCH_INDEX

    # 인텍스 통계 가져와 ETag 검증
    stats = await get_index_stats(es_client, index_name)
    etag_val = generate_weak_etag(stats, q)
    if_none_match = request.headers.get("If-None-Match")

    if if_none_match == etag_val and stats.document_count > 0:
        logger.info(f"검색 캐시 히트! (304 Not Modified) - Query: '{q}', Etag: {etag_val}")
        return Response(status_code=304)

    # ES 쿼리 생성
    query_body = {
        "from": from_idx,
        "size": size,
        "query": {
            "match": {
                "text": {
                    "query": q,
                    "analyzer": "nori_analyzer"
                }
            }
        },
        "highlight": {
            "fields": {
                "text": {}
            }
        }
    }

    try:
        res = await es_client.search(index=index_name, body=query_body)
        total_hits = res["hits"]["total"]["value"]
        hits_data =[]

        for hit in res["hits"]["hits"]:
            source = hit["_source"]
            highlight = hit.get("highlight", {}).get("text", [])
            hits_data.append(
                SearchHit(
                    id=hit["_id"],
                    text=source["text"],
                    highlight=highlight
                )
            )

        # Weak Etag 캐시 헤더 설정
        response.headers["ETag"] = etag_val
        response.headers["Cache-Control"] = "public, max-age=0, must-revalidate"

        return SearchResponse(total=total_hits, hits=hits_data)
    except Exception as e:
        logger.error(f"검색 쿼리 실행 실패: {e}")
        raise HTTPException(status_code=500, detail="검색 쿼리 실행 중 에러가 발생했습니다.")
🤖 AI AGENT | search_data() 에서 response 도 받는 이유

FastAPI는 의존성 주입(Dependency Injection) 방식으로 작동합니다. 라우터 함수 매개변수에 request: Request, response: Response 라는 타입 힌팅을 적어두면, FastAPI가 이 엔드포인트가 호출될 때 현재 들어온 HTTP 요청 정보(Request)앞으로 나갈 HTTP 응답 정보(Response) 객체를 알아서 생성해 함수 내부로 찔러 넣어 줍니다.

이 두 객체는 다음과 같이 활용됩니다.

  • request: Request 객체:
    • 목적: 클라이언트가 보낸 HTTP 헤더 중에서 캐시 확인용 헤더인 If-None-Match 를 읽기 위해 사용합니다 (request.headers.get("If-None-Match")).
    • 또한, 저희가 lifespan 시점에 생성해서 앱 전역 상태에 저장해 둔 es_client 를 꺼내 쓰기 위해 사용합니다 (request.app.state.es_client).
  • response: Response 객체:
    • 목적: 검색 결과를 클라이언트에게 정상적으로 돌려줄 때, HTTP 응답 헤더 영역에 캐시 관련 헤더(ETag, Cache-Control)를 직접 심어주기 위해 사용합니다.
    • response.headers["ETag"] = etag_val 처럼 값을 채워 넣으면, FastAPI가 이 응답 정보와 본문(Body) 데이터를 합쳐서 최종 클라이언트 브라우저로 패킷을 쏘아 보냅니다.
🤖 AI AGENT | search_data()response_model=None

이를 생략하면 다음과 같은 오류가 발생합니다.

Invalid args for response field! Hint: check that app.schemas.SearchResponse | starlette.responses.Response is a valid Pydantic field type.

최신 FastAPI는 함수의 반환 타입 힌트(-> SearchResponse | Response)를 기반으로 응답 스펙을 자동 생성하고 데이터를 직렬화하려고 시도합니다. 하지만 Response (Starlette/FastAPI 기본 응답 객체)는 Pydantic 모델이 아니기 때문에, FastAPI가 이를 필드 타입으로 해석하지 못해 에러가 발생한 것입니다.

💡 해결 방법
데코레이터에 response_model=None을 추가하여 FastAPI가 반환 타입 힌트로부터 자동 응답 모델을 생성하지 않도록 비활성화해주시면 됩니다. (에러 메시지 힌트에서도 이 방법을 권장하고 있습니다.)

작성한 라우터를 등록한다.

gateway/app/main.py

import logging
from contextlib import asynccontextmanager
from fastapi import FastAPI, HTTPException, Request, Response
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.gzip import GZipMiddleware
from brotli_asgi import BrotliMiddleware
import hashlib

from elasticsearch import AsyncElasticsearch

import fast_text_engine
from .config import settings
from .schemas import HealthResponse, TextDataResponse
from .routers import search

# (중략)

app = FastAPI(
    title="Fast Text Search Gateway",
    description="대용량 텍스트 최적화 전송 및 Elasticsearch 검색을 처리하는 API 게이트웨이",
    version="0.1.0",
    lifespan=lifespan
)

app.include_router(search.router)

app.add_middleware(GZipMiddleware, minimum_size=1000)
app.add_middleware(BrotliMiddleware, quality=4, minimum_size=1000)

# (후략)

서버를 가동하고 테스트를 해 보면,

[터미널 A | 게이트웨이 실행]

~/workspace/fast-text-search/gateway$ uv run uvicorn app.main:app --reload

[터미널 B | 응답 확인]

~$ curl -X POST http://localhost:8000/api/index \
  -H "Content-Type: application/json" \
  -d '{"lines": 10}'

{"status":"success","indexed_count":10}%  
~$ curl -i "http://localhost:8000/api/search?q=test&size=3"

HTTP/1.1 200 OK
date: Tue, 23 Jun 2026 02:24:24 GMT
server: uvicorn
content-length: 21
content-type: application/json
etag: W/"10-1-098f6bcd"
cache-control: public, max-age=0, must-revalidate

{"total":0,"hits":[]}%  
~$ curl -i -H 'If-None-Match: W/"10-1-098f6bcd"' "http://localhost:8000/api/search?q=test&size=3"

HTTP/1.1 304 Not Modified
date: Tue, 23 Jun 2026 02:28:15 GMT
server: uvicorn

데이터를 추가하면

~$ curl -X POST http://localhost:8000/api/index \
  -H "Content-Type: application/json" \
  -d '{"lines": 10}'

{"status":"success","indexed_count":10}%   
~$  curl -i -H 'If-None-Match: W/"10-2-9ece2a1b"' "http://localhost:8000/api/search?q=test&size=3"

HTTP/1.1 200 OK
date: Tue, 23 Jun 2026 02:30:42 GMT
server: uvicorn
content-length: 21
content-type: application/json
etag: W/"10-1-098f6bcd"
cache-control: public, max-age=0, must-revalidate

{"total":0,"hits":[]}%     

응답이 있는 경우

~$ curl -i -G "http://localhost:8000/api/search" \
  --data-urlencode "q=테스트" \
  -d "size=3"

HTTP/1.1 200 OK
date: Tue, 23 Jun 2026 02:33:52 GMT
server: uvicorn
content-length: 993
content-type: application/json
etag: W/"10-1-3b6e8490"
cache-control: public, max-age=0, must-revalidate

{"total":10,"hits":[{"id":"doc-0","text":"이것은 Rust 엔진에서 생성된 0번째 줄 더미 데이터입니다. 대용량 페이로드 최적화 테스트를 위해 반복되는 텍스트 세그먼트입니다.","highlight":["대용량 페이로드 최적화 <em>테스트</em>를 위해 반복되는 텍스트 세그먼트입니다."]},{"id":"doc-1","text":"이것은 Rust 엔진에서 생성된 1번째 줄 더미 데이터입니다. 대용량 페이로드 최적화 테스트를 위해 반복되는 텍스트 세그먼트입니다.","highlight":["대용량 페이로드 최적화 <em>테스트</em>를 위해 반복되는 텍스트 세그먼트입니다."]},{"id":"doc-2","text":"이것은 Rust 엔진에서 생성된 2번째 줄 더미 데이터입니다. 대용량 페이로드 최적화 테스트를 위해 반복되는 텍스트 세그먼트입니다.","highlight":["대용량 페이로드 최적화 <em>테스트</em>를 위해 반복되는 텍스트 세그먼트입니다."]}]}% 
~$ curl -i -H 'If-None-Match: W/"10-1-3b6e8490"' \
  -G "http://localhost:8000/api/search" \     
  --data-urlencode "q=테스트" \
  -d "size=3"

HTTP/1.1 304 Not Modified
date: Tue, 23 Jun 2026 02:35:55 GMT
server: uvicorn

통합 테스트

앞서 수동으로 진행한 테스트를 pytest로 자동화하여 검증하는 코드를 작성한다.

gateway/tests/test_search.py

import pytest
from fastapi.testclient import TestClient
from app.main import app

@pytest.fixture(scope="module")
def client() -> TestClient:
    with TestClient(app) as c:
        yield c

def test_bulk_indexing(client: TestClient) -> None:
    """10줄의 더미 데이터를 Elasticsearch에 색인하는 API를 테스트합니다."""
    response = client.post("/api/index", json={"lines": 10})
    assert response.status_code == 200

    data = response.json()
    assert data["status"] == "success"
    assert data["indexed_count"] == 10

def test_search_and_highlighting(client: TestClient) -> None:
    """한글 '테스트' 검색 시 Nori 형태소 분석 및 하이라이팅이 적용되는지 검증합니다."""
    response = client.get("/api/search?q=테스트&size=3")
    assert response.status_code == 200

    data = response.json()
    assert data["total"] > 0
    assert len(data["hits"]) > 0

    first_hit_highlight = data["hits"][0]["highlight"]
    assert any("<em>테스트</em>" in hl for hl in first_hit_highlight)

    assert "ETag" in response.headers
    assert response.headers["ETag"].startswith('W/"')
    assert "Cache-Control" in response.headers

def test_search_cache_hit_304(client: TestClient) -> None:
    """동일한 쿼리에 대해 If-None-Match 헤더를 전달했을 때 304 Not Modified를 반환하는지 검증합니다."""
    first_response = client.get("/api/search?q=테스트&size=3")
    assert first_response.status_code == 200
    etag = first_response.headers.get("ETag")
    assert etag is not None

    headers = {"If-None-Match": etag}
    second_response = client.get("/api/search?q=테스트&size=3", headers=headers)

    assert second_response.status_code == 304
    assert second_response.text == ""

def test_search_cache_miss_after_new_index(client: TestClient) -> None:
    """색인이 추가되어 데이터가 업데이트되면 기존 ETag 캐시가 만료(200 OK 및 신규 ETag 발급)되는지 검증합니다."""
    first_response = client.get("/api/search?q=테스트&size=3")
    assert first_response.status_code == 200
    old_etag = first_response.headers.get("ETag")

    index_response = client.post("/api/index", json={"lines": 5})
    assert index_response.status_code == 200

    headers = {"If-None-Match": old_etag}
    second_response = client.get("/api/search?q=테스트&size=3", headers=headers)

    assert second_response.status_code == 200
    new_etag = second_response.headers.get("ETag")
    assert new_etag != old_etag

def test_search_with_different_query(client: TestClient) -> None:
    """다른 검색어에 대해서는 정상적으로 검색 결과와 새로운 ETag를 반환하는지 검증합니다."""
    first_response = client.get("/api/search?q=테스트&size=3")
    assert first_response.status_code == 200
    old_etag = first_response.headers.get("ETag")

    second_response = client.get("/api/search?q=데이터&size=3")
    assert second_response.status_code == 200
    new_etag = second_response.headers.get("ETag")

    assert new_etag != old_etag

테스트를 실행해 보면,

~/workspace/fast-text-search/gateway$ uv run pytest tests/test_search.py -v

================== test session starts ==================
platform darwin -- Python 3.12.13, pytest-9.1.0, pluggy-1.6.0 -- /Users/edenjint3927/workspace/fast-text-search/gateway/.venv/bin/python3
cachedir: .pytest_cache
rootdir: /Users/edenjint3927/workspace/fast-text-search/gateway
configfile: pyproject.toml
plugins: anyio-4.13.0
collected 5 items                                       

tests/test_search.py::test_bulk_indexing PASSED   [ 20%]
tests/test_search.py::test_search_and_highlighting PASSED [ 40%]
tests/test_search.py::test_search_cache_hit_304 PASSED [ 60%]
tests/test_search.py::test_search_cache_miss_after_new_index PASSED [ 80%]
tests/test_search.py::test_search_with_different_query PASSED [100%]

=================== 5 passed in 0.24s ===================
🤖 AI AGENT | 학습 회고
  • 완료일: 2026-06-23
  • 성과:
    • Elasticsearch 9.4.2 + Kibana 9.4.2 기반 단일 노드 스택을 Nori 분석기를 탑재한 Docker 이미지로 구축 및 가동했습니다.
    • Rust FFI 엔진에서 생성한 데이터를 Elasticsearch Bulk API로 초고속 색인하는 파이프라인을 완성했습니다.
    • Query DSL의 matchhighlight 기능을 결합하여, 형태소 기반 한글 풀텍스트 검색 및 실시간 검색 키워드 하이라이팅을 성공적으로 개발했습니다.
    • Weak ETag 캐싱 (기법 B): 인덱스의 총 문서 개수 + 세그먼트 생성 횟수 + 검색 쿼리 해시 정보를 결합하여 Weak ETag(W/"...")를 조합하고, 무변경 시 304 Not Modified를 즉시 리턴하는 고도화된 캐시 필터를 적용했습니다.
    • pytest 통합 테스트를 구축하여 5가지 성공 시나리오(인덱싱, 하이라이트 검색, 캐시 히트, 데이터 변경 시 캐시 만료, 쿼리 간 캐시 격리)를 자동 검증 완료했습니다.
  • 배운 점 & 트러블슈팅:
    • VS Code/Cursor 내부의 Python 및 Pyrefly 확장 프로그램 충돌로 인해 가상환경 인터프리터 경로가 꼬이던 블로커를 확장 프로그램 정리로 해결했습니다.
    • SearchResponse | Response 유니온 리턴 시 Pydantic의 Response 객체 해석 실패로 인해 FastAPIError가 발생하던 현상을 response_model=None 지정을 통해 해결했습니다.
    • Rust 모듈 코드 수정 후 변경사항이 파이썬 가상환경에 즉시 반영되지 않던 현상을 uv pip install --reinstall --editable ../engine 명령을 통한 FFI 강제 컴파일 재설치로 극복했습니다.
profile
Peter J Online Space - since July 2020 | 아무데서나 채용해줬으면 좋겠다 (지금은 학생 때 하던 거 아무거나 공부하고 있고요, 취업시켜 주시면 그 분야로 공부할게요)

0개의 댓글