이제 본격적으로 대용량 텍스트를 빠르게 벌크 색인하는 API와
Nori 분석기를 연동해 하이라이팅을 제공하는 검색 API를 구축할 차례다.
작성한 API를 모아둘 디렉토리 routers 를 생성하고
Weak ETag를 활용해 캐싱 히트 시 304를 먼저 리턴하는 최적화 로직을 사용하여
검색 API를 작성한다.
gateway/app/routers/search.pyimport logging import hashlib from fastapi import APIRouter, HTTPException, Request, Response from elasticsearch import AsyncElasticsearch from elasticsearch.helpers import async_bulk import fast_text_engine from ..config import settings from ..schemas import ( IndexRequest, IndexResponse, SearchResponse, SearchHit, IndexStatsResponse, ) logger = logging.getLogger("gateway.search") router = APIRouter(prefix="/api", tags=["search"]) async def get_index_stats(es_client: AsyncElasticsearch, index_name: str) -> IndexStatsResponse: """Elasticsearch 인덱스의 현재 문서 수와 세그먼트 수를 조회합니다.""" try: stats = await es_client.indices.stats(index=index_name) index_stats = stats["indices"][index_name]["total"] doc_count = index_stats["docs"]["count"] segment_count = index_stats["segments"]["count"] return IndexStatsResponse(document_count=doc_count, segment_count=segment_count) except Exception as e: logger.warning(f"인덱스 통계 조회 실패: {e}") return IndexStatsResponse(document_count=0, segment_count=0) def generate_weak_etag(stats: IndexStatsResponse, query: str) -> str: """인덱스 통계 정보와 검색 쿼리 문자열을 결합하여 Weak ETag를 생성합니다. 형식: W/"<doc_count>-<segment_count>-<query_hash>" """ query_hash = hashlib.md5(query.strip().encode("utf-8")).hexdigest()[:8] return f'W/"{stats.document_count}-{stats.segment_count}-{query_hash}"' @router.post("/index") async def index_data(request: Request, body: IndexRequest) -> IndexResponse: """Rust FFI 엔진을 호출하여 대용량 더미 데이터를 생성한 뒤 Elasticsearch Bulk API를 활용해 고속 색인합니다. """ es_client: AsyncElasticsearch = request.app.state.es_client index_name = settings.ELASTICSEARCH_INDEX # Rust 엔진을 통해 대용량 더미 데이터 10만 건 생성 try: text_list = fast_text_engine.generate_dummy_data(body.lines) logger.info(f"Rust FFI를 통해 {len(text_list)}건의 더미 텍스트를 생성했습니다.") except Exception as e: logger.error(f"더미 데이터 생성 실패: {e}") raise HTTPException(status_code=500, detail="Rust FFI 연산 중 오류가 발생했습니다.") # Bulk API 액션 정의 actions = [ { "_index": index_name, "_id": f"doc-{i}", "_source": { "line_num": i + 1, "text": line } } for i, line in enumerate(text_list) ] # 비동기 Bulk 색인 try: success, failed = await async_bulk(es_client, actions) logger.info(f"Bulk 색인 성공: {success} 건, 실패: {len(failed) if isinstance(failed, list) else failed} 건") await es_client.indices.refresh(index=index_name) return IndexResponse(status="success", indexed_count=success) except Exception as e: logger.error(f"색인 중 에러 발생: {e}") raise HTTPException(status_code=500, detail="색인 작업 중 에러가 발생했습니다.") @router.get("/search", response_model=None) async def search_data(request: Request, response: Response, q: str, from_idx: int = 0, size: int = 10) -> SearchResponse | Response: """Nori 분석기를 사용해 본문을 풀텍스트 검색하고 하이라이팅을 제공합니다. 검색 질의 전에 Weak ETag를 먼저 검사하여 캐시 히트 시 304를 반환합니다. """ es_client: AsyncElasticsearch = request.app.state.es_client index_name = settings.ELASTICSEARCH_INDEX # 인텍스 통계 가져와 ETag 검증 stats = await get_index_stats(es_client, index_name) etag_val = generate_weak_etag(stats, q) if_none_match = request.headers.get("If-None-Match") if if_none_match == etag_val and stats.document_count > 0: logger.info(f"검색 캐시 히트! (304 Not Modified) - Query: '{q}', Etag: {etag_val}") return Response(status_code=304) # ES 쿼리 생성 query_body = { "from": from_idx, "size": size, "query": { "match": { "text": { "query": q, "analyzer": "nori_analyzer" } } }, "highlight": { "fields": { "text": {} } } } try: res = await es_client.search(index=index_name, body=query_body) total_hits = res["hits"]["total"]["value"] hits_data =[] for hit in res["hits"]["hits"]: source = hit["_source"] highlight = hit.get("highlight", {}).get("text", []) hits_data.append( SearchHit( id=hit["_id"], text=source["text"], highlight=highlight ) ) # Weak Etag 캐시 헤더 설정 response.headers["ETag"] = etag_val response.headers["Cache-Control"] = "public, max-age=0, must-revalidate" return SearchResponse(total=total_hits, hits=hits_data) except Exception as e: logger.error(f"검색 쿼리 실행 실패: {e}") raise HTTPException(status_code=500, detail="검색 쿼리 실행 중 에러가 발생했습니다.")
🤖 AI AGENT |
search_data()에서response도 받는 이유FastAPI는 의존성 주입(Dependency Injection) 방식으로 작동합니다. 라우터 함수 매개변수에
request: Request, response: Response라는 타입 힌팅을 적어두면, FastAPI가 이 엔드포인트가 호출될 때 현재 들어온 HTTP 요청 정보(Request) 와 앞으로 나갈 HTTP 응답 정보(Response) 객체를 알아서 생성해 함수 내부로 찔러 넣어 줍니다.이 두 객체는 다음과 같이 활용됩니다.
request: Request객체:
- 목적: 클라이언트가 보낸 HTTP 헤더 중에서 캐시 확인용 헤더인
If-None-Match를 읽기 위해 사용합니다 (request.headers.get("If-None-Match")).- 또한, 저희가
lifespan시점에 생성해서 앱 전역 상태에 저장해 둔es_client를 꺼내 쓰기 위해 사용합니다 (request.app.state.es_client).response: Response객체:
- 목적: 검색 결과를 클라이언트에게 정상적으로 돌려줄 때, HTTP 응답 헤더 영역에 캐시 관련 헤더(
ETag,Cache-Control)를 직접 심어주기 위해 사용합니다.response.headers["ETag"] = etag_val처럼 값을 채워 넣으면, FastAPI가 이 응답 정보와 본문(Body) 데이터를 합쳐서 최종 클라이언트 브라우저로 패킷을 쏘아 보냅니다.
🤖 AI AGENT |
search_data()의response_model=None이를 생략하면 다음과 같은 오류가 발생합니다.
Invalid args for response field! Hint: check that app.schemas.SearchResponse | starlette.responses.Response is a valid Pydantic field type.최신 FastAPI는 함수의 반환 타입 힌트(
-> SearchResponse | Response)를 기반으로 응답 스펙을 자동 생성하고 데이터를 직렬화하려고 시도합니다. 하지만Response(Starlette/FastAPI 기본 응답 객체)는 Pydantic 모델이 아니기 때문에, FastAPI가 이를 필드 타입으로 해석하지 못해 에러가 발생한 것입니다.💡 해결 방법
데코레이터에response_model=None을 추가하여 FastAPI가 반환 타입 힌트로부터 자동 응답 모델을 생성하지 않도록 비활성화해주시면 됩니다. (에러 메시지 힌트에서도 이 방법을 권장하고 있습니다.)
작성한 라우터를 등록한다.
gateway/app/main.pyimport logging from contextlib import asynccontextmanager from fastapi import FastAPI, HTTPException, Request, Response from fastapi.middleware.cors import CORSMiddleware from fastapi.middleware.gzip import GZipMiddleware from brotli_asgi import BrotliMiddleware import hashlib from elasticsearch import AsyncElasticsearch import fast_text_engine from .config import settings from .schemas import HealthResponse, TextDataResponse from .routers import search # (중략) app = FastAPI( title="Fast Text Search Gateway", description="대용량 텍스트 최적화 전송 및 Elasticsearch 검색을 처리하는 API 게이트웨이", version="0.1.0", lifespan=lifespan ) app.include_router(search.router) app.add_middleware(GZipMiddleware, minimum_size=1000) app.add_middleware(BrotliMiddleware, quality=4, minimum_size=1000) # (후략)
서버를 가동하고 테스트를 해 보면,
[터미널 A | 게이트웨이 실행]
~/workspace/fast-text-search/gateway$ uv run uvicorn app.main:app --reload
[터미널 B | 응답 확인]
~$ curl -X POST http://localhost:8000/api/index \ -H "Content-Type: application/json" \ -d '{"lines": 10}' {"status":"success","indexed_count":10}%~$ curl -i "http://localhost:8000/api/search?q=test&size=3" HTTP/1.1 200 OK date: Tue, 23 Jun 2026 02:24:24 GMT server: uvicorn content-length: 21 content-type: application/json etag: W/"10-1-098f6bcd" cache-control: public, max-age=0, must-revalidate {"total":0,"hits":[]}%~$ curl -i -H 'If-None-Match: W/"10-1-098f6bcd"' "http://localhost:8000/api/search?q=test&size=3" HTTP/1.1 304 Not Modified date: Tue, 23 Jun 2026 02:28:15 GMT server: uvicorn데이터를 추가하면
~$ curl -X POST http://localhost:8000/api/index \ -H "Content-Type: application/json" \ -d '{"lines": 10}' {"status":"success","indexed_count":10}%~$ curl -i -H 'If-None-Match: W/"10-2-9ece2a1b"' "http://localhost:8000/api/search?q=test&size=3" HTTP/1.1 200 OK date: Tue, 23 Jun 2026 02:30:42 GMT server: uvicorn content-length: 21 content-type: application/json etag: W/"10-1-098f6bcd" cache-control: public, max-age=0, must-revalidate {"total":0,"hits":[]}%응답이 있는 경우
~$ curl -i -G "http://localhost:8000/api/search" \ --data-urlencode "q=테스트" \ -d "size=3" HTTP/1.1 200 OK date: Tue, 23 Jun 2026 02:33:52 GMT server: uvicorn content-length: 993 content-type: application/json etag: W/"10-1-3b6e8490" cache-control: public, max-age=0, must-revalidate {"total":10,"hits":[{"id":"doc-0","text":"이것은 Rust 엔진에서 생성된 0번째 줄 더미 데이터입니다. 대용량 페이로드 최적화 테스트를 위해 반복되는 텍스트 세그먼트입니다.","highlight":["대용량 페이로드 최적화 <em>테스트</em>를 위해 반복되는 텍스트 세그먼트입니다."]},{"id":"doc-1","text":"이것은 Rust 엔진에서 생성된 1번째 줄 더미 데이터입니다. 대용량 페이로드 최적화 테스트를 위해 반복되는 텍스트 세그먼트입니다.","highlight":["대용량 페이로드 최적화 <em>테스트</em>를 위해 반복되는 텍스트 세그먼트입니다."]},{"id":"doc-2","text":"이것은 Rust 엔진에서 생성된 2번째 줄 더미 데이터입니다. 대용량 페이로드 최적화 테스트를 위해 반복되는 텍스트 세그먼트입니다.","highlight":["대용량 페이로드 최적화 <em>테스트</em>를 위해 반복되는 텍스트 세그먼트입니다."]}]}%~$ curl -i -H 'If-None-Match: W/"10-1-3b6e8490"' \ -G "http://localhost:8000/api/search" \ --data-urlencode "q=테스트" \ -d "size=3" HTTP/1.1 304 Not Modified date: Tue, 23 Jun 2026 02:35:55 GMT server: uvicorn
앞서 수동으로 진행한 테스트를 pytest로 자동화하여 검증하는 코드를 작성한다.
gateway/tests/test_search.pyimport pytest from fastapi.testclient import TestClient from app.main import app @pytest.fixture(scope="module") def client() -> TestClient: with TestClient(app) as c: yield c def test_bulk_indexing(client: TestClient) -> None: """10줄의 더미 데이터를 Elasticsearch에 색인하는 API를 테스트합니다.""" response = client.post("/api/index", json={"lines": 10}) assert response.status_code == 200 data = response.json() assert data["status"] == "success" assert data["indexed_count"] == 10 def test_search_and_highlighting(client: TestClient) -> None: """한글 '테스트' 검색 시 Nori 형태소 분석 및 하이라이팅이 적용되는지 검증합니다.""" response = client.get("/api/search?q=테스트&size=3") assert response.status_code == 200 data = response.json() assert data["total"] > 0 assert len(data["hits"]) > 0 first_hit_highlight = data["hits"][0]["highlight"] assert any("<em>테스트</em>" in hl for hl in first_hit_highlight) assert "ETag" in response.headers assert response.headers["ETag"].startswith('W/"') assert "Cache-Control" in response.headers def test_search_cache_hit_304(client: TestClient) -> None: """동일한 쿼리에 대해 If-None-Match 헤더를 전달했을 때 304 Not Modified를 반환하는지 검증합니다.""" first_response = client.get("/api/search?q=테스트&size=3") assert first_response.status_code == 200 etag = first_response.headers.get("ETag") assert etag is not None headers = {"If-None-Match": etag} second_response = client.get("/api/search?q=테스트&size=3", headers=headers) assert second_response.status_code == 304 assert second_response.text == "" def test_search_cache_miss_after_new_index(client: TestClient) -> None: """색인이 추가되어 데이터가 업데이트되면 기존 ETag 캐시가 만료(200 OK 및 신규 ETag 발급)되는지 검증합니다.""" first_response = client.get("/api/search?q=테스트&size=3") assert first_response.status_code == 200 old_etag = first_response.headers.get("ETag") index_response = client.post("/api/index", json={"lines": 5}) assert index_response.status_code == 200 headers = {"If-None-Match": old_etag} second_response = client.get("/api/search?q=테스트&size=3", headers=headers) assert second_response.status_code == 200 new_etag = second_response.headers.get("ETag") assert new_etag != old_etag def test_search_with_different_query(client: TestClient) -> None: """다른 검색어에 대해서는 정상적으로 검색 결과와 새로운 ETag를 반환하는지 검증합니다.""" first_response = client.get("/api/search?q=테스트&size=3") assert first_response.status_code == 200 old_etag = first_response.headers.get("ETag") second_response = client.get("/api/search?q=데이터&size=3") assert second_response.status_code == 200 new_etag = second_response.headers.get("ETag") assert new_etag != old_etag
테스트를 실행해 보면,
~/workspace/fast-text-search/gateway$ uv run pytest tests/test_search.py -v ================== test session starts ================== platform darwin -- Python 3.12.13, pytest-9.1.0, pluggy-1.6.0 -- /Users/edenjint3927/workspace/fast-text-search/gateway/.venv/bin/python3 cachedir: .pytest_cache rootdir: /Users/edenjint3927/workspace/fast-text-search/gateway configfile: pyproject.toml plugins: anyio-4.13.0 collected 5 items tests/test_search.py::test_bulk_indexing PASSED [ 20%] tests/test_search.py::test_search_and_highlighting PASSED [ 40%] tests/test_search.py::test_search_cache_hit_304 PASSED [ 60%] tests/test_search.py::test_search_cache_miss_after_new_index PASSED [ 80%] tests/test_search.py::test_search_with_different_query PASSED [100%] =================== 5 passed in 0.24s ===================
🤖 AI AGENT | 학습 회고
- 완료일: 2026-06-23
- 성과:
- Elasticsearch 9.4.2 + Kibana 9.4.2 기반 단일 노드 스택을 Nori 분석기를 탑재한 Docker 이미지로 구축 및 가동했습니다.
- Rust FFI 엔진에서 생성한 데이터를 Elasticsearch Bulk API로 초고속 색인하는 파이프라인을 완성했습니다.
- Query DSL의
match및highlight기능을 결합하여, 형태소 기반 한글 풀텍스트 검색 및 실시간 검색 키워드 하이라이팅을 성공적으로 개발했습니다.- Weak ETag 캐싱 (기법 B): 인덱스의 총 문서 개수 + 세그먼트 생성 횟수 + 검색 쿼리 해시 정보를 결합하여 Weak ETag(
W/"...")를 조합하고, 무변경 시304 Not Modified를 즉시 리턴하는 고도화된 캐시 필터를 적용했습니다.pytest통합 테스트를 구축하여 5가지 성공 시나리오(인덱싱, 하이라이트 검색, 캐시 히트, 데이터 변경 시 캐시 만료, 쿼리 간 캐시 격리)를 자동 검증 완료했습니다.- 배운 점 & 트러블슈팅:
- VS Code/Cursor 내부의 Python 및 Pyrefly 확장 프로그램 충돌로 인해 가상환경 인터프리터 경로가 꼬이던 블로커를 확장 프로그램 정리로 해결했습니다.
SearchResponse | Response유니온 리턴 시 Pydantic의 Response 객체 해석 실패로 인해FastAPIError가 발생하던 현상을response_model=None지정을 통해 해결했습니다.- Rust 모듈 코드 수정 후 변경사항이 파이썬 가상환경에 즉시 반영되지 않던 현상을
uv pip install --reinstall --editable ../engine명령을 통한 FFI 강제 컴파일 재설치로 극복했습니다.