비동기 작업 제어 방식 비교
비동기 작업을 처리하다 보면, 자연스럽게 동시성 제어가 필요해짐
한번에 너무 많은 작업이 실행되지 않도록 제한해야함
이러한 상황에서 Python에서는 보통 다음 두 가지 접근이 사용됨
asyncio.Semaphore
(혹은 BoundedSemaphore
)항목 | asyncio.Semaphore | Celery |
---|---|---|
목적 | 동시에 실행할 수 있는 작업 수 제한 | 작업을 큐에 넣고 비동기 백그라운드에서 실행 |
동작 방식 | 메모리 내 세마포어 객체로 컨텍스트 관리 | 메시지 브로커(Redis 등)에 작업 전송 후 워커가 처리 |
적용 범위 | 단일 프로세스 내부 | 멀티 프로세스, 멀티 서버 가능 |
설치/설정 | 내장 라이브러리, 즉시 사용 가능 | 외부 구성 필요 (브로커, 워커 등) |
asyncio.Semaphore
는 이벤트 루프 기반의 내부 제어를 위한 도구로, 동시 실행되는 코루틴 수를 제한할 수 있음Celery
는 작업 자체를 큐로 분리하여 별도 프로세스/서버에서 처리하는 구조Semaphore
는 한 프로세스 내의 동시성만 제한할 수 있음Celery
는 워커 수, 워커 클러스터 수 등 전체 작업 처리량을 전역적으로 제어할 수 있음Semaphore
는 수직 확장(서버 성능 업그레이드)정도만 가능Celery
는 수평 확장(워커/서버 추가)이 가능하므로 대규모 시스템에서도 효과적으로 사용됨초기에는 asyncio.Semaphore
를 사용해서 다음과 같이 동시 실행수를 제한했음
import asyncio
import random
semaphore = asyncio.Semaphore(3)
async def process_task(task_id: int):
async with semaphore:
print(f"[{task_id}] 시작")
await asyncio.sleep(random.uniform(1, 3)) # LLM 호출 등
print(f"[{task_id}] 완료")
async def main():
tasks = [process_task(i) for i in range(10)]
await asyncio.gather(*tasks)
if __name__ == "__main__":
asyncio.run(main())
이 방식의 장점은 특정 함수 내에서 작업 수를 쉬벡 제한 할 수 있고, 별도 설정이 필요없다는 것
장점
한계
이와 달리 Celery는 작업을 큐에 넣고, 독립된 워커(worker)가 작업을 처리함
브로커(Redis, RabbitMQ 등)는 작업을 큐에 저장하고,
워커는 이를 하나씩 꺼내서 실행
Celery에서는 워커 수나 실행 제한을 통해 글로벌하게 작업 병렬성을 조절할 수 있음
예시: 동시 3개 워커만 실행
celery -A celery_app worker --concurrency=3
or
작업 단위 제한
# celery_app.py
from celery import Celery
celery_app = Celery(
"my_worker",
broker="redis://localhost:6379/0",
backend="redis://localhost:6379/1",
)
#task.py
from celery_app import celery_app
import time
import random
@celery_app.task(rate_limit="3/s") # 초당 3건 제한
def process_task(task_id: int):
print(f"[{task_id}] 시작")
time.sleep(random.uniform(1, 3)) # LLM 호출 등
print(f"[{task_id}] 완료")
return f"작업 {task_id} 완료"
# main.py
from tasks import process_task
if __name__ == "__main__":
for i in range(10):
result = process_task.delay(i)
print(f"요청 {i} → task_id: {result.id}")
# main.py
from fastapi import FastAPI
from tasks import process_task
app = FastAPI()
@app.get("/run")
def run_task(task_id: int):
result = process_task.delay(task_id)
return {"task_id": result.id, "status": "queued"}
asyncio.Semaphore
asyncio.Semaphore
Celery
Celery
Celery
Celery
항목 | asyncio.Semaphore | Celery |
---|---|---|
단순성 | 높음 | 낮음 (설정 필요) |
확장성 | 낮음 | 높음 (분산 처리 가능) |
동시성 제어 | 로컬 수준 | 전역 수준 |
작업 추적 | 불가능 | 가능 |
실패 복구 | 없음 | 있음 (재시도 등) |
asyncio.semaphore
는 간단한 동시성 제어를 빠르게 구현할 수 있는 좋은 선택이지만, 멀티 프로세스/서버 환경에서는 한계가 분명Celery
는 구조적으로 작업 분리, 글로벌 동시성 제어, 작업 추적 및 복구 기능 등을 제공하며, 규모가 커지거나 무거운 시스템에서는 필수적인 도구가 될 수 있음