[ 글의 목적: python 에서 context manager 가 필요한 상황과 원리, with 에 대한 deep dive, python 3.13 기준!! ]
python 에는 with 구문이 있다. (사실 2005년 PEP 343 부터 있었던 고인물). 동시성 얘기와 context manager 얘기하는데 왜 with 냐? 사실 이 질문은.. 고민이 역전된 질문이다. 정확히 말하면 with는 context manager를 사용하기 위한 문법이기때문!
사실 batch 중심 서비스에서 celery 쓰긴 과해서, 기존 운영 DBMS 에 추상화된 task model 로 단일 python runtime process 띄워서 처리하는데, 이 "task" 객체 모델링하다가 삘받아서 쓰는 정리글인건 비밀이다.
이제는 멀티코어 프로세서가 일반화되고 I/O 집약적인 작업을 효율적으로 처리하기 위해 동시성 프로그래밍이 거의 기본 개념이 되어버렸다. (왜 동시성 처리가 필요한지는 이제 너무 식상한 얘기가 되어버린 것 같은..)
이제 멀티스레딩, 멀티프로세싱뿐만 아니라, 단일 스레드 내에서도 이벤트 루프와 코루틴을 활용해 동시성을 극대화할 수 있다. (python 코루틴(coroutine) - 동시성과 병렬성, 동기와 비동기 작업, blocking과 non-blocking 그리고 코루틴 / javascript - 기본 동작 원리와 v8 js 엔진 참조)
근데 with
랑 context manager
얘기하면서 왜 동시성 얘기 부터 하느냐? 사실 동시성 프로그래밍에서 가장 까다로운 문제는 바로 "공유 리소스 관리(Shared Resource Management)" 이기 때문이다.
즉 여러 스레드나 코루틴이 파일, 데이터베이스 커넥션, 네트워크 소켓과 같은 공유 리소스에 동시에 접근할 때, 접근 순서를 제어하고 사용 후 리소스를 안정적으로 해제하지 않으면 경쟁 상태(Race Condition), 데드락(Deadlock), 리소스 누수(Resource Leak) 등 심각한 문제로 이어질 수 있기 때문이다.
import threading
lock = threading.Lock()
shared_resource = 0
def worker():
global shared_resource
# with 구문이 lock의 획득(acquire)과 해제(release)를 보장합니다.
with lock:
# 이 블록은 한 번에 하나의 스레드만 실행할 수 있는 임계 영역(Critical Section)입니다.
data = shared_resource
data += 1
# 다른 스레드가 끼어들 수 있는 잠재적 위험 구간
time.sleep(0.1)
shared_resource = data
# ... 스레드 생성 및 실행 ...
# 또는 장고에서
from django.db import transaction
def my_view():
with transaction.atomic():
# 이 블록 안의 작업이 하나의 트랜잭션으로 묶임
do_something()
do_something_else()
python 하면 위 코드는 식상할 정도로 뭔가 많이 본 형태다. 누군가 "왜 with
를 쓰나요?" 라고 한다면, "lock
을 저 block 에서만 사용하고 빠빠이 하려고요!" 라고만 답하게 된다면, 조금 더 아래를 내려보자!
동시성 환경에서는 여러 실행 흐름이 언제든지 CPU를 점유하고 리소스에 접근할 수 있다. 이때 리소스 접근을 제어하는 '락(Lock)'을 획득하고 사용 후 반드시 '해제'해야 한다.
만약 락을 해제하는 코드가 실행되기 전에 예외가 발생한다면? 해당 락은 영원히 해제되지 않아 다른 스레드들은 무한정 대기하는 데드락 상태에 빠지게 된다.
Context Manager
는 with
블록에 진입할 때 리소스를 획득하고, 블록을 빠져나올 때 예외 발생 여부와 관계없이 반드시 리소스를 해제하는 작업을 수행하도록 보장한다. 이것이 동시성 프로그래밍에서 Context Manager가 필수적인 이유다.
import threading
lock = threading.Lock()
def unsafe_operation():
lock.acquire() # 락 획득
# ... 공유 리소스 작업 ...
if some_error_condition:
raise ValueError("오류 발생!")
lock.release() # 예외 발생 시 이 코드는 절대 실행되지 않음!
# 조금 나아진 예시: try...finally 사용
def slightly_better_operation():
lock.acquire()
try:
# ... 공유 리소스 작업 ...
if some_error_condition:
raise ValueError("오류 발생!")
finally:
lock.release() # finally 블록으로 해제를 보장
try ... finally
구문을 추상화 해서 사용할 수 도 없는 것.. (자주 바뀌게 되어있음...)import threading
lock = threading.Lock()
def safe_operation():
with lock: # 진입 시 lock.acquire(), 탈출 시 lock.release() 자동 호출
# ... 공유 리소스 작업 ...
if some_error_condition:
raise ValueError("오류 발생!")
# with 블록이 끝나면 예외가 발생해도 락은 안전하게 해제됨
with
가 이걸 어떻게 해결한다는 건가? 어떻게 진입 시 lock.acquire()
, 탈출 시 lock.release()
자동 호출을 한다는 것이가!?!?# 리소스 누수 예시
def leak_file_descriptors():
# 이 함수를 반복 호출하면 결국 에러 발생
f = open('temp.txt', 'w')
f.write('leak')
# f.close()를 의도적으로 누락
close()
를 호출하지 않는 코드가 반복 실행되면 운영체제가 프로세스에 할당한 파일 디스크립터(File Descriptor) 개수 제한에 도달 해 "Too many open files" 오류가 발생한다. with
는 이걸 원천적으로 막아준다.# 리소스 누수 방지
def no_leak_example():
with open('temp.txt', 'w') as f:
f.write('safe')
# with 블록이 끝나면 f.close()가 자동으로 호출됨
with
는 "Context Manager"를 위한 "Syntactic Sugar" 이다. contextlib — with 문 컨텍스트를 위한 유틸리티
쉽게 말하면 __enter__()
와 __exit__()
메소드를 구현한 객체로 with
문 사용 시 자동으로 호출되는 메서드들이다. (만약 던더메서드, 매직메서드를 모른다면 이 글은 도움이 못된다.)
__enter__(self)
: with
블록에 진입할 때 호출된다. 리소스를 획득하고 설정하는 역할을 하며, as
키워드로 변수에 할당할 값을 반환한다.
__exit__(self, exc_type, exc_value, traceback)
: with
블록을 "빠져나올 때" 반드시 호출된다. 리소스를 해제하는 역할을 한다. 만약 블록이 예외 없이 정상 종료되었다면 세 인자(exc_type, exc_value, traceback)는 모두 None이 된다. 예외가 발생했다면 해당 예외 정보가 전달된다. __exit__
메서드가 True
를 반환하면 예외가 전파되지 않고 억제된다.
import dis
def my_func():
with open('file.txt', 'w') as f:
f.write('hello')
dis.dis(my_func)
3 RESUME 0
4 LOAD_GLOBAL 1 (open + NULL)
LOAD_CONST 1 ('file.txt')
LOAD_CONST 2 ('w')
CALL 2
BEFORE_WITH
L1: STORE_FAST 0 (f)
...생략...
4 L2: LOAD_CONST 0 (None)
LOAD_CONST 0 (None)
LOAD_CONST 0 (None)
CALL 2
POP_TOP
RETURN_CONST 0 (None)
L3: PUSH_EXC_INFO
WITH_EXCEPT_START
...생략...
왜 갑자기 바이트코드냐면,, 지금 cpython 3.14 이상인 main인 깃헙 레포에는 with
에 대한 코드를 찾기 어렵다.. "Remove the BEFORE_WITH and BEFORE_ASYNC_WITH instructions. Add the new :opcode:LOAD_SPECIAL
instruction" 때문 ㅠ
덧붙이자면 with
문은 Python 3.11
부터 도입된 특화 적응형 인터프리터(Specializing Adaptive Interpreter) 덕분에 고도로 최적화된 바이트코드를 사용한다. - https://peps.python.org/pep-0659/ & https://www.youtube.com/watch?v=shQtrn1v7sQ (아니 그니까 좀 python 최소한 3.11 이상은 써라 제발 좀)
여튼 BEFORE_WITH
가 __enter__
이며 WITH_EXCEPT_START
가 with
에서 예외 발생 시 __exit__(...)
호출 하기 위한 세팅이다. 파이썬이 이렇게나 상위 문법을 지원해줘서 감사할따름
사실 __enter__
와 __exit__
를 가진 클래스를 매번 작성하는 것은 "귀찮다". 이럴때마다 파이써닉이 와닿는데, contextlib
내장 라이브러리가 이 과정을 훨씬 쉽게 만들어주는 @contextmanager
데코레이터를 제공한다.
https://github.com/python/cpython/blob/main/Lib/contextlib.py 에서 실제 해당 내장 라이브러리의 코드 참조!
@contextmanager
데코레이터의 내부 구현@contextmanager
데코레이터는 제너레이터(Generator) 함수를 손쉽게 Context Manager로 변환해 준다. 이 데코레이터는 내부적으로 _GeneratorContextManager
라는 헬퍼 클래스를 사용하여 제너레이터를 컨텍스트 관리 프로토콜에 맞게 래핑한다.
def contextmanager(func):
"""@contextmanager decorator.
Typical usage:
@contextmanager
def some_generator(<arguments>):
<setup>
try:
yield <value>
finally:
<cleanup>
This makes this:
with some_generator(<arguments>) as <variable>:
<body>
equivalent to this:
<setup>
try:
<variable> = <value>
<body>
finally:
<cleanup>
"""
@wraps(func)
def helper(*args, **kwds):
return _GeneratorContextManager(func, args, kwds)
return helper
class _GeneratorContextManager(
_GeneratorContextManagerBase,
AbstractContextManager,
ContextDecorator,
):
"""Helper for @contextmanager decorator."""
def __enter__(self):
# do not keep args and kwds alive unnecessarily
# they are only needed for recreation, which is not possible anymore
del self.args, self.kwds, self.func
try:
return next(self.gen)
except StopIteration:
raise RuntimeError("generator didn't yield") from None
def __exit__(self, typ, value, traceback):
if typ is None:
try:
next(self.gen)
except StopIteration:
return False
else:
try:
raise RuntimeError("generator didn't stop")
finally:
self.gen.close()
else:
if value is None:
# Need to force instantiation so we can reliably
# tell if we get the same exception back
value = typ()
try:
self.gen.throw(value)
except StopIteration as exc:
# Suppress StopIteration *unless* it's the same exception that
# was passed to throw(). This prevents a StopIteration
# raised inside the "with" statement from being suppressed.
return exc is not value
except RuntimeError as exc:
# Don't re-raise the passed in exception. (issue27122)
if exc is value:
exc.__traceback__ = traceback
return False
# Avoid suppressing if a StopIteration exception
# was passed to throw() and later wrapped into a RuntimeError
# (see PEP 479 for sync generators; async generators also
# have this behavior). But do this only if the exception wrapped
# by the RuntimeError is actually Stop(Async)Iteration (see
# issue29692).
if (
isinstance(value, StopIteration)
and exc.__cause__ is value
):
value.__traceback__ = traceback
return False
raise
except BaseException as exc:
# only re-raise if it's *not* the exception that was
# passed to throw(), because __exit__() must not raise
# an exception unless __exit__() itself failed. But throw()
# has to raise the exception to signal propagation, so this
# fixes the impedance mismatch between the throw() protocol
# and the __exit__() protocol.
if exc is not value:
raise
exc.__traceback__ = traceback
return False
try:
raise RuntimeError("generator didn't stop after throw()")
finally:
self.gen.close()
_GeneratorContextManager
가 이미 매우, 충분히 잘 만들어져 있기때문에, (제발 다시 처음부터 만들지 말고) @contextmanager
이거 부터 사용할지 고민해봐야 한다.
__enter__
메서드__exit__
메서드간단한 해당 class 의 flow chart 는 아래와 같다.
gen.throw(value)
조금만 더 보자!예외가 있으면, __exit__
메서드는 전달받은 예외 정보(typ, value, traceback)를 사용하여 제너레이터의 throw()
메서드를 호출한다.
즉, gen.throw(value)
를 통해 제너레이터가 yield
에서 멈춰있던 지점으로 예외를 '주입' 한다. 그러면 제너레이터 함수 안의 try...except
블록에 의해 잡히게 된다.
그러니까 @contextmanager
를 사용한 함수에서 try...except
에서 잡히게 된다는 거고, try...except
를 써야 좀 더 depth 있는 디버깅을 할 수 있다는 것!
# 방법 1: 클래스로 직접 구현
class Timer:
def __enter__(self):
self.start = time.time()
print("타이머 시작")
return self
def __exit__(self, exc_type, exc_val, exc_tb):
end = time.time()
print(f"소요 시간: {end - self.start:.2f}초")
return False # 예외를 억제하지 않음
# 방법 2: @contextmanager 데코레이터 사용
from contextlib import contextmanager
import time
@contextmanager
def timer():
start = time.time()
print("타이머 시작")
try:
yield # __enter__의 반환값 (여기선 None), 이 지점에서 with 블록 코드가 실행됨
finally:
# with 블록을 빠져나오면 이 코드가 실행됨
end = time.time()
print(f"소요 시간: {end - start:.2f}초")
# 사용법은 동일
with timer():
time.sleep(1)
@contextmanager
를 사용하면 try...yield...finally
패턴으로 __enter__
와 __exit__
의 로직을 훨씬 직관적으로 표현할 수 있다.
__enter__
에 해당__exit__
에 해당_GeneratorContextManager
는 __enter__
가 호출되면 제너레이터를 yield
지점까지 실행하고, __exit__
가 호출되면 예외 정보를 제너레이터의 throw()
메서드로 주입하거나 next()
를 호출하여 finally
블록이 실행되도록 한다.
더욱이 이제 "바이트코드" 로 구현되어 있어서 최소한의 오버헤드를 추가 한다고 한다.
사실 제네레이터를 모르면 위 예제들이나 설명이 와닿지가 않는다. (python 코루틴(coroutine) - iterator, generator, asyncio, async, await 그리고 코루틴 (2) 참조) 단순한 정의를 보면 python 에서 제네레이터는 "이터레이터를 생성하는 함수" 이다. yield
표현식을 사용하고 다음 호출 시 마지막으로 실행된 yield
표현식 이후부터 실행을 재개한다.
실행 흐름의 일시 중단 및 재개: yield 키워드는 함수의 실행을 잠시 멈추고 제어권을 호출자에게 넘겨준다. with
블록의 코드가 실행되는 동안 제너레이터는 yield
지점에서 대기하게 된다.
상태 유지: 제너레이터 함수 내의 "지역 변수"는 yield
를 통해 중단되었다가 다시 재개될 때까지 그 상태를 그대로 유지한다. (start 변수처럼)
예외 주입: 제너레이터의 throw()
메서드를 사용하면 제너레이터가 멈춰있는 yield
지점 외부에서 예외를 발생시킬 수 있다. @contextmanager
는 이 기능을 활용해 with
블록의 예외를 제너레이터 내부로 전달한다. 이 얘기가 바로 위에서 본 "예외 발생시의 gen.throw(value)" 얘기다.
그리고 사실 대용량 처리일 수 록, "메모리 관점의 이점이 상당하다." 이건 당연히 제네레이터의 특성이자 이를 기반으로 할 수 있는 with
의 결과론적인 이점이다.
commit
, 실패 시 rollback
을 수행해야한다. from contextlib import contextmanager
import sqlite3
@contextmanager
def db_transaction(db_path):
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
print("DB 커넥션 및 트랜잭션 시작")
try:
yield cursor # with 블록에서 사용할 커서 객체를 반환
print("트랜잭션 커밋")
conn.commit()
except Exception as e:
print(f"예외 발생: {e}, 트랜잭션 롤백")
conn.rollback()
raise # 예외를 다시 발생시켜 호출자에게 알림
finally:
print("DB 커넥션 종료")
conn.close()
# 사용 예시
with db_transaction('app.db') as cursor:
cursor.execute("INSERT INTO users (name) VALUES ('Alice')")
# 만약 여기서 예외가 발생하면 자동으로 롤백됩니다.
# SQLAlchemy의 내장 Context Manager 예시
from sqlalchemy.orm import Session
with Session(engine) as session:
session.add(User(name="Bob"))
# 예외 발생 시 자동으로 롤백, 정상 종료 시 커밋 (설정에 따라 다름)
session.commit()
# 세션은 자동으로 닫힘
여러 서버에 걸쳐 리소스 접근을 동기화해야 할 때 Redis 같은 외부 저장소를 이용해 분산 락을 구현할 수 있다. 이때도 Context Manager는 매우 유용하다.
(나쁜예)
import redis
# 락을 획득했지만, 프로세스가 갑자기 죽으면 락이 해제되지 않을 수 있음
def bad_distributed_lock(r, lock_key):
is_acquired = r.set(lock_key, "locked", nx=True, ex=30)
if is_acquired:
# ... 임계 영역 작업 ...
r.delete(lock_key) # 작업 중 오류가 나면 실행되지 않음
else:
print("락 획득 실패")
(context manager 예시)
from contextlib import contextmanager
import redis
import uuid
@contextmanager
def distributed_lock(r: redis.Redis, lock_key: str, timeout: int = 30):
lock_id = str(uuid.uuid4())
# 락 획득 시도 (nx=True는 키가 없을 때만 set)
if not r.set(lock_key, lock_id, nx=True, ex=timeout):
raise TimeoutError("분산 락을 획득할 수 없습니다.")
try:
yield lock_id # 락을 획득했음을 알림
finally:
# 내가 획득한 락이 맞는지 확인하고 안전하게 삭제 (Lua 스크립트 사용)
release_script = """
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
"""
r.eval(release_script, 1, lock_key, lock_id)
# 사용 예시
r = redis.Redis()
try:
with distributed_lock(r, "my-distributed-lock") as lock_id:
print(f"락 획득 성공 (ID: {lock_id})")
# 여러 서버에서 공유하는 중요한 작업 수행
except TimeoutError as e:
print(e)
외부 API 호출 시 "비율 제한(Rate Limiting)" 이 필요하거나, 스레드 풀 같은 리소스를 사용하고 안전하게 종료(shutdown)하는 데도 Context Manager가 이상적이다.
(나쁜예)
import time
import requests
from concurrent.futures import ThreadPoolExecutor
# 스레드에 안전하지 않은(non-thread-safe) API 클라이언트
class UnsafeAPIClient:
def __init__(self, requests_per_second=1):
self.interval = 1.0 / requests_per_second
self.last_request_time = 0
def make_request(self, url):
# 여러 스레드가 이 부분을 동시에 통과할 수 있어 비율 제한이 깨짐 (Race Condition)
time_since_last = time.time() - self.last_request_time
if time_since_last < self.interval:
time.sleep(self.interval - time_since_last)
self.last_request_time = time.time()
print(f"{threading.current_thread().name}: Requesting {url} at {self.last_request_time:.2f}")
return requests.get(url)
# 스레드 풀을 생성하고 제대로 종료하지 않음
def bad_api_usage():
client = UnsafeAPIClient(requests_per_second=2) # 초당 2회 제한
executor = ThreadPoolExecutor(max_workers=5)
urls = ['https://api.example.com/data'] * 5
# 여러 스레드에서 안전하지 않은 클라이언트를 공유하며 작업 제출
for url in urls:
executor.submit(client.make_request, url)
# executor.shutdown(wait=True) 호출을 잊어버림!
# 프로그램이 즉시 종료되지 않거나, 스레드 리소스가 누수될 수 있음.
print("모든 작업을 제출했지만, 스레드 풀을 종료하지 않았습니다.")
위와 같이 스레드 풀을 수동으로 관리하고, 비율 제한이 스레드에 안전하지 않은 경우는
경쟁 상태 (Race Condition): UnsafeAPIClient
의 비율 제한 로직은 여러 스레드가 동시에 접근하면 바로 꺠진다. 여러 스레드가 거의 동시에 time_since_last
를 확인하고 sleep
없이 바로 요청을 보내기 때문에 의도한 비율 제한을 바아로 초과하게 된다.
리소스 누수 (Resource Leak): ThreadPoolExecutor
를 생성한 후 shutdown()
메서드가 호출되지 않았다! -> 백그라운드 스레드가 정리되지 않아 프로그램이 비정상적으로 대기하거나 리소스가 계속 점유되는 문제가 발생한다. (이거 생각보다 그냥 놓치면 나중에 디버깅하기 어렵다. 평소 습관이 중요...)
(context manager 예시)
from concurrent.futures import ThreadPoolExecutor
from contextlib import contextmanager
import threading
# 스레드 풀을 안전하게 관리하는 Context Manager
@contextmanager
def thread_pool(max_workers):
executor = ThreadPoolExecutor(max_workers=max_workers)
try:
yield executor
finally:
# with 블록이 끝나면 반드시 shutdown이 호출됨
executor.shutdown(wait=True)
# 스레드에 안전한 비율 제한 로직을 제공하는 Context Manager
@contextmanager
def rate_limiter(client_instance):
with client_instance.lock: # 스레드 락으로 임계 영역 보호
time_since_last = time.time() - client_instance.last_request_time
if time_since_last < client_instance.interval:
time.sleep(client_instance.interval - time_since_last)
client_instance.last_request_time = time.time()
yield
class SafeAPIClient:
def __init__(self, requests_per_second=1):
self.interval = 1.0 / requests_per_second
self.last_request_time = 0
self.lock = threading.Lock() # 스레드 동기화를 위한 락
def make_request(self, url):
with rate_limiter(self): # 컨텍스트 매니저로 비율 제한
print(f"{threading.current_thread().name}: Requesting {url} at {time.time():.2f}")
return requests.get(url)
# Context Manager를 활용한 안전한 병렬 API 요청
def good_api_usage():
client = SafeAPIClient(requests_per_second=2)
urls = ['https://api.example.com/data'] * 5
with thread_pool(max_workers=5) as executor:
futures = [executor.submit(client.make_request, url) for url in urls]
# 결과 처리...
print("모든 작업이 완료되고 스레드 풀이 안전하게 종료되었습니다.")
thread_pool
은 좀 과하다 ㅎㅎ; ThreadPoolExecutor
가 이미 __enter__
와 __exit__
메서드를 구현하고 있기 때문에 불필요한 중복이다 ㅎ. 하나의 예시를 위해 가져와서 사용했다.with ThreadPoolExecutor(max_workers=5) as executor
로 처리 가능하다.# ThreadPoolExecutor를 직접 Context Manager로 활용
def good_api_usage_simplified():
client = SafeAPIClient(requests_per_second=2)
urls = ['https://api.example.com/data'] * 5
# ThreadPoolExecutor 자체가 Context Manager이므로 별도 래퍼 함수 불필요
with ThreadPoolExecutor(max_workers=5) as executor:
futures = [executor.submit(client.make_request, url) for url in urls]
# 결과 처리...
# concurrent.futures.as_completed(futures) 등을 사용하여 완료된 순서대로 결과 처리 가능
print("모든 작업이 완료되고 스레드 풀이 안전하게 종료되었습니다.")
결국 Context Manager는 "리소스의 안전한 획득과 해제를 보장하는 파이썬의 핵심 메커니즘" 이다. 그리고 이걸 with
구문으로 사용할 수 있다. (__enter__
와 __exit__
자동 호출)
결국 with
문은 동시성 환경에서 발생할 수 있는 경쟁 상태(Race Condition), 데드락(Deadlock), 리소스 누수(Resource Leak) 등의 심각한 문제들을 원천적으로 방지 하는 짱편한 문법이다.
특히 @contextmanager
데코레이터와 제너레이터의 조합은 복잡한 클래스 구현 없이도 직관적인 try...yield...finally
패턴으로 리소스 관리 로직을 표현할 수 있게 해준다.
특히 3.11 이상의 특화 적응형 인터프리터 덕분에 성능 오버헤드도 최소화되었으니, 파일 I/O, 데이터베이스 트랜잭션, 스레드 동기화, 분산 락 등 리소스가 관련된 모든 곳에서 Context Manager를 적극 활용해보자!
글 초반부 읽으며 이거 generator 이야기가 빠질 수 없겠는데.. 하면서 읽었는데 역시 나오는군요 ㅋㅋㅋ|
3.14 부터 gil 해제가 공식 지원 되면서 여러 변경사항이 with에도 영향이 갔나보네요
재미있게 잘 읽었습니다!