[Celery]window에서 celery task가 실행되지 않는 문제

Jay·2022년 11월 4일
0
post-thumbnail

문제

Celery와 RabbitMQ를 통해 비동기 Task queue를 사용하여 알람을 생성하는 task들을 실행하였다. 경기 참여자들에게 전송할 알람을 생성하기 위한 create_alarms() task와 알람을 삭제하기 위한 delete_alarm() task를 각각 구현하였다.

@shared_task
def create_alarms(game_id):
	participations = Participation.objects.filter(game=game_id)
    for participation in participations:
        alarm, created = Alarm.objects.get_or_create(
            game=participation.game,
            user=participation.user,
            valid_until=participation.game.start_datetime
            )
        if not created:
            alarm.set_unsent()

@shared_task
def delete_alarm(game_id, user_id):
    alarm = Alarm.objects.filter(game=game_id, user=user_id)
    alarm.delete()

위의 메소드는 Participation 객체 생성시, 리시버가 최소 인원 모집 여부를 확인하여 모집이 완료된 경우 호출된다. 만약 최소 모집 인원이 3명인 경우, 3번째 모집시 create_alarms(경기id)를 호출하여 task를 전달한다.

[2022-10-27 14:46:19,255: INFO/MainProcess] Task games.tasks.delete_alarm[fb5647ba-a5d7-42bc-a623-b35ddc707ae2] received
[2022-10-27 14:46:19,257: INFO/MainProcess] Task games.tasks.create_alarms[b6b4a131-9f9e-4e26-a451-4bc36e46a631] received

celery 로그를 통해 create_alarms, delete_alarm task가 Celery main 프로세스로 전달된 것을 확인했다. 하지만 DB를 조회하니 알람은 생성되지도, 삭제되지도 않았다.



문제 분석

Celery가 Django에 설정된 DB를 공유하지 않는가?

가장 처음으로 생각했던 이유였다. django 내부에서 celery 앱을 생성할때, django의 여러 setting 값들을 가져와 인스턴스를 생성하였다. 하지만 celery worker에서 django의 DB 자원들을 가져와 DB에 결과를 저장하지 않는건 아닌지 의심했다.
결과는 그렇지 않았다. Celery는 Django 위에서 생성되어 실행되었기 때문에 그럴 가능성은 낮다고 판단하였다. 단지 Celery가 task를 정상적으로 실행하지 않은 것이었다.


task가 제대로 실행되었는가?

rabbitMQ 서버에 접속하여 Queue들의 상태를 조회했다. Queue들을 보니 지금까지 테스트를 broker에게 전달했던 task들에 대하여 Unacked된 것을 확인할 수 있었다.

Acked/Unacked

브로커와 워커는 message(task)의 전달을 보장하기 위해 acked,unacked를 사용한다. acked에 대한 설정을 따로 하지 않았다면 celery에서는 보통 worker가 전달 받은 task를 실행하기 직전에 acked 된다고 한다. 따라서 브로커에게 전달된 message가 worker로 전달되지 않았거나, 전달되었지만 제대로 실행되지 않았다고 판단하였다.


원인

원인은 celery가 버전 4부터 window를 지원하지 않았기 때문이다. celery의 기본 concurrency pool 설정은 prefork 방식이다. prefork는 celery worker 프로세스중 메인 프로세스가 새로운 프로세스를 fork 하여 task의 concurrent한 동작을 수행하는 방식이다. 하지만 윈도우는 UNIX, Linux와는 다르게 fork(부모 프로세스가 자식 프로세스를 생성하는 기능)를 지원하지 않는다. 따라서 celery worker들을 관리하는 프로세스가 실제로 task들을 수행하는 worker 프로세스들을 생성할 수 없었던 것이다.



해결방법

이를 해결하기 위한 방법으로는 2가지를 찾을 수 있었다. 하나는 process가 fork할 수 있도록 환경 변수를 설정해 주는 것이고, 다른 하나는 새로운 process를 생성하는 것 대신, 하나의 프로세스 내에서 multi threading을 통해 concurrent한 동작을 수행하는 것이다.

프로세스의 fork를 허용하는 방식

첫번째로는 process의 fork가 되지 않으니 이를 가능하게 하는 방법이다. django 환경 변수로 해당 환경 변수를 추가하거나 사용자 컴퓨터의 환경변수에 참조할 환경 변수를 설정해주어 fork를 가능하게 하면 된다.

import os

from celery import Celery


os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'djangoapi.settings.dev')
os.environ.setdefault('FORKED_BY_MULTIPROCESSING', '1')

app = Celery('miti',
             broker='amqp://miti:miti@localhost/',
             include=['games.tasks'])

app.config_from_object('django.conf:settings', namespace='CELERY')

app.autodiscover_tasks()

Celery에서 사용하는 billiard라는 패키지가 존재한다. 이러한 billiard 패키지가 process를 fork하는 방식을 가능하게 하도록 하기 위해 FORKED_BY_MULTIPROCESSING이라는 환경 변수를 설정한다고 한다. 하지만 celery 버전 4부터는 이러한 billiard 패키지가 FORKED_BY_MULTIPROCESSING 환경 변수를 설정하지 않기 때문에 윈도우에서 prefork 방식을 사용할 수 없다는 것이다. 따라서 직접 코드를 통해 해당 환경변수를 설정해주어 prefork 방식을 사용할 수 있다.
위의 명령어를 celery 객체가 생성하기 전에 실행되면 해당 변수값을 참조하여 fork가 가능하게 되어 child process를 생성하게 할 수 있다.

혹은 윈도우 환경 변수 설정을 통해 직접 FORKED_BY_MULTIPROCESSING 를 1로 설정하는 방법도 있다. 해당 방법은 아래의 링크에 설명되어있다.

eventlet/gevent등 thread pool을 사용하는 방식

샐러리를 윈도우에서 실행할 다음 방법은 concurrency pool을 thread를 사용하는 방식이다. celery 동작에서 concurrency pool 설정하여 프로세스를 fork하는 대신 thread를 생성하여 task를 수행할 수 있도록 하는 것이다. celery의 concurrency pool 기본 값은 앞서 말했던 prefork이다. 이를 바꾸어 thread를 지원하는 방식으로 설정하여 main 프로세스 내의 여러 thread에서 task를 실행하는 방식으로 변경하는 것이다.

celery -A <project명> worker -l info -P <eventlet or gevent or solo>

필자의 경우 eventlet을 사용하여 celery를 실행하였다.

[2022-11-03 14:30:15,783: INFO/MainProcess] Task games.tasks.delete_alarm[5ce227d8-256d-4280-8474-87e7bdf6d8b1] received
[2022-11-03 14:30:15,877: INFO/MainProcess] Task games.tasks.delete_alarm[5ce227d8-256d-4280-8474-87e7bdf6d8b1] succeeded in 0.0779999999795109s: None

그 결과는 아래와 같이 task가 성공적으로 실행되었음을 로그를 통해 알 수 있었다. 또한 task를 실행한 프로세스는 MainProcess로 새로운 프로세스를 생성하여 task를 실행한 것이 아니라 thread를 통해 수행했음을 알 수 있었다.

나는 후자인 eventlet을 사용하는 방식을 사용하였다. 높은 CPU자원을 필요로 하는 task의 경우에는 프로세스를 사용하는 prefork 방식을 사용하고, I/O가 무거운 task에서는 eventlet과 같은 thread pool을 사용하는 것이 적절하다. 따라서 alarm 을 저장하는 작업인 나의 task를 수행하기 위해서 eventlet을 사용하였다.

후기

구글링을 통해 한번에 해결할 수 있는 문제였지만 정확한 원인을 파악하지 못하여 올바른 질문을 던질 수 없어 해결이 꽤 늦어진 문제였다. 다만 로깅과 사용한 기술들의 동작을 자세히 공부하며 점차 문제의 원인을 정확하게 파악한 과정은 꽤나 값지다고 생각한다.





reference

https://www.distributedpython.com/2018/08/21/celery-4-windows/

0개의 댓글