Python에서 동시성(Concurrency)이나 병렬성(Parallelism)을 구현할 때
가장 많이 사용되는 도구가 concurrent.futures의
ThreadPoolExecutorProcessPoolExecutor이다.
이 둘의 차이를 작업의 병목이 어디에 있는지(CPU vs I/O) 관점에서 이해하지 못하면
Executor 선택의 기준은 단 하나다.
“이 작업이 느린 이유가 계산 때문인가, 기다림 때문인가?”
예:
requests, aiohttp)sleep, socket 통신-> ThreadPoolExecutor에 매우 적합
예:
-> ProcessPoolExecutor가 필요
| 항목 | ThreadPoolExecutor | ProcessPoolExecutor |
|---|---|---|
| 실행 단위 | 스레드(Thread) | 프로세스(Process) |
| 적합한 작업 | I/O-bound | CPU-bound |
| 병목 해결 방식 | I/O 대기 시간 겹치기 | CPU 코어 병렬 사용 |
| 메모리 공간 | 프로세스 내부 공유 | 프로세스별 분리 |
| 직렬화(Pickle) | 필요 없음 | 필요 |
| GIL | 공유됨 (우회 불가) | 분리됨 (우회 가능) |
| 생성 비용 | 가볍고 빠름 | 무겁고 느림 |
from concurrent.futures import ThreadPoolExecutor
with ThreadPoolExecutor(max_workers=10) as executor:
results = executor.map(fetch_url, urls)
from concurrent.futures import ProcessPoolExecutor
with ProcessPoolExecutor(max_workers=4) as executor:
results = executor.map(process_image, image_paths)
ProcessPoolExecutor는
-> 작업 함수와 인자를 Pickle로 직렬화하여 다른 프로세스에 전달한다.
이로 인해 다음과 같은 제약이 발생한다.
| Pickle 가능 | Pickle 불가 |
|---|---|
| int, str, list, dict | lambda |
| top-level 함수 | 내부 함수 |
| 단순 데이터 구조 | 열린 파일 객체 |
| DB 커넥션 | |
| GPU 세션 | |
| 소켓, 핸들 |
def outer():
def inner(x):
return x * x
# inner는 pickle 불가 → ProcessPoolExecutor에서 오류 발생
자주 발생하는 오류 메시지:
AttributeError: Can't pickle local object ...
import asyncio
semaphore = asyncio.Semaphore(4)
| 작업 유형 | 추천 Executor | 이유 |
|---|---|---|
| 웹 크롤링, 파일 입출력 | ThreadPoolExecutor | I/O 대기 중 GIL 해제 |
| 이미지 처리, NLP | ProcessPoolExecutor | CPU 병렬 처리 |
| 모델 추론, GPU 처리 | ThreadPoolExecutor / asyncio | GPU 세션 공유 필요 |
| CPU + I/O 혼합 | asyncio + ProcessPoolExecutor | 역할 분리 필요 |
함수 단위로 독립 처리하고, 공유 상태가 없다면 결과는 섞이지 않는다
results = await asyncio.gather(*[
loop.run_in_executor(pool, wrapper, file)
for file in files
])
gather()는 입력 순서대로 결과 반환-> 병렬 실행이지만 결과는 항상 안정적
ThreadPoolExecutor나 ProcessPoolExecutor를 사용한다고 해서
결과가 자동으로 섞이거나 꼬이지는 않는다.
문제가 발생하는 대부분의 경우는 공유 상태(shared state) 를 잘못 다룰 때다.
여러 스레드 또는 프로세스가
같은 전역 변수나 객체를 동시에 읽고/쓰면 실행 순서가 보장되지 않는다.
counter = 0
def work():
global counter
counter += 1
counter를 수정Lock 사용 (권장하지 않음)병력 작업이 동일한 파일 이름으로 결과를 저장하면 서로 덮어쓰거나 파일이 손상될 수 있다.
def save_result(data):
with open("result.json", "w") as f:
f.write(data)
save_path = f"output/{file_id}.json"
병렬 실행에서는 작업 완료 순서 ≠ 입력 순서다.
입력 순서를 가정한 로직은 쉽게 깨진다.
results = []
def work(x):
results.append(x)
def wrapper(idx, value):
return idx, process(value)
results = await asyncio.gather(*[
loop.run_in_executor(pool, wrapper, file_id, file)
for file_id, file in files
])
gather()는 입력 순서대로 결과 반환-> 병렬 실행이지만 결과는 안정적으로 관리됨
def wrapper(file_id, file):
return {file_id: process(file)}
filnal = {}
for item in results:
final.update(item)
save_path = f"output/{file_id}.json"