Pytest 개념 및 Mocking

정환우·2025년 5월 25일
1

Python 공부

목록 보기
6/6
post-thumbnail

회사에서 기존에 Pytest를 이용해 테스트를 진행하고 있었는데, 라이브러리 업데이트 하면서 라이브러리 내부 구현이 변경되어 Unit Test가 갑자기 Fail되는 상황이 발생했다.

이 상황을 해결하기 위해 Pytest 및 Thread 관련 개념을 좀 알아봐야겠다.

Pytest

  • Pytest는 이름에서 알 수 있듯이 Python에서 테스트를 작성하고 실행할 수 있는 테스팅 프레임워크다.

기본 활용

  • Test 함수 작성 : test_로 시작하는 이름의 함수를 작성하면, 함수 안에 로직을 테스트 할 수 있음
  • assert 문 : 테스트의 기초 중 기초. 예상되는 결과와 실제 결과 비교
  • 디렉토리 순회 : test_*.py 처럼 test로 시작하거나 _test.py같은 파일명을 자동으로 디렉토리에서 찾음.

이런 식으로 파일 및 테스트를 작성하고 pytest 명령어를 실행하면 테스트를 진행할 수 있음.

Fixtures 활용하기

Fixtures 란?

  • 테스트 함수 실행 전에 필요한 자원(예를 들면 DB 연결, File System 접근, 외부 API 요청 등)을 준비하고, 테스트가 끝난 후 자원을 정리하는데 사용할 수 있는 메커니즘
  • Fixtures를 활용하면 테스트 코드를 더 깔끔하게 작성 가능하고, 재사용 가능하며 테스트 환경을 독립적으로 설정, 종료하는데 유용함

Fixture의 Scope

  • function: fixture의 기본값으로, 각 테스트 함수마다 새로 생성
  • class : 테스트 클래스 당 한 번만 실행
  • module : 테스트 모듈 당 한 번만 실행
  • session : 테스트 세션당 한 번만 실행

뭘 하는 지 대충은 알겠는데, 말로만 들으면 잘 모르겠으니 자세한 예시를 살펴보자.

예시

1. 기본 예시

import pytest

# Fixture 정의
@pytest.fixture
def sample_data():
    return {"name": "Alice", "age": 25}

# Fixture를 사용하는 테스트 함수
def test_name(sample_data):
    assert sample_data["name"] == "Alice"

def test_age(sample_data):
    assert sample_data["age"] == 25
  • sample_data는 function fixture임.
  • pytest는 테스트가 실행되기 전에 sample_data 를 호출하여 제공

2. rabbitmq를 활용한 예시

import pytest
import pika

# RabbitMQ 서버와 연결하는 fixture
@pytest.fixture
def rabbitmq_connection():
    # RabbitMQ 서버에 연결
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()

    # 테스트 후 연결을 종료하기 위해 yield 사용
    yield channel

    # 테스트 후 RabbitMQ 연결 종료
    connection.close()

# 메시지 전송 테스트 함수
def test_send_message(rabbitmq_connection):
    queue_name = 'test_queue'
    rabbitmq_connection.queue_declare(queue=queue_name)
    
    # 메시지 보내기
    rabbitmq_connection.basic_publish(exchange='',
                                      routing_key=queue_name,
                                      body='Hello RabbitMQ')

    # 메시지가 정상적으로 전송되었는지 확인
    method, properties, body = rabbitmq_connection.basic_get(queue=queue_name)
    assert body == b'Hello RabbitMQ'
  1. Fixture 정의
  • rabbitmq_connection 은 코드에서 보는 것과 같이 pika.bBlockingConnection을 이용해 RabbitMQ 서버에 연결하고, 테스트가 끝난 후 연결을 종료한다.
  • yield를 사용해 테스트 함수에서 rabbitmq_conection을 인자로 받아 사용할 수 있도록 하고, 테스트 후 연결을 종료할 수 있게 한다.
    - yield 이전의 코드는 RabbitMQ 연결을 설정하는 부분이고, 이후의 코드는 테스트가 끝난 후 연결을 종료하는 부분이라고 이해하면 된다.

Mocking

  • Mocking은 당연히 DB나 API 등을 실제로 사용하지 않고 그 동작을 가짜(mock)로 대체하여, 의존성 없는 테스트를 수행하여 Flow가 맞는 지 확인하는 방법이다.
  • 당연히 실제 자원을 사용하지 않기 때문에 더 빠르고 효율적으로 테스트 가능

내가 아는 건 단순히 객체만 가짜로 만드는 게 있었는데, 3가지 정도 사용하는 방식이 있었다.

  1. Mock : 테스트에서 가짜로 만든 객체, 실제 시스템이나 함수 호출을 대체
    • 단순히 값을 반환하는 게 아니라, 함수의 호출과 상호작용을 검증하는 데에 중점을 둠
    • assert_called_withassert를 통해 검증할 수 있는 메서드를 기본적으로 제공
  2. Stub : 테스트 대상 함수가 의존하는 다른 함수나 메서드를 미리 정의된 값으로 대체하는 방식
    • 외부 시스템과 상호작용을 모방할 때 사용
    • 필요한 특정 값만 제공하는 역할
  3. Spy : 함수나 메서드가 호출되었는 지, 몇 번 호출되었는지를 추적하는 객체

MockStub가 좀 헷갈릴 수 있는데, 이런 차이점을 잘 알고 있으면 좋을 것 같다.

GPT한테 예제 코드를 작성해 달라고 하니, 아래처럼 친절하게 작성해주었다.

Mock 예제

from unittest import mock

class WeatherService:
    def get_weather(self, city):
        # 실제 API 호출을 통해 날씨 데이터를 가져옴
        pass

def test_weather_app():
    weather_service_mock = mock.Mock()
    weather_service_mock.get_weather.return_value = "sunny"  # Mock: 반환 값 설정

    result = weather_service_mock.get_weather("Paris")
    
    assert result == "sunny"
    weather_service_mock.get_weather.assert_called_once_with("Paris")  # Mock: 호출 검증
  • Mock은 assert_called_once_with 같은 메서드를 사용하여 get_weather가 "Paris"라는 인자를 가지고 한 번만 호출되었는지 확인함.
  • Mock은 호출 여부, 호출 횟수, 인자 등을 추적하여 테스트의 정확성을 검증하는 데 유용하게 사용됨.

Stub 예제

class WeatherService:
    def get_weather(self, city):
        # 실제 API 호출을 통해 날씨 데이터를 가져옴
        pass

class WeatherApp:
    def __init__(self, weather_service):
        self.weather_service = weather_service

    def get_weather_info(self, city):
        weather = self.weather_service.get_weather(city)
        return f"The weather in {city} is {weather}"

# Stub을 사용하여 외부 의존성을 대체
def test_weather_app():
    # Stub: get_weather 함수가 항상 "sunny"를 반환하도록 설정
    weather_service_stub = WeatherService()
    weather_service_stub.get_weather = lambda city: "sunny"

    app = WeatherApp(weather_service_stub)
    result = app.get_weather_info("Paris")
    assert result == "The weather in Paris is sunny"
  • 테스트에서는 get_weather 메서드를 Stub으로 대체하여 항상 "sunny"를 반환하게 설정 -> 실제 날씨 API 호출 없이 테스트가 독립적으로 실행됨.

Spy 예제

class UserService:
    def login(self, username, password):
        # 실제 로그인 로직이 들어감
        return f"Logged in as {username}"

def test_login():
    user_service = UserService()

    # Spy를 사용하여 login 메서드의 호출 추적
    spy = mock.spy(user_service, 'login')

    # 메서드 호출
    result = user_service.login("user1", "password123")
    
    # Spy를 통해 메서드 호출 여부, 호출된 인자 등을 검증
    assert spy.call_count == 1
    spy.assert_called_with("user1", "password123")
    assert result == "Logged in as user1"
  • UserService 클래스는 login 메서드를 통해 사용자 로그인을 처리함
  • 테스트에서는 mock.spy()를 사용하여 login 메서드를 추적한다. spy는 메서드가 호출될 때마다 호출 횟수, 인자 등을 기록합니다.
  • spy.call_count로 메서드가 몇 번 호출되었는지 확인하고, spy.assert_called_with()로 특정 인자와 함께 호출되었는지 검증함

현업에서 겪은 문제 상황

기존 레거시 코드고 해외 연구소에서 작성한 코드라 왜 이렇게 작성했는 지 한번에 이해하지는 못했지만, threading.thread를 모킹하여 테스트를 작성한 경우가 있었다.

threading.Thread를 모킹하는 이유

threading.Thread를 모킹하면 쓰레드가 실행되지 않고, 우리가 설정한 가짜 동작이 실행된다.

그러면 왜 이런 걸 사용할까?

  1. Multi-Threading을 사용하는 코드에서는 여러 스레드가 동시에 실행되기 때문에 테스트가 복잡해질 수 있음 -> Mocking 사용 시 테스트가 예상하는 대로 동기적으로 동작하도록 만들 수 있음.
  2. 객체가 쓰레드를 사용하는 경우, Thread가 실제로 실행되는 것이 중요한 것이 아니라 연결이 제대로 이루어졌거나 잘 생성되었는지 등을 테스트하고 싶을 때 유용함.

ChatGPT가 들어준 예시는 아래와 같음

import threading
import time
import pytest
from unittest import mock

def background_task():
    time.sleep(1)
    return "task completed"

def main_task():
    thread = threading.Thread(target=background_task)
    thread.start()
    thread.join()  # 기다려서 백그라운드 작업이 끝날 때까지 기다림
    return "main task finished"

def test_main_task_with_mocked_thread():
    # Threading.Thread를 모의하여 실제 스레드가 시작되지 않도록 함
    with mock.patch('threading.Thread') as MockThread:
        mock_thread_instance = MockThread.return_value
        mock_thread_instance.start = mock.MagicMock()  # start 메서드를 모의
        
        # 스레드의 target을 실행하는 부분을 mock으로 설정
        mock_thread_instance._target = mock.MagicMock()
        
        # main_task 함수 실행 (실제 스레드는 실행되지 않음)
        result = main_task()

        # 스레드의 start 메서드가 호출되지 않았는지 확인
        mock_thread_instance.start.assert_called_once()
        # 메인 작업의 결과가 예상대로 반환되는지 확인
        assert result == "main task finished"

결국 코드에서 보면 threading.Thread의 return 값을 mcok_thread_instance로 모킹함.
고로 thread.start()도 mocking되고 thread.target도 mocking이 된다.

결국 main_task() 실행 시 threading.Thread(target=backgorund_task) 여기서 리턴 값 자체가 모킹이 되어있기 때문에, target도 mocking 됨 + thread.start() 여기도 mocking 됨 -> thread.join()까지 모킹 -> 실제 쓰레드 실행되는 게 아니고 호출 여부만 확인하며, main_task의 return이 정상적으로 이루어짐.

이런 과정으로 진행된다.

다른 라이브러리 내부에서 threading을 사용하는 경우

이게 내가 겪은 문제점이다. pika의 BlockingConnection의 내부 구현이 버전 업그레이드 되면서 Thread-safe 하게 동작하기 위해 threading 모듈 사용이 추가 됐다.(물론 한참됐는데, 우리 코드가 라이브러리 버전 업그레이드를 아예 진행하지 않아서 대응이 늦어졌다.)

이 건에 대한 해결 방법은 해결 완료 후에 업데이트 할 예정

예상 되는 해결 방안은

  1. threading 모킹을 제외하고 필요한 부분만 모킹하도록 전체 수정
  2. 모킹은 진행하지만 BlockingConnection 연결은 진행되도록(가능한지는 모르겠음)

이 2개 정도 생각된다.

0개의 댓글