왓챠피디아 클론 WatchB 개발기: 영화 데이터 수집 (백엔드 3-2편)

mynghn·2022년 10월 13일
2

🐣 DRF Serializer를 활용한 API 응답 역직렬화

이전 포스팅에서 언급했듯이,
이제 Open API로부터 받아온 데이터를 👉 모델 인스턴스로 만들어줄 중간 과정이 필요하다.

먼저 영화 데이터를 수집해 모델에 담는 과정을 큰 그림에서 바라보면,

데이터를 받아오기 위한 API 요청과 이를 모델 인스턴스에 담아 DB에 쓰는 모든 과정이 하나의 파이썬 런타임 안에서 프로그래매틱하게 이루어진다.

그래서 외부에서 들어오는 매뉴얼한 요청에 대응하는 API 엔드포인트는 필요가 없다.

그런 고로 DRF의 Serializer를 꼭 써야 하는 상황은 아니었지만,
외부에서 들어오는 데이터의 유효성 검증을 거쳐 쟝고의 모델 인스턴스로 역직렬화하는 Serializer의 메인 태스크는 그대로였기 때문에

중간 과정을 DRF의 Serializer를 사용해 해결해 보기로 결정했다.

일반적인 Serializer의 유즈 케이스와는 거리가 좀 있었기 때문에,
프로젝트의 요구사항에 맞게 커스텀해야 할 사항들이 꽤나 있었는데 그 과정을 차근차근 한번 설명해보겠다.

🪚 DRF Serializer의 역직렬화 프로세스 해부

원하는 부분만 고쳐서 쓰려면 먼저 DRF의 구현 상 어디서 어떤 일이 일어나는지를 먼저 파악해야 하기 때문에,

그 과정에서 알게 된 DRF Serializer의 유효성 검증 프로세스를 쭉 정리해보자.

일단 Serializer의 두 가지 역할 중 하나인 역직렬화(당연히 나머지 하나는 직렬화)라는 일을 수행하는 장면을 생각해보면,

가장 겉에선 이런 흐름이다.

  1. JSON 등 포맷의 데이터를 data 인자로 넘겨 Serializer 객체 생성
    >>> serializer = MySerializer(insatnce=..., data={...})
  2. 역직렬화 이전에, 넘겨 받은 데이터의 유효성 검사부터 진행
    >>> serializer.is_valid()
  3. 유효성 검사를 통과한 경우, save() 메소드를 통해 역직렬화.
    >>> deserialized_object = serializer.save()

1에서 instance 인자도 같이 넘기면 수정 시나리오, data 인자만 넘기면 생성 시나리오가 된다. instance 인자로는 최종 역직렬화 결과와 같은 타입의 객체를 넘긴다.

DRF의 구현 상 역직렬화 이전에 is_valid() 메소드를 실행해 유효성 검사를 먼저 하도록 강제한다. 그렇지 않으면 예외 발생.

그리고 앞으로 할 일은 영화 데이터를 쟝고 모델에 담는 거니까,
3번 단계에서 미리 지정한 쟝고 모델의 인스턴스로 데이터를 역직렬화하는 ModelSerializer를 활용하면 된다.

그리고 ModelSerializer의 구현을 보면 파이썬 런타임 안에서 모델 인스턴스를 생성/수정한 뒤 꼭 인스턴스의 save() 메소드까지 실행한다.
즉, ModelSerializer에게 역직렬화의 결과로 반환할 원본 객체의 정의는 이미 DB에 반영까지 완료된 모델 인스턴스.

어쨌든 이 안을 들여다보면
2번과 3번 단계 안에서 사실 엄청 많은 일들이 일어나고,
여러 메소드에 나눠 모듈화된 이 흐름을 처음부터 쭉 파악하고 나서야 원하는 부분만 고쳐볼 수가 있겠다.

is_valid(): 유효성 검사

먼저 2번 단계인 유효성 검사 과정의 흐름부터 살펴보자. 유효성 검사 프로세스에서 내부 메소드가 순서대로 실행되는 순간들을 기준으로 들어가 보면 좋겠다.

  • serializer.is_valid()
    • self.run_validation(data=self.initial_data)
      1. (is_empty_value, data) = self.validate_empty_values(data)
      2. value = self.to_internal_value(data)
      3. self.run_validators(value)
      4. value = self.validate(value)

is_valid()run_validation()을 쭉 실행한 뒤 에러 발생을 체크해 bool 값으로 유효성 여부를 리턴하는 역할이고
run_validation()에서는 위의 4번 라인까지 실행 후 value 값을 리턴한다.

어쨌든 중요한 건 여기서 실질적인 유효성 검사 프로세스를 정의하는 메소드는 run_validation()이고,
그 중에서도 대부분의 일이 일어나는 요체는 to_internal_value()라는 점이다.

1의 validate_empty_values()에서는 Serializer로 넘어온 데이터 전체가 비었는지를 확인하기 때문에 보통은 문제가 될 일이 없고,
3의 run_validators()는 내부 필드 레벨이 아닌 Serializer 레벨의 validators 속성(보통 없음)을 본다.
4의 validate()에서 그나마 직접 정의한 필드 전체 레벨에서의 유효성 검사가 일어나고,

2의 to_internal_value()에서 거의 대부분의 유효성 검증이 내부 필드 레벨에서 진행된다.

그리고 to_internal_value()의 내부를 요약해보면 다음과 같다.

read_only가 아닌 내부 필드마다,

  1. validated_value = Field.run_validation(primitive_value)
  2. "validate_필드명" 형태의 커스텀 메소드가 정의되어 있으면, validated_value = 커스텀메소드()

primitive_value는 처음에 Serializer 생성 시 넘어온 데이터 딕셔너리로부터 해당 필드에 대한 값을 읽어온 것 뿐이고 2의 직접 구현한 커스텀 메소드는 차치하면,
DRF가 제공하는 필드 레벨의 유효성 검사가 진행되는 Field.run_validation()에서 역시 해당 필드의 거의 모든 유효성 검증이 진행된다.

그리고 메소드 이름이 같은 것에서 알 수 있듯이
필드 객체의 Field.run_validation() 역시,
4번 self.validate(value) 단계만 제외하면
위에서 정리한 Serializer.run_validation() 프로세스와 같은 코드가 실행된다.

심지어 이후 validate_필드명의 커스텀 메소드가 실행되는 걸 보면 완전히 똑같은 흐름으로 설계됐음을 알 수 있다.

사실 그럴수밖에 없는 게 DRF의 Serializer는 Field 클래스를 상속 받아 정의되기 때문에 그 자체로 하나의 필드이다.

그래서 Field.run_validation() 안에서 똑같이 실행되는
1의 Field.validate_empty_values()에선 required 옵션과 같은 이슈들을 살펴보고
3의 Field.run_validators()에선 필드 선언할 때 직접 넘겨줬거나 필드의 구현 상 기본 탑재된 Field.validators 속성 안의 Validator 객체를 돌며 유효성 검사 로직을 실행하기 때문에,

이전에 Serializer 레벨에서 실행됐을 때와 다르게 쩌리(?) 단계가 아닌 것이다.

그리고 이후 더 자세하게 살펴보겠지만 Serializer의 필드를 또 다른 Serializer로 선언할 수 있는 이유도 Field 클래스를 상속 받기 때문이다.

그래서 정리해보자면,
Serializer의 유효성 검사 과정은

  1. 내부 필드마다,
    1. DRF가 제공하는 필드 레벨 유효성 검사
    1. 직접 구현한 필드 레벨 유효성 검사: validate_필드명()
  2. 직접 구현한 전체 레벨 유효성 검사: validate()

의 순서로 간소화해 이해할 수 있겠다.

save(): 역직렬화

이후 역직렬화 프로세스는 비교적 심플하다.

  1. self.instance가 있으면 self.update(self.instance, validated_data)
  2. self.instance가 없으면 self.create(validated_data)

self.instance는 Serializer 생성 당시 첫번째 인자로 넘기는 기존의 원본 객체를 뜻하고,
validated_data는 유효성 검사 후 그 결과로 저장된 self.validated_datasave() 메소드에서 받은 키워드 인자를 합친 딕셔너리다.

어쨌든 원본 객체 수정과 신규 생성의 두 가지 갈래로 역직렬화를 진행할 수 있도록 해둔 것이고,

create()update() 메소드는 기본 Serializer의 경우 따로 구현을 직접 해줘야 하지만 (원본 객체가 무엇이 될지 알 수 없기 때문에)
ModelSerializer의 경우 쟝고 모델 인스턴스로 역직렬화가 일어나도록 이미 구현이 되어있다.

그리고 이번 프로젝트에선 ModelSerializercreate()update() 메소드를 그대로 사용할 것이기 때문에,
역직렬화 프로세스는 여기까지만 들여다보도록 하겠다.

CreateOrMergeWithDataMixin

이제 실제로 프로젝트하면서 커스텀한 부분들을 하나씩 살펴볼 건데

전체 데이터 수집 프로세스 중 Serializer를 통한 역직렬화가 필요한 순간의 상황적 특성에 따라 대부분의 요구사항이 생겨났다.

일단 영화가 메인 모델이었기 때문에,
주변 모델의 데이터는 들어오는 영화 데이터를 하나씩 보면서 기존에 못 본 것들을 하나씩 추가하는 것이 중복 없는 효율적인 프로세스가 된다.

따라서 새로운 영화를 하나씩 만나면서 중첩된 구조로 다른 주변 모델의 데이터도 같이 들어올 때,
기존에 이미 추가했던 데이터를 다시 만날 가능성이 있는 모델을 담당하는 Serializer의 경우,
이런 케이스를 알아챌 방법이 필요했다.

그런 가능성이 있는 모델은 테이블 간 관계에서 Movie 모델이 N 사이드가 되는 모델인데,
실제로는 영화 모델과 M:N 관계인 Genre/Country/Person, 세 모델이었다. (하나의 장르/국가/영화인이 여러 개의 영화와 연결)

어쨌든 위의 세 모델을 담당하는 Serializer들은,

생성자로 넘어온 JSON 데이터만을 보고

  • 기존 모델 인스턴스 중 매치되는 인스턴스(들)을 불러오고
  • 이들과 새로 들어온 데이터의 내용을 종합해 만든 최신 버전 데이터를 그 중 하나의 모델 인스턴스에 업데이트

하는 방식으로 역직렬화 과정을 수행할 수 있도록 구현한 CreateOrMergeWithDataMixin 클래스를 상속 받아 Serializer 동작을 커스텀했다.

class CreateOrMergeWithDataMixin:
    def save(self, **kwargs) -> Model:
        self.instance, extra_kwargs = self.search_instance(**self.validated_data)
        self.instance = super().save(**extra_kwargs | kwargs)
        return self.instance

    def search_instance(
        self, **validated_data
    ) -> tuple[Optional[Model], dict[str, Any]]:
        ...
        return instance, extra_kwargs

일단 들어온 데이터로 기존의 모델 인스턴스를 찾고,
해당 인스턴스를 self.instance로 넣은 뒤 원래 ModelSerializersave() 메소드를 실행하는 크게 2단계의 흐름인데,

첫 단계에서 기존 인스턴스를 찾는 search_instance()가 로직의 대부분을 차지한다.
간략하게 설명하면 들어온 데이터 중 유니크한 필드 값이 있다면 이를 이용해 매치되는 기존 인스턴스들을 찾는 방식이고, 여러 인스턴스가 매치되면 그들의 필드 값을 합집합하듯이 종합해서 extra_kwargs에 넣어 리턴한다.

SkipFieldsMixin & SkipChildsMixin

Open API로부터 넘어오는 데이터였기 때문에 상태가 안 좋은 경우도 더러 있었다.

그 중에는 치명적인 케이스라 해당하는 영화 전체를 못 쓰게 되는 경우도 있었지만
결함이 있는 데이터만 누락하고 나머지는 살리면 되는 케이스가 꽤 많았다.

이런 요구사항을 해결하기 위해 SkipFieldsMixinSkipChildsMixin,
두 클래스를 만들었다.

  • SkipFieldsMixin모델 수준에서 빈 값이 허용되는 필드에 한해 필드 레벨의 유효성 검사 실패를 넘어갈 수 있도록 한다. 실패한 필드는 생략되어 self.validated_data에 포함되지 않는다.
  • SkipChildsMixin은 다른 Serializer에 many=True로 선언된 중첩 구조 필드에 대해 유효성 검사에 실패한 데이터는 리스트에서 생략되는 것으로 실패를 넘어갈 수 있도록 한다.

RequiredTogetherMixin

Open API로부터 비롯된 레코드임을 표시하기 위해,
데이터 수집을 하면서 한 레코드가 이후에도 반복적으로 등장할 수 있는 MoviePerson 모델에는 tmdb_idkmdb_id 필드를 만들어두었다.

특히 Person 모델의 경우
위에서 살펴본 CreateOrMergeWithDataMixin의 시나리오 안에서 데이터만을 보고 기존 Person 객체와 일치 여부를 따질 때,
식별자를 통해 정확하게 식별되고 있음을 보장할 필요가 있었는데

따라서 Open API로부터 비롯된 데이터의 경우 두 식별자 중 하나는 무조건 값이 있어야 함을 검증하고자 했다.

이는 단순하게 Serializer 전체 레벨의 커스텀 메소드 validate()를 작성하면 해결되는 문제였는데,
반복되는 로직이기 때문에 Mixin 클래스로 따로 구현해서 활용했다.

class RequiredTogetherMixin:
    class Meta:
        required_together_fields: Iterable[str]

    def validate(self, attrs: dict[str, Any]):
        if all(attrs.get(f) in (None, "") for f in self.Meta.required_together_fields):
            raise ValidationError(
                {
                    api_settings.NON_FIELD_ERRORS_KEY: [
                        f"At least one of {self.Meta.required_together_fields} needed"
                    ]
                },
                code="required",
            )
        return super().validate(attrs)

🗃 DRF Writable Nested

이전 포스팅에서 NestedInitMixin을 활용해 Open API의 응답을 중첩된 데이터클래스로 역직렬화했듯이,

비슷하게 Serializer가 받은 중첩된 구조의 JSON 데이터를 recursive하게 쟝고 모델 인스턴스로 역직렬화할 방법이 필요하다.

중첩된 구조의 데이터를 받는다는 건,
앞서 언급했던 Serializer 내부 필드로 다른 Serializer 객체를 선언하는 케이스
를 말하는데

여기서 문제가 발생하는 건 ModelSerializer를 사용하고 있기 때문이라고 볼 수 있다.

ModelSerializer에서 중첩된 구조의 데이터를 역직렬화하는 게 문제가 되는 이유를 뜯어보면,

  • 일단 유효성 검사 단계에선 중첩된 구조가 문제가 되지 않는다.
  • 이후 역직렬화 단계에서
    유효성 검사를 거친 self.validated_data를 재료로 create()update()든 하는데
    여기에 중첩된 구조로 실제로는 모델 인스턴스를 뜻하는 데이터가 들어가 있으면,
    쟝고 모델 API 쪽에서 모델 인스턴스를 이용해서 진행하지 않았다는 예외가 발생하게 된다.

그리고 DRF의 기본 구현에선 중첩된 구조의 데이터 역직렬화를 지원하지 않지만 이를 다루는 서드파티 패키지를 추천하고 있고 그 패키지가 바로 DRF Writable Nested다.

WatchB에선 이 패키지를 사용해 영화 Serializer에 중첩된 형태의 데이터로 들어오는 주변 모델들을 역직렬화했다.

⛏ 전체 크롤링 프로세스 디자인

일단 Open API 데이터와 모델 인스턴스 사이를 잇는 가교로 DRF의 Serializer를 위와 같이 사용하기로 했는데,

사실 그 앞에 중간 과정 한 단계가 더 필요하다.

  1. Open API 응답이 담긴 데이터클래스를 👉 모델 스펙에 맞는 딕셔너리
  2. Open API 응답이 담긴 모델 스펙의 딕셔너리를 👉 모델 인스턴스

(여기서 모델 스펙이란 쟝고 모델의 내부 필드 구성과 같은 구조를 갖는 것을 말한다.)

즉, 2단계가 앞서 설명한 Serializer의 담당 레이어고 1단계 레이어에서의 작업도 추가적으로 필요하다.

현재 1단계 레이어에서의 작업은 모델 필드마다 데이터 클래스로부터의 직렬화 로직을 메소드로 작성해둔 클래스를 사용해 진행하고 있다.
TMDB와 KMDb API 각각에 대한 직렬화 담당 클래스를 구현했으며, 역할을 봤을 때 일종의 Serializer를 직접 구현했다고 볼 수도 있겠다.

그럼 이제 영화 데이터 수집 프로세스의 전체 과정을 처음부터 끝까지 다시 한번 쭉 정리해보자.

  1. Open API에 데이터 요청 후 응답
  2. 받아온 응답을 데이터클래스로 역직렬화하고 그 과정에서 필요한 데이터를 선별
  3. 선별된 데이터가 담긴 데이터클래스를 모델 스펙에 맞는 딕셔너리(JSON)로 직렬화
  4. 선별된 데이터가 담긴 모델 스펙의 JSON을 모델 인스턴스로 역직렬화. 그 과정에서 DB에 데이터 적재.

이러한 흐름을 토대로 추상 클래스를 구현해 데이터 수집 프로세스 전체를 실행할 Crawler 클래스의 인터페이스를 설계했다.

class APICrawler(metaclass=ABCMeta):
    serializer_class: Type[ModelSerializer] = MovieFromAPISerializer
	...
    
    @abstractmethod
    def fetch(self, *args, **kwargs) -> list[MovieFromAPI]:
        raise NotImplementedError

    @abstractmethod
    def serialize(self, movie_fetched: MovieFromAPI) -> dict[str, Any]:
        raise NotImplementedError

    def get_or_register(
        self, movie_data: dict[str, Any]
    ) -> tuple[Optional[Movie], Optional[MovieFromAPISerializer]]:
        if (
            (tmdb_id := movie_data.get("tmdb_id"))
            and (movie_filtered := Movie.objects.filter(tmdb_id=tmdb_id))
        ) or (
            (kmdb_id := movie_data.get("kmdb_id"))
            and (movie_filtered := Movie.objects.filter(kmdb_id=kmdb_id))
        ):
            return movie_filtered.get(), None
        else:
            serializer = self.serializer_class(data=movie_data)
            if serializer.is_valid():
                return serializer.save(), serializer
            else:
                return None, serializer

    def run(
        self, *args, **kwargs
    ) -> list[tuple[Optional[Movie], Optional[MovieFromAPISerializer]]]:
    	...
        else:
            movies_fetched = self.fetch(*args, **kwargs)
        return [
            self.get_or_register(self.serialize(fetched)) for fetched in movies_fetched
        ]

fetch() 👉 serialize() 👉 get_or_register() 의 흐름이고,

  • fetch(): 1+2단계
  • serialize(): 3단계
  • register(): 4단계

에 해당하겠다.

✔️ APICrawler를 상속 받아 추상 메소드들을 실제 구현한 애플리케이션 케이스들은 movies/crawlers/mixins/cralwer.py 모듈에서 확인 가능

🤖 커스텀 django-admin 커맨드

지금껏 영화 데이터 수집을 위한 하위 프로그램 요소들을 완성했으니,
이제 이를 엮어 WatchB의 서비스 관리자가 실제로 데이터 수집을 실행할 수 있도록 해주는 가장 바깥의 인터페이스가 필요하겠다.

ORM 등의 작업을 하려면 쟝고 환경 안에서 프로그램이 실행되어야 하는데,
django-admin 커맨드를 구현하면 문제가 해결된다.

프로젝트 안에 커스텀 커맨드를 구현하는 방법은 간단하다.

INSTALLED_APPS에 명시된 쟝고 앱의 디렉토리 내부에서,

  1. management라는 이름의 패키지를 만들고
  2. 그 안에 commands라는 패키지를 하나 더 만든 뒤
  3. 그 안에 커스텀 커맨드의 이름으로 파이썬 모듈을 만들면 된다.
  4. 그리고 모듈 안에는 BaseCommand를 상속 받아 Command라는 이름으로 클래스를 선언하고,
    add_arguments() 메소드와 handle() 메소드를 오버라이드하면 된다.
  • add_arguments()에선 커스텀 커맨드가 CLI를 통해 받을 인자들을 정의한다. 파이썬의 ArgParser를 확장한 클래스를 내부적으로 사용하기 때문에 기본적으로는 ArgParser의 인터페이스와 상당히 비슷하다.
  • handle() 내부에는 CLI를 통해 받은 인자들을 토대로 실제 애플리케이션의 실행을 정의한다.

✔️ WatchB에서는 crawlmovies 커맨드를 구현해 영화 데이터 수집 애플리케이션을 마련했다.

🏁 마지막, TODO

  • 데이터클래스 직렬화 레이어도 DRF의 Serializer를 사용해 정의
  • Movie 모델 Serializer에도 적용할 수 있도록 CreateOrMergeWithDataMixin 로직 업그레이드

이제 다음 편에선 유저 인증 도메인 이외의 다양한 유저 액션들을 처리하는 API 엔드포인트들 구축기로 돌아오겠다.

To Be Continued...

0개의 댓글