대용량 파일 처리 시스템 구현하기

안성희·2025년 7월 30일

개요

  • 학원 광고나 채용공고를 보면 대용량 파일 처리 시스템을 강조하는 글 을 많이 읽어보았다.
  • 그래서 직접 구현해보기로 했다.

1차적으로 만든 코드

깃허브 주소

from fastapi import FastAPI, UploadFile, File, HTTPException
from fastapi.responses import FileResponse
import pandas as pd
from io import StringIO, BytesIO
import os, shutil, uuid
from datetime import datetime
from typing import Dict
import re

app = FastAPI()

# 파일 크기 제한 상수 (10MB) Dos 공격 방지
MAX_FILE_SIZE = 10 * 1024 *1024
# 청크 크게 제한 상수 (64KB) 메모리 효율성 확보 메모리 오버플로우 방지 및 시스템 안정성 확보
CHUNK_SIZE = 64* 1024
# 업로드된 파일 메타데이터 저장소
uploaded_files: Dict[str, dict] = {}

# 파일 크기 검증
def validate_file_size(file: UploadFile) -> int:
    file_size = 0                   # 변수 초기화
    file.file.seek(0, 2)            # 포인터 끝으로 이동
    file_size = file.file.tell()    # 실제 파일 사이즈 대입
    file.file.seek(0)               # 포인터 초기화

    if file_size > MAX_FILE_SIZE:
        raise HTTPException(
            status_code = 413,
            detail = f"파일 크기 제한 초과 ({MAX_FILE_SIZE//1024//1024}MB)"
        )
    return file_size

# 청크 단위로 파일 읽기
async def read_file_in_chunks(file: UploadFile) -> bytes:
    contents = b''
    while True:
        chunk = await file.read(CHUNK_SIZE)
        if not chunk:
            break
        contents += chunk
        if len(contents) > MAX_FILE_SIZE:
            raise HTTPException(413, "파일 크기 초과")
    return contents

# 파일 형식 및 데이터 검증
def validate_file_format_and_data(filename:str, contents: bytes) -> pd.DataFrame:
    # 파일 명에서 확장자 추출
    ext = filename.split(".")[-1].lower()
    try:
        if ext == "csv":
            df = pd.read_csv(StringIO(contents.decode("utf-8")))
        elif ext in ("xlsx", "xls"):
            df = pd.read_excel(BytesIO(contents))
        else:
            raise HTTPException(400, "지원되지 않는 파일 형식입니다.")
    except pd.errors.EmptyDataError:
        raise HTTPException(400, "파일에 읽을 수 있는 데이터가 없습니다.")
    
    if df.empty:
        raise HTTPException(400, "파일에 데이터가 없습니다.")
    if len(df.columns) != 1 or df.columns[0] != "customer_id":
        raise HTTPException(400, "헤더가 customer_id 하나만 있어야 합니다.")
    if len(df) == 0:
        raise HTTPException(400, "회원 목록이 비어있습니다.")
    
    return df

# 파일명 정제
def sanitize_filename(filename: str) -> str:
    # 기본 경로 제거
    sanitize_name = os.path.basename(filename)
    # 특수문자 반환
    sanitize_name = re.sub(r'[<>:"|?*]', '_', sanitize_name)
    # 제어문자 제거
    sanitize_name = re.sub(r'[\x00-\x1f]', '', sanitize_name)
    # 윈도우 예약어 처리
    window_reserved = (['CON', 'PRN', 'AUX', 'NULL'] + 
                       [f'CON{i}' for i in range(1, 10)] + 
                       [f'LPT{i}' for i in range(1, 10)])
    if sanitize_name.upper().split('.')[0] in window_reserved:
        sanitize_name = f"file_{sanitize_name}"

    if len(sanitize_name) > MAX_FILE_SIZE:
        raise HTTPException(400, f"파일명이 너무 깁니다 (최대 255자)")    

    return sanitize_name

# 파일을 디스크에 저장
def save_file_to_disk(file_id: str, sanitized_name: str, contents: bytes) -> str:
    upload_dir = os.path.abspath("uploads")
    os.makedirs(upload_dir, exist_ok=True)

    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    safe_name = f"{file_id}_{timestamp}_{sanitized_name}"
    path = os.path.join(upload_dir, safe_name)

    final_path = os.path.abspath(path)
    if not final_path.startswith(upload_dir):
        raise HTTPException(400, "허용되지 않는 파일 경로입니다.")
    
    with open(path, "wb") as buf:
        buf.write(contents)

    return path


@app.post("/upload-file/")
async def upload_file(file: UploadFile = File(...)):
    # 파일 크기 검증
    file_size = validate_file_size(file)
    
    # 청크 단위로 파일 읽기
    contents = read_file_in_chunks(file) 

    # 파일 형식 및 데이터 검증
    df = validate_file_format_and_data(file.filename, contents)
    
    # 파일명 정제 
    sanitized_name = sanitize_filename(file.filename)

    # 파일 저장
    file_id = str(uuid.uuid4())
    path = save_file_to_disk(file_id, sanitized_name, contents)

    # 3. 메타데이터 저장
    uploaded_files[file_id] = {
        "path": path,
        "original_name": sanitized_name,
        "uploaded_at": datetime.now().isoformat()
    }

    # 4. 다운로드 URL 포함 응답
    return {
        "file_id": file_id,
        "original_filename": file.filename,
        "safe_filename" : sanitized_name,
        "download_url": f"/download/{file_id}"
    }

@app.get("/download/{file_id}")
async def download_file(file_id: str):
    # file_id 만으로 접근 허용
    info = uploaded_files.get(file_id)
    if not info:
        raise HTTPException(404, "파일을 찾을 수 없습니다.")
    if not os.path.exists(info["path"]):
        raise HTTPException(404, "서버에 파일이 없습니다.")
    
    # 파일 그대로 내려주기
    return FileResponse(
        path=info["path"],
        filename=info["original_name"],
        media_type="application/octet-stream"
    )

파일 업로드

  • 업로드 파일을 바로 디스크로 저장
  • 업로드 중에 파일 검증을 진행
  • 파일의 메타데이터는 메모리 내에 저장

파일 다운로드

  • 파일 다운로드 할 때는 서버에서 직접 반환

문제점

  • 현업에서의 대용량 파일은 몇GB 이상을 대용량 파일이라 부를 수 있다. 하지만 현재 코드는 10MB 제한한 상태
    • 대용량 파일도 처리 가능하게 변경 필요
  • 파일 검증시에 파일을 변수 안에 담아서 사용할 경우 파일의 용량이 커지거나 요청 횟수가 많아질 경우 메모리가 감당 못 하는 상황 발생 가능
    • 요청 횟수가 많아지거나 파일의 용량이 커지더라도 처리 가능하게 변경 필요
  • 파일의 메타데이터를 메모리 내에 저장 서버가 재시작 할 경우 데이터가 휘발됨
    • 메타데이터를 영구히 저장해야 함

해결방법

  • 파일 업로드는 외부 DB에 저장(S3)
    • S3 같은 AWS 서비스의 경우 결제 때문에 고민이 많이 된다. => LocalStack 사용
  • 파일 검증은 스트림 방식으로 수행
  • 메타데이터는 DB를 사용해서 저장

2차로 완성한 코드

깃허브 주소

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import boto3
import io
import csv
import uuid
from datetime import datetime

app = FastAPI()

# LocalStack S3 클라이언트 : Presigned URL 생성 등 특수 기능은 client에서만 지원
s3_client = boto3.client(
    "s3",
    aws_access_key_id="test",
    aws_secret_access_key="test",
    region_name="us-east-1",
    endpoint_url="http://localhost:4566",
)

# DynamoDB 리소스 : 테이블 아이템등 리소스 단위로 작업하는 경우가 많아 resource 방식 사용
dynamodb = boto3.resource(
    "dynamodb",
    endpoint_url="http://localhost:4566",
    region_name="us-east-1",
    aws_access_key_id="test",
    aws_secret_access_key="test",
)
# FileMetadata 테이블 객체 할당 해당 테이블이 없을경우 table 객체를 만들때는 오류가 일어나지 않지만 읽거나 쓰기 작업을 할 때 오류가 발생
table = dynamodb.Table("FileMetadata")


# Pydantic의 BaesModel을 상속받아 데이터 모델로 동작
class FileMetadata(BaseModel):
    filename: str
    size: int
    content_type: str
    uploaded_at: str


# Presigned URL 발급
@app.post("/upload-presigned-url/")
async def get_presigned_url_upload(metadata: FileMetadata):
    # 사용할 S3 bucket 이름
    bucket_name = "sample-bucket"
    # 업로드할 S3 오브젝트의 경로(키)를 파일명 기반으로 생성
    key = f"uploads/{metadata.filename}"
    try:
        presigned_put_url = s3_client.generate_presigned_url(
            ClientMethod="put_object",
            Params={
                "Bucket": bucket_name,
                "Key": key,
                "ContentType": metadata.content_type,
            },
            ExpiresIn=3600,
        )
        return {
            "upload_url": presigned_put_url,
            "s3_key": key,
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


# DynamoDB에 메타데이터 저장
@app.post("/save-metadata/")
def save_metadata(metadata: FileMetadata):
    file_id = str(uuid.uuid4())
    item = {
        # 파일 고유 식별자
        "file_id": file_id,
        # 파일 이름
        "filename": metadata.filename,
        # S3 버킷에 저장될 경로
        "s3_key": f"uploads/{metadata.filename}",
        # 파일 크기
        "size": metadata.size,
        # 파일의 MIME 타입
        "content_type": metadata.content_type,
        # 업로드 시각
        "uploaded_at": metadata.uploaded_at or datetime.utcnow().isoformat(),
    }
    try:
        table.put_item(Item=item)
        return {"file_id": file_id, "message": "메타데이터 저장 성공"}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


# S3 스트리밍 검증 엔드포인트
class S3FileInfo(BaseModel):
    bucket: str
    key: str


@app.post("/validate-s3-csv/")
def validate_s3_csv(info: S3FileInfo):
    """
    S3에 저장된 대용량 CSV 파일을 스트리밍으로 읽으면서
    1. 헤더가 customer_id 하나인지
    2. 데이터가 비어있지 않은지
    를 검증합니다.
    """
    try:
        # Bucket의 Key 파일의 바이너리 스트림과 메타데이터를 response에 저장
        response = s3_client.get_object(Bucket=info.bucket, Key=info.key)
        # 바이너리스트림을 텍스트 스트림으로 변환
        stream = io.TextIOWrapper(response["Body"], encoding="utf-8")
        # CSV 파일을 한 줄씩 읽을 준비
        reader = csv.reader(stream)
        # 헤더 검증
        try:
            header = next(reader)
        except StopIteration:
            raise HTTPException(400, "파일이 비어 있습니다.")
        if len(header) != 1 or header[0] != "customer_id":
            raise HTTPException(400, "헤더가 customer_id 하나만 있어야 합니다.")
        # 데이터 유무 검증
        try:
            first_row = next(reader)
        except StopIteration:
            raise HTTPException(400, "회원 목록이 비어 있습니다.")
        # 검증 통과
        return {
            "bucket": info.bucket,
            "key": info.key,
            "header": header,
            "message": "파일 검증 통과",
        }
    except Exception as e:
        raise HTTPException(500, f"S3 파일 검증 중 오류 발생: {str(e)}")


@app.post("/download-presigned-url/")
async def get_presigned_url_download(metadata: FileMetadata):
    # 사용할 S3 bucket 이름
    bucket_name = "sample-bucket"
    # 다운로드할 S3 오브젝트의 경로(키)를 파일명 기반으로 생성
    key = f"uploads/{metadata.filename}"
    try:
        presigned_get_url = s3_client.generate_presigned_url(
            ClientMethod="get_object",
            Params={
                "Bucket": bucket_name,
                "Key": key,
            },
            ExpiresIn=3600,
        )
        return {
            "download_url": presigned_get_url,
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

파일 업로드

  • 클라이언트에서 서버로 파일 업로드 요청
  • 서버에서 S3에게 업로드용 Presigned_url 요청 및 클라이언트에게 반환
  • 클라이언트에서 Presigned_url을 사용해서 직접 S3에 업로드
  • 메타데이터는 Dynamo DB에 저장

파일 다운로드

  • 위와 비슷한 과정으로 파일 다운로드 진행

문제점

  • 같은 파일명을 업로드 하면 현재 코드상 덮어씌어지면서 기존의 파일이 사라짐
  • 현재 모든 과정을 사용자가 전부 요청을 해야하는데 일부 자동화가 필요
  • 해당 문제점들은 추후 개선 시도

결론

  • 클라이언트에서 서버로 서버에서 DB로 가는 흐름을 사용하면 서버가 가지는 부담이 커지게 된다.
  • 클라이언트에서 DB로 파일을 저장할 수 있는 흐름을 사용해야 한다.
  • 클라이언트에서 보내는 모든 요청을 DB가 허락하면 보안성에 문제가 생긴다.
  • DB는 특정 링크로만 다운로드나 업로드를 허락하는 기능이 필요하다.(presigned URL)
profile
무재다능 개발자

0개의 댓글