회사에서 기존에 Pytest를 이용해 테스트를 진행하고 있었는데, 라이브러리 업데이트 하면서 라이브러리 내부 구현이 변경되어 Unit Test가 갑자기 Fail되는 상황이 발생했다.
이 상황을 해결하기 위해 Pytest 및 Thread 관련 개념을 좀 알아봐야겠다.
test_
로 시작하는 이름의 함수를 작성하면, 함수 안에 로직을 테스트 할 수 있음assert
문 : 테스트의 기초 중 기초. 예상되는 결과와 실제 결과 비교test_*.py
처럼 test
로 시작하거나 _test.py
같은 파일명을 자동으로 디렉토리에서 찾음.이런 식으로 파일 및 테스트를 작성하고 pytest
명령어를 실행하면 테스트를 진행할 수 있음.
Fixtures
를 활용하면 테스트 코드를 더 깔끔하게 작성 가능하고, 재사용 가능하며 테스트 환경을 독립적으로 설정, 종료하는데 유용함function
: fixture
의 기본값으로, 각 테스트 함수마다 새로 생성class
: 테스트 클래스 당 한 번만 실행module
: 테스트 모듈 당 한 번만 실행session
: 테스트 세션당 한 번만 실행뭘 하는 지 대충은 알겠는데, 말로만 들으면 잘 모르겠으니 자세한 예시를 살펴보자.
import pytest
# Fixture 정의
@pytest.fixture
def sample_data():
return {"name": "Alice", "age": 25}
# Fixture를 사용하는 테스트 함수
def test_name(sample_data):
assert sample_data["name"] == "Alice"
def test_age(sample_data):
assert sample_data["age"] == 25
sample_data
는 function fixture임. pytest
는 테스트가 실행되기 전에 sample_data
를 호출하여 제공import pytest
import pika
# RabbitMQ 서버와 연결하는 fixture
@pytest.fixture
def rabbitmq_connection():
# RabbitMQ 서버에 연결
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 테스트 후 연결을 종료하기 위해 yield 사용
yield channel
# 테스트 후 RabbitMQ 연결 종료
connection.close()
# 메시지 전송 테스트 함수
def test_send_message(rabbitmq_connection):
queue_name = 'test_queue'
rabbitmq_connection.queue_declare(queue=queue_name)
# 메시지 보내기
rabbitmq_connection.basic_publish(exchange='',
routing_key=queue_name,
body='Hello RabbitMQ')
# 메시지가 정상적으로 전송되었는지 확인
method, properties, body = rabbitmq_connection.basic_get(queue=queue_name)
assert body == b'Hello RabbitMQ'
Fixture
정의rabbitmq_connection
은 코드에서 보는 것과 같이 pika.bBlockingConnection
을 이용해 RabbitMQ 서버에 연결하고, 테스트가 끝난 후 연결을 종료한다.yield
를 사용해 테스트 함수에서 rabbitmq_conection
을 인자로 받아 사용할 수 있도록 하고, 테스트 후 연결을 종료할 수 있게 한다.yield
이전의 코드는 RabbitMQ 연결을 설정하는 부분이고, 이후의 코드는 테스트가 끝난 후 연결을 종료하는 부분이라고 이해하면 된다.Mocking
은 당연히 DB나 API 등을 실제로 사용하지 않고 그 동작을 가짜(mock)로 대체하여, 의존성 없는 테스트를 수행하여 Flow가 맞는 지 확인하는 방법이다.내가 아는 건 단순히 객체만 가짜로 만드는 게 있었는데, 3가지 정도 사용하는 방식이 있었다.
assert_called_with
등 assert
를 통해 검증할 수 있는 메서드를 기본적으로 제공Mock
과 Stub
가 좀 헷갈릴 수 있는데, 이런 차이점을 잘 알고 있으면 좋을 것 같다.
GPT한테 예제 코드를 작성해 달라고 하니, 아래처럼 친절하게 작성해주었다.
from unittest import mock
class WeatherService:
def get_weather(self, city):
# 실제 API 호출을 통해 날씨 데이터를 가져옴
pass
def test_weather_app():
weather_service_mock = mock.Mock()
weather_service_mock.get_weather.return_value = "sunny" # Mock: 반환 값 설정
result = weather_service_mock.get_weather("Paris")
assert result == "sunny"
weather_service_mock.get_weather.assert_called_once_with("Paris") # Mock: 호출 검증
assert_called_once_with
같은 메서드를 사용하여 get_weather가 "Paris"라는 인자를 가지고 한 번만 호출되었는지 확인함.class WeatherService:
def get_weather(self, city):
# 실제 API 호출을 통해 날씨 데이터를 가져옴
pass
class WeatherApp:
def __init__(self, weather_service):
self.weather_service = weather_service
def get_weather_info(self, city):
weather = self.weather_service.get_weather(city)
return f"The weather in {city} is {weather}"
# Stub을 사용하여 외부 의존성을 대체
def test_weather_app():
# Stub: get_weather 함수가 항상 "sunny"를 반환하도록 설정
weather_service_stub = WeatherService()
weather_service_stub.get_weather = lambda city: "sunny"
app = WeatherApp(weather_service_stub)
result = app.get_weather_info("Paris")
assert result == "The weather in Paris is sunny"
get_weather
메서드를 Stub으로 대체하여 항상 "sunny"를 반환하게 설정 -> 실제 날씨 API 호출 없이 테스트가 독립적으로 실행됨.class UserService:
def login(self, username, password):
# 실제 로그인 로직이 들어감
return f"Logged in as {username}"
def test_login():
user_service = UserService()
# Spy를 사용하여 login 메서드의 호출 추적
spy = mock.spy(user_service, 'login')
# 메서드 호출
result = user_service.login("user1", "password123")
# Spy를 통해 메서드 호출 여부, 호출된 인자 등을 검증
assert spy.call_count == 1
spy.assert_called_with("user1", "password123")
assert result == "Logged in as user1"
spy.call_count
로 메서드가 몇 번 호출되었는지 확인하고, spy.assert_called_with()
로 특정 인자와 함께 호출되었는지 검증함기존 레거시 코드고 해외 연구소에서 작성한 코드라 왜 이렇게 작성했는 지 한번에 이해하지는 못했지만, threading.thread
를 모킹하여 테스트를 작성한 경우가 있었다.
threading.Thread
를 모킹하는 이유threading.Thread
를 모킹하면 쓰레드가 실행되지 않고, 우리가 설정한 가짜 동작이 실행된다.
그러면 왜 이런 걸 사용할까?
ChatGPT가 들어준 예시는 아래와 같음
import threading
import time
import pytest
from unittest import mock
def background_task():
time.sleep(1)
return "task completed"
def main_task():
thread = threading.Thread(target=background_task)
thread.start()
thread.join() # 기다려서 백그라운드 작업이 끝날 때까지 기다림
return "main task finished"
def test_main_task_with_mocked_thread():
# Threading.Thread를 모의하여 실제 스레드가 시작되지 않도록 함
with mock.patch('threading.Thread') as MockThread:
mock_thread_instance = MockThread.return_value
mock_thread_instance.start = mock.MagicMock() # start 메서드를 모의
# 스레드의 target을 실행하는 부분을 mock으로 설정
mock_thread_instance._target = mock.MagicMock()
# main_task 함수 실행 (실제 스레드는 실행되지 않음)
result = main_task()
# 스레드의 start 메서드가 호출되지 않았는지 확인
mock_thread_instance.start.assert_called_once()
# 메인 작업의 결과가 예상대로 반환되는지 확인
assert result == "main task finished"
결국 코드에서 보면 threading.Thread
의 return 값을 mcok_thread_instance
로 모킹함.
고로 thread.start()
도 mocking되고 thread.target
도 mocking이 된다.
결국 main_task() 실행 시 threading.Thread(target=backgorund_task) 여기서 리턴 값 자체가 모킹이 되어있기 때문에, target도 mocking 됨 + thread.start()
여기도 mocking 됨 -> thread.join()까지 모킹 -> 실제 쓰레드 실행되는 게 아니고 호출 여부만 확인하며, main_task의 return이 정상적으로 이루어짐.
이런 과정으로 진행된다.
이게 내가 겪은 문제점이다. pika의 BlockingConnection의 내부 구현이 버전 업그레이드 되면서 Thread-safe 하게 동작하기 위해 threading 모듈 사용이 추가 됐다.(물론 한참됐는데, 우리 코드가 라이브러리 버전 업그레이드를 아예 진행하지 않아서 대응이 늦어졌다.)
이 건에 대한 해결 방법은 해결 완료 후에 업데이트 할 예정
예상 되는 해결 방안은
이 2개 정도 생각된다.