Periodic task 한 번만 실행하기

hyuckhoon.ko·2022년 1월 4일
0

결론적으로 AWS Lambda를 도입하진 않았다.
그러한 결론을 내리기까지의 이야기다.

1️⃣ 해결해야 하는 문제

서버 수만큼 periodic 작업들이 실행됐다.
예를 들어, 총 3대의 서버가 있다면 매 30분마다 특정 로직을 수행하는 작업이
각각 독립적으로 진행되는 것이다.
동일 유저에게 비동기적인 알림톡 발송 작업이 3번 반복되고 있음을 의미한다.

이러한 문제를 해결하고자 celery-singleton 패키지를 사용해왔다.

음... 그래서 뭐가 문제야?

from celery import shared_task
from celery_singleton import Singleton


@shared_task(base=Singleton)
def example_task():
    ...
    생략
    ...

정확한 원인은 찾고 있는 단계지만,
종종 운영팀으로부터 자동 취소 또는 알림톡 발송 등의 기능이 작동되지 않는다는는 피드백을 접수받곤 했다. CloudWatch 로그를 살펴봐도 전날 19시부터 갑자기 celery worker가 작업을 수행하지 않았다.


2️⃣ AWS Lambda 서비스로 해결해볼까?

cronjob은 step function과 함께 Lambda를 적용하기 적합하다.
Cold start에 민감하지 않고, 비교적 가벼운 작업들로 구성 돼 있기 때문이다.

"일단 비동기 periodic 작업들을 람다화 하자!!!!"


3️⃣ 예상하지 못한 문제 발생

❶ models.py의 동기화를 어떻게 할까

AWS 람다 서비스 콘솔화면에서 코드를 바로바로 수정할 수 있다는 것은
매우 큰 장점이다. 빠른 배포/수정 측면에서 말이다.
단점은 실제 production 브랜치의 최신 models.py와 Lambda 콘솔의 models.py의 간극은 선형적으로 벌어진다.

개발팀원 혹은 개발 그룹간 람다 함수를 독립적으로 개발할 때,
서로 상이한 코드 또는 버전의 models.py가 혼재하는 구조가 될 것이다.
복잡도가 증가하고 통제하기가 어려워진다.


람다에서 layer를 공유하거나 S3에 최신코드를
업로드시켜 사용하면 어떨까

일단 람다 콘솔에서 layer를 공유하는 방식은 자동화라고 보기 어려워 관리가 소홀해질 수 있다.
따라서, github action을 통해 production 브랜치 코드가 업데이트될 때마다 소스 코드를 AWS S3에 업로드하는 방법이 좋아보였다.

그러다 문득 이런 생각이 스쳤다.

🧐 나는 비동기 periodic 작업들만 서버 수와는 상관없이
단지 한 번만 실행시키고 싶을 뿐인데?

구현하려는 람다 함수에 필요한 리소스는 최소한의 models.py와 최소한의 requirements.txt파일일 뿐이다. 람다의 동기화를 위해 모든 디렉토리 파일을 S3에 업로드하여 사용하는 것은 적절한 판단인가?
필요한 리소스에 비해 구축하는 인프라와 그 범위가 적절하지 않다는 생각이 들었다.


cronjob 브랜치를 생성하거나 별도의 레포지토리를 만들까

마지막 대안은 이렇다.

여기 production 브랜치cronjob 브랜치
(또는 cronjob 레포지토리)가 있다.

배포 프로세스는 아래처럼
cronjob 브랜치 트리거 ->
1) github action(테스트, 이미지 빌드/푸시) ->
2) ECR ->
3) AWS Lambda 배포

이것이 최선이라 생각하고 작업을 진행하고 있었다!!!!


❸ [연역적 추론] stackoverflow에는 왜 이런 고민과 질문이 없을까

주변에서 관측되는 사례 또는 관찰을 통해 보편성을 만드는 사고방식을 즐겨한다.
stackoverflow뿐만 아니라 구글링, YouTube를 해도
내가 해결하고 싶은 문제에 대한 이야기가 없었다.

"대부분의 서비스가 디폴트 서버는 최소 2대고, 오토스케일링도 되는 인프라일텐데, 주기작업으로 멱등성 이슈에 대한 이야기는 왜 없을까?"

혹시 너무 당연하고 쉬운 길이라 간과하고 넘어간 부분이 있는 것은 아닐까?

"장고 어드민 페이지에서 Periodic task에 대한 CRUD가 가능하지?
아 그럼 관련 테이블이 있다는 거구나.
거기엔 updated_at과 같은 시간 정보가 있겠구나"
(이걸 너무 늦게 깨달았다)


SELECT 
    *
FROM
    django_celery_beat_periodictask;

놀랍게도 last_run_at이라는 필드가 존재했다.

한 번만 실행할 수 있도록 모듈을 만들자.
누구나 쉽게 사용할 수 있게 데코레이터로 만들자.
예를 들어 이렇게 말이다.

#utils.py
from typing import Callable


def singleton_required(func: Callable) -> Callable:
    def inner(*args, **kwargs):
        from django_celery_beat.models import PeriodicTask
        try:
            name = func.__name__
            task = PeriodicTask.objects.get(name=name)
            task.refresh_from_db()
            time_diff = abs((timezone.now() - task.last_run_at).total_seconds())
            if time_diff < 60 * 3: 
                """3분 미만"""
                return
            else:
                func()
        except PeriodicTask.DoesNotExist:
            return
        except TypeError:
            return
    return inner


# tasks.py
@singleton_required
@shared_task
def example_task():
    ...
    생략
    ...

❹ python manage.py inspectdb

예를 들어, 다른 장고 프로젝트에서 현재 장고 프로젝트의 DB 구조를 그대로 가져오고 싶다고 하자.
settings.py의 DATABASES 설정을 가져오고 싶은 DB 주소로 지정한다.
그리고 아래의 명령어로 DB구조를 동기화할 수가 있다.

python manage.py inspectdb

4️⃣ 반전....!


참고자료: https://dev.to/vergeev/django-celery-beat-how-to-get-the-last-time-a-periodictask-was-run-39k9

https://reposhub.com/python/job-scheduler/celery-django-celery-beat.html

0개의 댓글