이번에는 비동기 모듈을 사용하기 위해 도입한 Celery에 대해 설명하려고 합니다.
실제 제 프로젝트에서는 1초 단위 데이터로 수백개의 센서 데이터를 받아 저장, 변환, 계산 하는 로직이 필요했습니다.
이걸 하나의 서비스 안에서 동작시킨다면 부하는 커지고 다운될 수 있거나 오류를 발생시킬 수 있는 위험도가 너무 높았고 시스템 안정성을 생각해서라도 분리하는게 좋다고 판단했습니다.
앞으로의 예시 코드는 Flask로 작성된 코드이나 DJango, FastAPI 모두 사용 가능함을 알려드립니다.
Flask로 API 서버를 개발하다 보면 이런 상황이 자주 생깁니다.
사용자 요청마다 오래 걸리는 연산(파일 업로드, 대용량 데이터 처리, 외부 API 호출 등)이 있음
요청이 끝날 때까지 기다리면 응답이 지연되고, 서버 리소스가 묶임
같은 작업이 동시에 여러 개 들어오면 서버가 과부하됨
이럴 때 필요한 게 바로 비동기(Asynchronous) 처리입니다.
즉, “요청을 받자마자 결과를 기다리지 않고, 백그라운드에서 따로 작업을 돌리는 방식”이죠.
Flask는 기본적으로 동기 처리 방식이라 이런 구조를 직접 구현하기 어렵습니다.
그래서 등장하는 도구가 바로 Celery입니다.
Celery는 Python 기반의 비동기 작업 큐(Task Queue) 프레임워크입니다.
한마디로, Flask(또는 다른 애플리케이션)가 요청한 “작업(job)”을
큐에 넣고, 백그라운드에서 따로 실행시켜주는 시스템입니다.
구조는 이렇게 생겼습니다
Flask (클라이언트)
↓
Broker (Redis, RabbitMQ 등)
↓
Celery Worker (실제 작업 수행)
Flask → “이 작업 좀 대신 처리해줘” 하고 Celery에 요청
Broker → 작업을 메시지 큐로 전달
Worker → 큐를 감시하다가 작업을 꺼내서 실행
아래는 Flask 앱에 Celery를 연결하는 기본적인 구조 예시입니다.
flask_celery_app/
├── app.py
├── tasks.py
└── requirements.txt
from flask import Flask, jsonify
from tasks import make_celery
app = Flask(__name__)
# Celery 설정
app.config.update(
CELERY_BROKER_URL="redis://localhost:6379/0", # 메시지 큐
CELERY_RESULT_BACKEND="redis://localhost:6379/1" # 결과 저장소
)
celery = make_celery(app)
@app.route("/process/<name>")
def process(name):
task = background_task.delay(name)
return jsonify({"task_id": task.id, "status": "processing..."})
@celery.task(name="tasks.background_task")
def background_task(name):
import time
time.sleep(5)
return f"Hello, {name}! 작업이 완료되었습니다."
if __name__ == "__main__":
app.run(debug=True)
from celery import Celery
def make_celery(app):
celery = Celery(
app.import_name,
broker=app.config["CELERY_BROKER_URL"],
backend=app.config["CELERY_RESULT_BACKEND"]
)
celery.conf.update(app.config)
# Flask context 안에서 Celery 실행
class ContextTask(celery.Task):
def __call__(self, *args, **kwargs):
with app.app_context():
return self.run(*args, **kwargs)
celery.Task = ContextTask
return celery
# 1️⃣ Redis 실행 (broker 역할)
redis-server
# 2️⃣ Celery Worker 실행
celery -A app.celery worker --loglevel=info
# 3️⃣ Flask 서버 실행
python app.py
1. 요청
curl http://127.0.0.1:5000/process/준규
2. 응답
{"task_id": "abc123", "status": "processing..."}
3. 로그 확인
[INFO] Task tasks.background_task succeeded in 5.002s: '작업이 완료되었습니다.'
이렇게 간단한 사용이 가능하나, 고려할 부분은 분명히 존재하기 마련입니다.
| 항목 | 설명 |
|---|---|
| 비동기 처리 | Flask 요청을 블로킹하지 않고, 작업을 백그라운드로 분리 |
| 분산 확장성 | 여러 Worker를 띄워 동시에 수백 개의 작업 병렬 처리 가능 |
| Retry 및 에러 복구 | 네트워크 오류나 예외 발생 시 자동 재시도 가능 |
| 결과 추적 가능 | Redis나 DB에 결과를 저장해 나중에 조회 가능 |
| 스케줄링 기능 내장 | 주기적 작업(cron, interval) 등록 가능 |
| 모니터링 도구(Flower) | 실시간으로 태스크 상태/실패/성공 모니터링 가능 |
| 항목 | 설명 |
|---|---|
| Broker 의존성 | Redis, RabbitMQ가 멈추면 큐 전달 불가 |
| 상태 관리 복잡성 | 여러 Worker 간 결과 동기화 필요 |
| 리소스 관리 필요 | Worker 프로세스가 많아지면 CPU 부담 |
| 초기 설정 복잡 | 간단한 작업엔 오히려 오버엔지니어링일 수 있음 |
실무에서 느낀건 다음과 같다. 백그라운드 처리가 가능함에 따라 부하 감소, 하지만 CPU 부담은 증가한다 하여 측정을 해보았을 때 무리 없음, 모니터링이 가능함과 동시에 제법 복잡한 설계가 필요한 정도?
그리고 저처럼 1초마다 task를 실행시키는 로직을 만든다 했을 때 가장 치명적인건 state를 가질 수 없다는 것이었습니다....
그래서 어떻게 했느냐? redis를 캐시처럼 쓰려했다가 혼났다.. IO 너무 늘리는 방법 쓰지말고 다른 방법을 찾아라..
결국 어떻게 했냐.. 더하기 빼기 곱하기 나누기 등의 계산은 별도의 모듈로 하고 해당 값을 task에 값으로 넘겨 해결했습니다.. 솔직히 맞는 방법인지는 모르겠습니다 여전히 모듈에 대한 부하 목적을 위해 개발한건데 이걸 이렇게 쓰면 문제가 되지 않을까 하는 생각을 계속 하게 되더군요..
@celery.task(bind=True, max_retries=3, default_retry_delay=10)
def unstable_task(self):
try:
...
except Exception as e:
self.retry(exc=e)
from celery import chain
chain(task_a.s(), task_b.s(), task_c.s())()
celery -A app.celery beat --loglevel=info
위 기능은 제법 많이 사용했고 유용했다.. 특히 체이닝이 편하더라
Queue 분리
긴 작업과 짧은 작업을 서로 다른 큐로 분리해 처리량 조절
celery -A app.celery worker -Q long_tasks --concurrency=2
celery -A app.celery worker -Q short_tasks --concurrency=4
Prefetch 조정
--prefetch-multiplier=1로 설정하면 Worker 간 작업 분배가 더 공정해짐
결과 자동 만료 설정
celery.conf.result_expires = 3600 # 1시간 후 자동 삭제
모니터링 툴 Flower
celery -A app.celery flower --port=5555
→ 웹 UI에서 태스크 상태, 재시도, 에러 로그 확인 가능
비동기 모듈을 사용하여 부하를 줄이고 싶다 생각이 들 경우엔 Celery 도입을 생각해보는 것도 좋은 방안이라 판단되었습니다.
물론 state를 어떻게 관리할 것인지에 대한 생각은 아직 해결 못했지만 추후 다시 사용방법에 대해 고민할 듯 싶습니다..