[Python]Celery vs asyncio.Semaphore

류지수·2025년 7월 17일
0

비동기 작업 제어 방식 비교
비동기 작업을 처리하다 보면, 자연스럽게 동시성 제어가 필요해짐
한번에 너무 많은 작업이 실행되지 않도록 제한해야함
이러한 상황에서 Python에서는 보통 다음 두 가지 접근이 사용됨

  • asyncio.Semaphore (혹은 BoundedSemaphore)
  • Celery (비동기 작업 큐)

1. 개념 비교

항목asyncio.SemaphoreCelery
목적동시에 실행할 수 있는 작업 수 제한작업을 큐에 넣고 비동기 백그라운드에서 실행
동작 방식메모리 내 세마포어 객체로 컨텍스트 관리메시지 브로커(Redis 등)에 작업 전송 후 워커가 처리
적용 범위단일 프로세스 내부멀티 프로세스, 멀티 서버 가능
설치/설정내장 라이브러리, 즉시 사용 가능외부 구성 필요 (브로커, 워커 등)

2. 어떤 차이가 있을까

구조적 차이

  • asyncio.Semaphore는 이벤트 루프 기반의 내부 제어를 위한 도구로, 동시 실행되는 코루틴 수를 제한할 수 있음
  • Celery는 작업 자체를 큐로 분리하여 별도 프로세스/서버에서 처리하는 구조

동작 범위 차이

  • Semaphore는 한 프로세스 내의 동시성만 제한할 수 있음
    예를들어, Uvicorn 워커가 4개라면, 세마포어는 각 워커 안에서만 적용된다.
  • Celery는 워커 수, 워커 클러스터 수 등 전체 작업 처리량을 전역적으로 제어할 수 있음

확장성 차이

  • Semaphore는 수직 확장(서버 성능 업그레이드)정도만 가능
  • Celery수평 확장(워커/서버 추가)이 가능하므로 대규모 시스템에서도 효과적으로 사용됨

3. 실제 적용 경험

  • 요청 하나마다 LLM 호출, 전처리, 후처리 등이 포함된 무거운 작업 실행
  • 동시에 여러개의 작업을 호출

초기에는 asyncio.Semaphore를 사용해서 다음과 같이 동시 실행수를 제한했음

import asyncio
import random

semaphore = asyncio.Semaphore(3)

async def process_task(task_id: int):
    async with semaphore:
        print(f"[{task_id}] 시작")
        await asyncio.sleep(random.uniform(1, 3))  # LLM 호출 등
        print(f"[{task_id}] 완료")

async def main():
    tasks = [process_task(i) for i in range(10)]
    await asyncio.gather(*tasks)

if __name__ == "__main__":
    asyncio.run(main())

이 방식의 장점은 특정 함수 내에서 작업 수를 쉬벡 제한 할 수 있고, 별도 설정이 필요없다는 것

장점

  • 코드 구조가 단순
  • 설정 없이 바로 적용 가능
  • 단일 프로세스 환경에서 잘 작동

한계

  • 처리 지연: 요청이 많아지면 대기 큐가 길어져 전체 응답 속도 저하
  • 글로벌 제어 불가: Uvicorn 등에서 워커를 여러 개 띄우면 세마포어가 각 워커에 독립적으로 적용됨
  • 서버 확장 시 한계: 서버 인스턴스가 늘어나면 제한이 분산되어, 전체 동시성 제어가 무력화됨

4. Celery와의 비교

이와 달리 Celery는 작업을 큐에 넣고, 독립된 워커(worker)가 작업을 처리함
브로커(Redis, RabbitMQ 등)는 작업을 큐에 저장하고,
워커는 이를 하나씩 꺼내서 실행

Celery에서는 워커 수나 실행 제한을 통해 글로벌하게 작업 병렬성을 조절할 수 있음


예시: 동시 3개 워커만 실행

celery -A celery_app worker --concurrency=3

or

작업 단위 제한

# celery_app.py
from celery import Celery

celery_app = Celery(
    "my_worker",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/1",
)


#task.py
from celery_app import celery_app
import time
import random

@celery_app.task(rate_limit="3/s")  # 초당 3건 제한
def process_task(task_id: int):
    print(f"[{task_id}] 시작")
    time.sleep(random.uniform(1, 3))  # LLM 호출 등
    print(f"[{task_id}] 완료")
    return f"작업 {task_id} 완료"



# main.py
from tasks import process_task

if __name__ == "__main__":
    for i in range(10):
        result = process_task.delay(i)
        print(f"요청 {i} → task_id: {result.id}")

FastAPI + Celery 통합

# main.py
from fastapi import FastAPI
from tasks import process_task

app = FastAPI()

@app.get("/run")
def run_task(task_id: int):
    result = process_task.delay(task_id)
    return {"task_id": result.id, "status": "queued"}

5. 상황별 선택 기준

  • 간단한 동시성 제어 → asyncio.Semaphore
  • 단일 프로세스 테스트 → asyncio.Semaphore
  • 비동기 + 백그라운드 처리 → Celery
  • 멀티 서버 환경 → Celery
  • 작업이 오래 걸리거나 외부 서비스 호출 포함 → Celery
  • 작업 실패 재시도, 로깅이 필요 → Celery

6. 요약

항목asyncio.SemaphoreCelery
단순성높음낮음 (설정 필요)
확장성낮음높음 (분산 처리 가능)
동시성 제어로컬 수준전역 수준
작업 추적불가능가능
실패 복구없음있음 (재시도 등)

마무리

  • asyncio.semaphore는 간단한 동시성 제어를 빠르게 구현할 수 있는 좋은 선택이지만, 멀티 프로세스/서버 환경에서는 한계가 분명
  • Celery는 구조적으로 작업 분리, 글로벌 동시성 제어, 작업 추적 및 복구 기능 등을 제공하며, 규모가 커지거나 무거운 시스템에서는 필수적인 도구가 될 수 있음
profile
끄적끄적

0개의 댓글