FastAPI + Cloud Run Job 기반 크롤링 자동화 파이프라인 구축 후기

yeahcold·2025년 5월 26일

Data Engineering

목록 보기
18/20

왜 Cloud Run Job으로 전환했는가?

초기에는 크롤러를 GCP Compute Engine 인스턴스에서 직접 돌리고 있었습니다. 하지만 크롤러의 특성상 사용자가 동시에 여러 개의 크롤 요청을 보낼 수 있고, 각 크롤링 작업은 Selenium과 Chrome을 사용하는 무거운 작업입니다. 이 때문에 다음과 같은 문제점이 있었습니다:

  • 동시 실행 불가능: 하나의 VM에서 병렬로 실행하기 어렵고 thread/process 충돌 위험 존재
  • 리소스 낭비: 크롤러는 짧게 실행되고 끝나지만 VM은 계속 켜져 있음
  • 스케일링 어려움: 수요에 따라 리소스 확장이 어려움

그래서 다음 조건을 만족하는 구조로 리팩터링하기로 했습니다:

"요청이 들어올 때마다 하나의 독립된 크롤러 컨테이너를 실행하고, 크롤링이 끝나면 컨테이너를 종료하자."

이 구조를 만족시킬 수 있는 것이 바로 Cloud Run Job입니다.


전체 구조 개요

Client (POST /dispatch)
        ↓
[Cloud Run 서비스] dispatcher (FastAPI)
        ↓ REST API 호출
[Cloud Run Job] media-plan-crawler 실행
        ↓
 크롤링 후 자동 종료

Dispatcher 구축: FastAPI 기반

dispatcher는 사용자의 요청을 받아, 해당 요청 내용을 기반으로 Cloud Run Job을 실행하는 역할을 합니다.

main.py (중요 부분 발췌)

from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
import os, json, logging
from google.auth import default
from google.auth.transport.requests import AuthorizedSession

app = FastAPI()
logging.basicConfig(format="%(asctime)s %(levelname)s %(message)s", level=logging.INFO)

PROJECT_ID = "innocean-mkt-datalake-01"
REGION = "asia-northeast3"
JOB_NAME = "crawler-job"

@app.get("/")
def health_check():
    return {"status": "ok"}

@app.post("/dispatch")
async def dispatch_job(req: Request):
    try:
        data = await req.json()
        plan_id = str(data.get("plan_id"))
        platforms = data.get("platforms")
        options = data.get("options")

        if not plan_id or not platforms or not options:
            return JSONResponse(status_code=400, content={"error": "Missing required fields"})

        env_vars = {
            "PLAN_ID": plan_id,
            "PLATFORMS": json.dumps(platforms),
            "OPTIONS_JSON": json.dumps(options),
        }

        credentials, _ = default(scopes=["https://www.googleapis.com/auth/cloud-platform"])
        authed_session = AuthorizedSession(credentials)

        url = f"https://{REGION}-run.googleapis.com/apis/run.googleapis.com/v1/namespaces/{PROJECT_ID}/jobs/{JOB_NAME}:run"

        body = {
            "overrides": {
                "containerOverrides": [
                    {"name": JOB_NAME, "env": [{"name": k, "value": v} for k, v in env_vars.items()]}
                ]
            }
        }

        response = authed_session.post(url, json=body)
        if response.status_code != 200:
            logging.error(f"Job execution failed: {response.text}")
            return JSONResponse(status_code=500, content={"error": "Job execution failed", "details": response.text})

        return {"message": "Job triggered"}

    except Exception as e:
        logging.exception("Unhandled exception")
        return JSONResponse(status_code=500, content={"error": "Unexpected error", "details": str(e)})

Dockerfile for Dispatcher

FROM python:3.10-slim

WORKDIR /app
COPY . /app

RUN pip install --no-cache-dir -r requirements.txt

CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8080"]

requirements.txt

fastapi==0.110.0
uvicorn==0.29.0
google-auth
google-auth-httplib2
google-auth-oauthlib
google-api-python-client

Cloud Run Job 생성 스크립트

#!/bin/bash
set -e

PROJECT_ID="innocean-mkt-datalake-01"
REGION="asia-northeast3"
REPO="media-plan"
IMAGE_NAME="media-plan-crawler"
IMAGE_URI="asia-northeast3-docker.pkg.dev/$PROJECT_ID/$REPO/$IMAGE_NAME"
SERVICE_ACCOUNT="media-plan-crawler-sa@$PROJECT_ID.iam.gserviceaccount.com"

JOB_NAME="crawler-job"
gcloud beta run jobs create $JOB_NAME \
  --image $IMAGE_URI \
  --region $REGION \
  --memory 8Gi \
  --cpu 4 \
  --task-timeout 900s \
  --service-account $SERVICE_ACCOUNT

한 번만 실행하면 Job은 계속 재사용 가능하며, dispatcher에서 env만 매번 달리 넘겨주면 됩니다.


테스트

curl -X POST https://crawler-dispatcher-xxxxxx.run.app/dispatch \
  -H "Content-Type: application/json" \
  -d '{
    "plan_id": 12345,
    "platforms": ["dv360", "meta"],
    "options": {
      "dv360": { "plan_name": "DV Plan", "budget": 8000, "location": "서울" },
      "meta": { "plan_name": "Meta Plan", "location": "서울" }
    }
  }'

정리

  • Compute Engine → Cloud Run Job 전환으로 동시성 문제 해결
  • FastAPI 기반 Dispatcher가 사용자 요청을 수신하고 Job 실행
  • Cloud Run Job은 컨테이너 단위로 크롤링을 실행하고 종료
  • 전체 구조가 확장성유지보수성을 모두 만족할 수 있게 되었습니다.
profile
Software Engineer

0개의 댓글