Flask-celery 비동기 처리

JinWooHyun·2021년 1월 18일
1

Celery는?

Flask는 프로세스를 동기적(Synchronous)으로 처리하기 때문에 업로드된 데이터 처리 또는 이메일 전송과 같이 다소 오래 걸리는 작업이나 무거운 연산 같은 경우 사용자는 웹 서버의 처리가 모두 마무리될 때까지 기다려야 합니다.

따라서 비동기로 실행하도록 직접 구현할 수 도 있지만 미리 구현된 비동기 작업 큐 라이브러리를 사용하는 것이 능률적인 면에서 뛰어납니다.

Celery는 Python으로 작성된 비동기 작업 큐(Asynchronous task queue/job queue)이기 때문에 Flask와 같은 Python Web Framework에 붙여서 사용하기 수월합니다.

Celery 구성

Celery는 크게 3가지 구성 요소가 있습니다.

1. Celery Client
백그라운드 작업을 요청하는데 사용 됩니다. 이 문서에서는 Flask가 Celery Client가 됩니다.

2. Celery Workers
Flask와 동일한 서버에서 백그라운드 작업을 실행하는데 사용 됩니다.

3. Message Broker
클라이언트는 메시지 큐를 통해 작업자와 통신 하며 Celery는 이러한 큐를 구현하는 여러 가지 방법을 지원합니다. 가장 일반적으로 사용되는 브로커는 RabbitMQ 및 Redis 입니다.

설치

$ sudo apt install python-celery-common -y
$ sudo apt-get install redis-server -y

$ pip install celery
$ pip install redis

우선 celery를 사용하기 위해서 Celery Workers인 celery를 설치하고 Message Broker로 redis를 설치해줍니다.

예제

# tasks.py
from celery import Celery

BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
app = Celery('tasks', broker=BROKER_URL, backend=CELERY_RESULT_BACKEND)

@app.task
def add(x, y):
  return x + y

BROKER_URL은 Message Brocker의 주소로 redis를 사용하기 때문에 redis 주소를 넣어줍니다.

CELERY_RESULT_BACKEND은 작업을 저장할 backend 주소로 똑같이 redis를 사용하므로 redis 주소를 넣어줍니다.

app이라는 객체를 Celery로 만들어줍니다. Celery의 첫 번째 인자는 Celery에서 사용할 이름입니다. 또한 Celery의 task로 add 함수를 사용하기 위해서 add 함수를 작성해줍니다.

$ celery -A tasks worker --loglevel=info
 -------------- celery@piclick-ai v5.0.5 (singularity)
--- ***** ----- 
-- ******* ---- Linux-5.4.0-58-generic-x86_64-with-debian-buster-sid 2021-01-15 15:50:37
- *** --- * --- 
- ** ---------- [config]
- ** ---------- .> app:         tasks:0x7fcaf4cd1f60
- ** ---------- .> transport:   redis://localhost:6379/0
- ** ---------- .> results:     redis://localhost:6379/0
- *** --- * --- .> concurrency: 24 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ----- 
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery
                

[tasks]
  . tasks.add

[2021-01-15 15:50:38,353: INFO/MainProcess] Connected to redis://localhost:6379/0
[2021-01-15 15:50:38,359: INFO/MainProcess] mingle: searching for neighbors
[2021-01-15 15:50:39,372: INFO/MainProcess] mingle: all alone
[2021-01-15 15:50:39,389: INFO/MainProcess] celery@piclick-ai ready.

실전

# slackbot.py
import requests
import json
import time
from flask import Flask, request, make_response, jsonify
from slacker import Slacker
from slack_cleaner2 import *
from celery import Celery

# ...

s = SlackCleaner(OAUTH_ACCESS_TOKEN)
# list of users
s.users
# list of all kind of channels
s.conversations

slack = Slacker(BOT_USER_OAUTH_ACCESS_TOKEN)

app = Flask(__name__)
app.config.update(
    CELERY_BROKER_URL='redis://localhost:6379',
    CELERY_RESULT_BACKEND='redis://localhost:6379'
)

celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)

@celery.task()
def asyncDeleteMessage(channel):
  for msg in s.msgs(filter(match(channel), s.conversations)):
    msg.delete(replies=True, files=True)

@app.route('/delete/channel_message', methods=['GET'])
def deleteMessage():
  channel = request.args.get('channel')
  task = asyncDeleteMessage.delay(channel)
  return make_response("Delete Channel Message END", 200)
  
# ...

슬랙 봇에 채널메세지를 삭제라하는 요청을 celery를 통해 비동기로 요청했습니다.

요청 전에 celery는 background에 켜둔 상태로 진행합니다.

celery -A slackbot.celery worker --loglevel=debug &

참고 사이트

https://core-research-team.github.io/2020-03-01/Celery-Flask-30e28a8974974f6cb55ed0c07d042671

profile
Unicorn Developer

0개의 댓글