[python] Celery란?

손채윤·2024년 4월 30일

수행에 오랜 시간과 메모리가 필요한 작업을 따로 실행하고 그러한 작업들을 스케줄링하기위해서 Celery에 대해서 알아볼 것이다.

1.Celery란?

-Celery는 python application에서 많은 양의 작업들을 나눠서 처리할 수 있도록 해주는 분산 시스템이다.
-python application에서 생성된 작업들의 실시간 처리와 작업 스케줄링등을 제공하는 task queue이다.

-Celery는 메시지를 전달하는 역할(publisher)과 메시지를 message broker에서 가져와 작업을 수행하는 worker역할을 담당한다.


1.1 Task Queue란?

-Task Queue는 멀티 쓰레드, 또는 멀티 디바이스를 통한 분산 처리에서 사용되는 개념이다.

-Task Queue에는 처리해야할 작업들, task들이 저장되고 이 task들이 순차적으로 worker process에 의해 처리된다. worker process들은 task를 처리하는 프로세스로, task queue에 수행해야할 task가 있는지 모니터링 한다.


-샐러리에서 client와 worker는 message broker를 통해서 소통한다. client에서 처리해야할 새로운 task를 생성하면 celery는 broker를 통해 worker에게 메시지를 전달하여 task를 처리하도록 한다.

-celery시스템은 여러개의 worker와 broker들로 구성될 수 있다.

-celery시스템을 구축하기 위해서 메시지들을 전달할 수 있는 broker가 필요한데, 보통 RabbitMQ또는 Redis broker를 사용한다.


1.2 Celery특징

-Celery는 운영하기 간단.

-worker와 client간의 통신문제에 대하여 자동으로 재시도하고, broker들을 primary/primary 또는 primary/replica등의 구조로 사용하여 고가용성을 보장할 수 있다.

-단일 Celery는 1분에 수맥만건의 task를 처리할 수 있는 속도를 제공한다. 그리고 Custom pool구현, 직렬변화기, 압축 구조, 로깅, 스케줄러, consumer, producer등등 다양한 기능을 사용하거나 확장 가능하다.


2.Celery동작 구조


웹서비스에서 발생한 요청(Task)를 message broker에서 받아 celery를 이용하여 분산처리를 진행한다. celery에서는 작업이 완료되는 특정이벤트에 db task를 수행한다.

[broker]->송신자의 이전 메시지 프로토콜로부터의 메시지를 수신자의 이전 메시지 프로토콜로 변환하는 중간 모듈이다.

3.Celery 사용법

3.1 설치

pip install celery

3.2 broker설정

celery를 사용하기 위해서는 task메시지들을 주고받기 위한 message broker가 필요하다.
이 예제에서는 redis사용.
아래의 명령어를 통해서 redis컨테이너를 실행한다. 이때 redis는 6379포트를 사용하도록 설정하였다.

docker run -d -p 6379:6379 redis

3.3 celery application

celery를 사용하기 위해서는 먼저 celery인스턴스를 생성해야 한다. celery application또는 app이라고 부르는데, 인스턴스는 celery에서 task를 생성하거나 worker를 관리하는 등의 작업을 하기위한 entry-point역할을 한다. 다른 모듈에서는 해당 인스턴스를 import하여 사용한다.

# tasks.py
from celery import Celery

app = Celery('tasks', backend='redis://localhost:6379', broker='redis://localhost:6379')
 
@app.task
def add(x, y):
  return x + y

위의 예제는 Celery인스턴스를 생성하고 간단한 task를 등록한 코드 예제이다.
Celery 인스턴스의 첫번째 매개변수는 해당 모듈의 이름(파일명)이고 그 다음에 오는 backend와 broker는 각각 worker의 작업결과를 저장할 백엔드와 message broker에 대한 정보이다. 인자값으로는 해당 서버들의 url을 입력한다.

3.4 celery worker server 실행

celery명령어의 worker 인자를 사용하여 celery server를 실행한다.

celery -A tasks worker -loglevel=INFO

위 명령어에서 -A는 application을 의미하는데, tasks모듈을 읽어서 해당 모듈의 application을 실행한다는 의미이다. worker는 worker인스턴스를 실행한다는 의미이고 loglevel은 celery서버에서 출력될 로그의 레벨을 지정하는 인자이다.

위의 명령어 실행하면 celery서버가 foreground로 실행되어 화면에 서버 로그들이 출력되게 된다.

3.5 task 실행

-celery 인스턴스에 등록된 task를 실행하기 위해서는 delay()메서드를 사용한다. delay()메서드를 사용하면 task수행 요청이 celery서버로 전달되어 worker에서 수행된다.

# client.py
from tasks import add, app

if __name__ == "__main__":
  result = add.delay(4, 4)

  print(result.ready())
  print(result.ready())
  print(result.ready())
  print(result.ready())
  print(result.get(timeout=1))
  print(result.get(timeout=1))
  print(result.get(timeout=1))

위의 예제는 이전에 tasks의 add task를 실행하고 그 결과를 출력하는 예제이다.
add.delay(4,4)를 실해앟면 delay()메서드의 인자들이 task에 입력되어 add(4,4)가 실행되고 celery server의 로그에 결과가 출력된다.

delay()메서드는 AsyncResult객체를 반환한다. 이 객체는 task의 상태와 결과를 확인할 수 있다. 이를 확인하기 위해서는 화면에서와 같이 ready(), get()메서드를 사용하면 된다. ready()는 task가 완료되었는지 여부를 반환하고, get()은 task의 결과를 반환한다.

이때 결과를 확인하기 위해서는 celery의 result_backend가 설정이 되어있어야 한다. 앞의 celery application 항목에서 celery인스턴스를 생성할 때 redis를 backend로 설정을 해주었다. 이 설정을 통해서 celery의 작업결과들이 redis에 저장되고 이를 통해 task의 상태와 결과를 조회할 수 있게 된다. 만약 result_backend설정 안하면 ready(),get()메서드에서 에러가 발생한다.

3.6 configuration

celery는 많은 설정이 필요없지만 input과 output에 대한 설정이 필요한 경우가 있다. 우선 input은 broker를 통해서 입력받기 때문에 무조건 연결 설정이 필요하다. output은 result_backend에 task결과 저장이 필요하다면 result_backend를 설정해주어야 한다.

celery의 설정을 변경하기 위해서는 아래의 예제와 같이 celery인스턴스에 직접 접근하여 설정을 해줄 수 있다. app.conf에서 해당 설정을 직접 변경해주거나 여러건의 설정을 변경하는 경우 update()메서드 사용 가능하다.

app.conf.task_serializer = 'json'

app.conf.update(
    task_serializer='json',
    accept_content=['json'],  # Ignore other content
    result_serializer='json',
    timezone='Europe/Oslo',
    enable_utc=True,
)

만약 큰 프로젝트에서 많은 설정을 해야하는 경우에는 이를 모듈로 분리하여 설정에 사용 가능하다. 따로 모듈에 celery인스턴스의 설정들을 기술하고 config_from_object()메서드를 사용하여 해당 설정 모듈을 celery인스턴스에 적용할 수 있다.

아래는 config_from_object예제이다. celeryconfig.py모듈에 celery설정을 기술하고 이를 config_from_object()메서드를 통해 celery인스턴스에 적용한다.

# celeryconfig.py
broker_url = 'pyamqp://'
result_backend = 'rpc://'
 
task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'Europe/Oslo'
enable_utc = True
# celery application code
...
app.config_from_object('celeryconfig')
...

0개의 댓글