개요
- 학원 광고나 채용공고를 보면 대용량 파일 처리 시스템을 강조하는 글 을 많이 읽어보았다.
- 그래서 직접 구현해보기로 했다.
1차적으로 만든 코드
깃허브 주소
from fastapi import FastAPI, UploadFile, File, HTTPException
from fastapi.responses import FileResponse
import pandas as pd
from io import StringIO, BytesIO
import os, shutil, uuid
from datetime import datetime
from typing import Dict
import re
app = FastAPI()
MAX_FILE_SIZE = 10 * 1024 *1024
CHUNK_SIZE = 64* 1024
uploaded_files: Dict[str, dict] = {}
def validate_file_size(file: UploadFile) -> int:
file_size = 0
file.file.seek(0, 2)
file_size = file.file.tell()
file.file.seek(0)
if file_size > MAX_FILE_SIZE:
raise HTTPException(
status_code = 413,
detail = f"파일 크기 제한 초과 ({MAX_FILE_SIZE//1024//1024}MB)"
)
return file_size
async def read_file_in_chunks(file: UploadFile) -> bytes:
contents = b''
while True:
chunk = await file.read(CHUNK_SIZE)
if not chunk:
break
contents += chunk
if len(contents) > MAX_FILE_SIZE:
raise HTTPException(413, "파일 크기 초과")
return contents
def validate_file_format_and_data(filename:str, contents: bytes) -> pd.DataFrame:
ext = filename.split(".")[-1].lower()
try:
if ext == "csv":
df = pd.read_csv(StringIO(contents.decode("utf-8")))
elif ext in ("xlsx", "xls"):
df = pd.read_excel(BytesIO(contents))
else:
raise HTTPException(400, "지원되지 않는 파일 형식입니다.")
except pd.errors.EmptyDataError:
raise HTTPException(400, "파일에 읽을 수 있는 데이터가 없습니다.")
if df.empty:
raise HTTPException(400, "파일에 데이터가 없습니다.")
if len(df.columns) != 1 or df.columns[0] != "customer_id":
raise HTTPException(400, "헤더가 customer_id 하나만 있어야 합니다.")
if len(df) == 0:
raise HTTPException(400, "회원 목록이 비어있습니다.")
return df
def sanitize_filename(filename: str) -> str:
sanitize_name = os.path.basename(filename)
sanitize_name = re.sub(r'[<>:"|?*]', '_', sanitize_name)
sanitize_name = re.sub(r'[\x00-\x1f]', '', sanitize_name)
window_reserved = (['CON', 'PRN', 'AUX', 'NULL'] +
[f'CON{i}' for i in range(1, 10)] +
[f'LPT{i}' for i in range(1, 10)])
if sanitize_name.upper().split('.')[0] in window_reserved:
sanitize_name = f"file_{sanitize_name}"
if len(sanitize_name) > MAX_FILE_SIZE:
raise HTTPException(400, f"파일명이 너무 깁니다 (최대 255자)")
return sanitize_name
def save_file_to_disk(file_id: str, sanitized_name: str, contents: bytes) -> str:
upload_dir = os.path.abspath("uploads")
os.makedirs(upload_dir, exist_ok=True)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
safe_name = f"{file_id}_{timestamp}_{sanitized_name}"
path = os.path.join(upload_dir, safe_name)
final_path = os.path.abspath(path)
if not final_path.startswith(upload_dir):
raise HTTPException(400, "허용되지 않는 파일 경로입니다.")
with open(path, "wb") as buf:
buf.write(contents)
return path
@app.post("/upload-file/")
async def upload_file(file: UploadFile = File(...)):
file_size = validate_file_size(file)
contents = read_file_in_chunks(file)
df = validate_file_format_and_data(file.filename, contents)
sanitized_name = sanitize_filename(file.filename)
file_id = str(uuid.uuid4())
path = save_file_to_disk(file_id, sanitized_name, contents)
uploaded_files[file_id] = {
"path": path,
"original_name": sanitized_name,
"uploaded_at": datetime.now().isoformat()
}
return {
"file_id": file_id,
"original_filename": file.filename,
"safe_filename" : sanitized_name,
"download_url": f"/download/{file_id}"
}
@app.get("/download/{file_id}")
async def download_file(file_id: str):
info = uploaded_files.get(file_id)
if not info:
raise HTTPException(404, "파일을 찾을 수 없습니다.")
if not os.path.exists(info["path"]):
raise HTTPException(404, "서버에 파일이 없습니다.")
return FileResponse(
path=info["path"],
filename=info["original_name"],
media_type="application/octet-stream"
)
파일 업로드
- 업로드 파일을 바로 디스크로 저장
- 업로드 중에 파일 검증을 진행
- 파일의 메타데이터는 메모리 내에 저장
파일 다운로드
문제점
- 현업에서의 대용량 파일은 몇GB 이상을 대용량 파일이라 부를 수 있다. 하지만 현재 코드는 10MB 제한한 상태
- 파일 검증시에 파일을 변수 안에 담아서 사용할 경우 파일의 용량이 커지거나 요청 횟수가 많아질 경우 메모리가 감당 못 하는 상황 발생 가능
- 요청 횟수가 많아지거나 파일의 용량이 커지더라도 처리 가능하게 변경 필요
- 파일의 메타데이터를 메모리 내에 저장 서버가 재시작 할 경우 데이터가 휘발됨
해결방법
- 파일 업로드는 외부 DB에 저장(S3)
- S3 같은 AWS 서비스의 경우 결제 때문에 고민이 많이 된다. =>
LocalStack 사용
- 파일 검증은 스트림 방식으로 수행
- 메타데이터는 DB를 사용해서 저장
2차로 완성한 코드
깃허브 주소
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import boto3
import io
import csv
import uuid
from datetime import datetime
app = FastAPI()
s3_client = boto3.client(
"s3",
aws_access_key_id="test",
aws_secret_access_key="test",
region_name="us-east-1",
endpoint_url="http://localhost:4566",
)
dynamodb = boto3.resource(
"dynamodb",
endpoint_url="http://localhost:4566",
region_name="us-east-1",
aws_access_key_id="test",
aws_secret_access_key="test",
)
table = dynamodb.Table("FileMetadata")
class FileMetadata(BaseModel):
filename: str
size: int
content_type: str
uploaded_at: str
@app.post("/upload-presigned-url/")
async def get_presigned_url_upload(metadata: FileMetadata):
bucket_name = "sample-bucket"
key = f"uploads/{metadata.filename}"
try:
presigned_put_url = s3_client.generate_presigned_url(
ClientMethod="put_object",
Params={
"Bucket": bucket_name,
"Key": key,
"ContentType": metadata.content_type,
},
ExpiresIn=3600,
)
return {
"upload_url": presigned_put_url,
"s3_key": key,
}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.post("/save-metadata/")
def save_metadata(metadata: FileMetadata):
file_id = str(uuid.uuid4())
item = {
"file_id": file_id,
"filename": metadata.filename,
"s3_key": f"uploads/{metadata.filename}",
"size": metadata.size,
"content_type": metadata.content_type,
"uploaded_at": metadata.uploaded_at or datetime.utcnow().isoformat(),
}
try:
table.put_item(Item=item)
return {"file_id": file_id, "message": "메타데이터 저장 성공"}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
class S3FileInfo(BaseModel):
bucket: str
key: str
@app.post("/validate-s3-csv/")
def validate_s3_csv(info: S3FileInfo):
"""
S3에 저장된 대용량 CSV 파일을 스트리밍으로 읽으면서
1. 헤더가 customer_id 하나인지
2. 데이터가 비어있지 않은지
를 검증합니다.
"""
try:
response = s3_client.get_object(Bucket=info.bucket, Key=info.key)
stream = io.TextIOWrapper(response["Body"], encoding="utf-8")
reader = csv.reader(stream)
try:
header = next(reader)
except StopIteration:
raise HTTPException(400, "파일이 비어 있습니다.")
if len(header) != 1 or header[0] != "customer_id":
raise HTTPException(400, "헤더가 customer_id 하나만 있어야 합니다.")
try:
first_row = next(reader)
except StopIteration:
raise HTTPException(400, "회원 목록이 비어 있습니다.")
return {
"bucket": info.bucket,
"key": info.key,
"header": header,
"message": "파일 검증 통과",
}
except Exception as e:
raise HTTPException(500, f"S3 파일 검증 중 오류 발생: {str(e)}")
@app.post("/download-presigned-url/")
async def get_presigned_url_download(metadata: FileMetadata):
bucket_name = "sample-bucket"
key = f"uploads/{metadata.filename}"
try:
presigned_get_url = s3_client.generate_presigned_url(
ClientMethod="get_object",
Params={
"Bucket": bucket_name,
"Key": key,
},
ExpiresIn=3600,
)
return {
"download_url": presigned_get_url,
}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
파일 업로드
- 클라이언트에서 서버로 파일 업로드 요청
- 서버에서 S3에게 업로드용
Presigned_url 요청 및 클라이언트에게 반환
- 클라이언트에서
Presigned_url을 사용해서 직접 S3에 업로드
- 메타데이터는 Dynamo DB에 저장
파일 다운로드
문제점
- 같은 파일명을 업로드 하면 현재 코드상 덮어씌어지면서 기존의 파일이 사라짐
- 현재 모든 과정을 사용자가 전부 요청을 해야하는데 일부 자동화가 필요
- 해당 문제점들은 추후 개선 시도
결론
- 클라이언트에서 서버로 서버에서 DB로 가는 흐름을 사용하면 서버가 가지는 부담이 커지게 된다.
- 클라이언트에서 DB로 파일을 저장할 수 있는 흐름을 사용해야 한다.
- 클라이언트에서 보내는 모든 요청을 DB가 허락하면 보안성에 문제가 생긴다.
- DB는 특정 링크로만 다운로드나 업로드를 허락하는 기능이 필요하다.(
presigned URL)