파이썬에서의 동시성과 병렬성에 대해 알아보자
상당히 헷갈리는 개념이므로 미리 한번 정리하고 가기
┌───────────┐ ┌───────────┐ ┌───────────┐
│ Thread 1 │ → │ Thread 2 │ → │ Thread 3 │ (빠르게 전환)
└───────────┘ └───────────┘ └───────────┘
│ │ │
┌───────CPU 1───────┐ (실제론 한 개의 CPU에서 실행)
Thread
와 asyncio
를 이용하여 구현 가능함.All of Python's blocking I/O primitives release the GIL while waiting for the I/O block to resolve -- it's as simple as that! They will of course need to acquire the GIL again before going on to execute further Python code, but for the long-in-terms-of-machine-cycles intervals in which they're just waiting for some I/O syscall, they don't need the GIL, so they don't hold on to it!
출처 - stackoverflow 게시물
┌───────────CPU 1───────────┐ ┌───────────CPU 2───────────┐
│ Process 1 실행 │ │ Process 2 실행 │
└───────────────────────────┘ └───────────────────────────┘
multiprocessing
모듈을 이용하여 구현 가능즉 정리하자면, 파이썬에서는
Thread
나 asyncio
모듈을 활용한 Concurrency를 이용하는 것이 좋다.multiprocessing
모듈을 활용한 Parallelism을 이용하는 것이 좋다.대규모 데이터 연산 비교 예시
import threading
import multiprocessing
import time
def cpu_task():
print(f"실행 중: {threading.current_thread().name or multiprocessing.current_process().name}")
start = time.time()
total = sum(i**2 for i in range(10**7)) # CPU 연산
print(f"완료! 실행 시간: {time.time() - start:.2f}초")
# 🔹 멀티쓰레딩 (GIL 제한 때문에 비효율적)
def threading_test():
threads = [threading.Thread(target=cpu_task) for _ in range(4)]
start = time.time()
for t in threads:
t.start()
for t in threads:
t.join()
print(f"🔴 멀티쓰레딩 총 실행 시간: {time.time() - start:.2f}초")
# 🔹 멀티프로세싱 (각 프로세스가 독립적인 CPU 코어 사용)
def multiprocessing_test():
processes = [multiprocessing.Process(target=cpu_task) for _ in range(4)]
start = time.time()
for p in processes:
p.start()
for p in processes:
p.join()
print(f"🟢 멀티프로세싱 총 실행 시간: {time.time() - start:.2f}초")
print("🎯 멀티쓰레딩 테스트")
threading_test()
print("\n🎯 멀티프로세싱 테스트")
multiprocessing_test()
>>>
🎯 멀티쓰레딩 테스트
실행 중: Thread-1
실행 중: Thread-2
실행 중: Thread-3
실행 중: Thread-4
완료! 실행 시간: 4.00초
완료! 실행 시간: 4.00초
완료! 실행 시간: 4.00초
완료! 실행 시간: 4.00초
🔴 멀티쓰레딩 총 실행 시간: 16.00초
🎯 멀티프로세싱 테스트
실행 중: Process-1
실행 중: Process-2
실행 중: Process-3
실행 중: Process-4
완료! 실행 시간: 4.00초
완료! 실행 시간: 4.00초
완료! 실행 시간: 4.00초
완료! 실행 시간: 4.00초
🟢 멀티프로세싱 총 실행 시간: 4.00초
Mutex
라고 볼 수 있다.CPython
이며, C로 구현된 것. python.org에서 제공한다. Cpython GithubCPython
은 Python 코드를 bytecode
로 컴파일 (변환 과정 참고 링크)PVM(Python Virtual Machine)
이 bytecode
를 한 줄씩 번역하여 실행CPython
에 존재. 다른 구현체인Jython
등에는 GIL
이 없음reference counting
을 사용하여 객체의 수명을 추적하고, 메모리를 관리한다.reference count = 0
일 때, 메모리를 해제하고 객체를 회수함reference count
를 증가시키는 연산을 수행한다면 race condition
이 발생할 수 있음 -> leaked memory
or reference
가 존재해도 메모리가 release
되는 문제가 생길 수 있음Thread-Safe
하게 유지하기 위해 GIL
을 사용한다.파이썬에서도 GIL
을 해제하자니 성능 저하가 있고, 계속 사용하는 것도 고질적인 문제여서 이를 제거하거나, 성능 향상을 위해 스케줄링 등등 방식을 개선하려고 지속적으로 노력 중
https://yahwang.github.io/posts/70
https://gguguk.github.io/posts/how_to_work_python/
https://blog.naver.com/se2n/223143538332
파이썬이 시작한 자식 프로세서는 서로 병렬적으로 실행되기 때문에, 컴퓨터의 모든 CPU 코어를 사용할 수 있음 -> 그에 따라 프로그램의 Throughput
을 최대로 높일 수 있다.
┌───────────┐
│ Python │ (GIL 때문에 한 번에 하나의 작업만 실행)
└───────────┘
│
┌────▼────┐
│ CPU 1 │ (CPU 하나만 사용됨)
└─────────┘
┌───────────┐
│ Python │ (메인 프로세스)
└───────────┘
│
┌────▼────┐ ┌────▼────┐ ┌────▼────┐ ┌────▼────┐
│ CPU 1 │ │ CPU 2 │ │ CPU 3 │ │ CPU 4 │ (모든 코어 활용)
└─────────┘ └─────────┘ └─────────┘ └─────────┘
subprocess
모듈이나 multiprocessing
모듈을 사용하면 모든 CPU 코어 사용 가능파이썬에서 하위 프로세스를 실행하는 방법은 여러가지가 있으나(os.popen
, os.system
, os.exec
등) 자식 프로세스의 관리가 필요할 때는 subprocess
모듈을 사용하는게 가장 좋다고 한다.
다른 모듈은 실행 결과를 가져오기 어렵다거나, 오류 처리가 불편하거나 한 등의 단점이 있다고 함.
기본적인 사용법은 다음과 같다.
import subprocess
result = subprocess.run(["echo", "Hello, world!"], capture_output=True, text=True)
print(result.stdout) # Hello, world!
만약에 자식 프로세스를 백그라운드에서 실행하고 싶다면, run
대신 Popen
을 사용할 수 있다.
import subprocess
print("프로세스 시작!")
result = subprocess.run(["sleep", "5"]) # 5초 동안 대기
print("프로세스 종료!")
print("메인 프로세스에서 다른 작업하고 싶음!")
>>>
프로세스 시작!
프로세스 종료!
메인 프로세스에서 다른 작업하고 싶음!
Blocking 으로 실행되면서, 프로세스가 끝날때까지 대기한다.
import subprocess
import time
# 하위 프로세스를 비동기적으로 실행
process = subprocess.Popen(["sleep", "5"])
print("메인 프로세스에서 다른 작업 수행 중...")
# 자식 프로세스 상태를 주기적으로 확인
while process.poll() is None: # poll()이 None이면 아직 실행 중
print("자식 프로세스 실행 중...")
time.sleep(1) # 1초마다 확인
print("자식 프로세스 종료 완료!")
>>>
메인 프로세스에서 다른 작업 수행 중...
자식 프로세스 실행 중...
자식 프로세스 실행 중...
자식 프로세스 실행 중...
자식 프로세스 실행 중...
자식 프로세스 실행 중...
자식 프로세스 실행 중...
자식 프로세스 종료 완료!
비동기로 실행되어, 다른 작업이 실행되는 것을 볼 수 있다.
Popen
을 사용하면, 파이썬이 다른 일을 하면서 주기적으로 자식 프로세스의 상태를 검사(polling)한다고 생각하면 된다.
자식 프로세스와 부모를 분리하면 부모 프로세스가 원하는 개수만큼 자식 프로세스를 병렬로 실행할 수 있다.
아래 코드는 그 예시이다.
import time
import subprocess
start_time = time.time()
sleep_procs = []
for _ in range(10):
proc = subprocess.Popen(["sleep", "1"])
sleep_procs.append(proc)
for proc in sleep_procs:
proc.communicate()
end_time = time.time()
print(f'{end_time - start_time:.3} 초 만에 끝남')
>>>
1.08 초 만에 끝남
위 작업을 순차적으로 실행했다면 10초 이상 소요되어야 하는데, 병렬적으로 실행하여 1.08초 만에 끝난 것을 볼 수 있다.
PIPE
사용 가능import subprocess
# 부모 프로세스에서 하위 프로세스로 데이터 전송
data = "Hello from parent process\n"
process = subprocess.Popen(
["grep", "Hello"], # "Hello"가 포함된 줄만 출력하는 명령어
stdin=subprocess.PIPE, # 부모 프로세스 → 하위 프로세스 데이터 전송
stdout=subprocess.PIPE, # 하위 프로세스 → 부모 프로세스 출력 수신
text=True # 텍스트 모드
)
stdout, _ = process.communicate(data) # 데이터 전송 후 결과 수신
print(f"하위 프로세스 출력: {stdout.strip()}")
>>>
하위 프로세스 출력: Hello from parent process
자식 프로세스가 멈추는 경우나 교착 상태를 방지하려면, commuicate 메서드에 대해 timeout 파라미터를 이용하면 된다.
위에서 GIL
에 대한 이야기가 너무 나와서, 파이썬에서는 더 이상 Mutex
를 사용하지 않아도 되는 것으로 이해할 수 있다.
GIL
이 파이썬 쓰레드들이 병렬적으로 실행될 수 없게 막는다면, 파이썬 쓰레드들이 데이터 구조에 동시에 접근할 수 없게 막는 락 역할도 해줘야 하지 않을까?
이렇게 이해할 수 있다는 뜻이다.
하지만 실제로는 전혀 그렇지 않다. (이미 BFC
프로젝트 진행하면서 체감했다.)
┌──────────────────────────────────────────┐
│ Python 인터프리터 (GIL) │
└──────────────────────────────────────────┘
│ ▲ ▲
▼ │ │
┌───────────┐ ┌───────────┐ ┌───────────┐
│ Thread 1 │ │ Thread 2 │ │ Thread 3 │ (각 쓰레드는 독립적이지만, 동시에 실행되지 않음)
└───────────┘ └───────────┘ └───────────┘
│ ▲ ▲
▼ │ │
┌─────────── GIL이 바이트코드 실행 중 인터럽트 가능 ───────────┐
│ [A] 쓰레드 1 실행 → [B] 쓰레드 2로 전환 → [C] 쓰레드 3로 전환 │
└──────────────────────────────────────────┘
코드로 나타내보면 다음과 같다.
import threading
import time
class Counter:
def __init__(self):
self.count = 0
def increment(self, offset):
self.count += offset
def worker(sensor_index, how_many, counter):
for _ in range(how_many):
counter.increment(1)
NUM_SENSORS = 5 # 센서 개수 (스레드 개수)
INCREMENTS_PER_SENSOR = 100000 # 각 센서에서 증가하는 횟수
counter = Counter()
threads = []
for sensor_id in range(NUM_SENSORS):
t = threading.Thread(target=worker, args=(sensor_id, INCREMENTS_PER_SENSOR, counter))
threads.append(t)
t.start()
for t in threads:
t.join()
print(f"최종 Counter 값 (잘못된 값일 가능성 있음): {counter.count}")
>>> 최종 Counter 값 (잘못된 값일 가능성 있음): 497831
왜 이런 상황이 나올까?
파이썬 인터프리터는 실행되는 모든 스레드를 강제로 공평하게 취급해서, 각 쓰레드의 실행 시간을 거의 비슷하게 만든다.
이를 위해 파이썬은 실행 중인 쓰레드 일시 정지 -> 다른 쓰레드 실행
이 과정을 반복하는데, 문제는 여기서 파이썬이 쓰레드를 언제 일시 정지시킬 지 알 수 없다는 점이다.
심지어 atomic
으로 보이는 연산 조차도 연산 수행 도중 파이썬이 쓰레드를 일시 중단 시킬 수 있다. 그래서 위 예제에서도 count 값이 정상적으로 update 되지 않는 것이다.
+=
연산자는 하나의 연산으로 보이지만, 실제로는 아래와 같이 세 가지 연산으로 이루어진다.
value = getattr(counter, 'count')
result = value+1
setattr(counter, 'count', result)
그래서 쓰레드가 세 연산 사이 어느 지점에서 중단되면 연산 순서가 뒤섞임 -> value
값이 이상해 지는 것이다.
이와 같은 데이터 경합이나 구조 오염을 방지하기 위해, threading
내장 모듈 안에서 여러 가지 튼튼한 도구를 제공하는데, 가장 간단하고 유용한 도구로 Lock
클래스가 있다.
파이썬에서 제공하는 Lock
클래스는 상호 배제 락 ( Mutex ) 이다.
Lock
을 사용한 코드는 아래와 같다.
import threading
import time
class Counter:
def __init__(self):
self.count = 0
self.lock = threading.Lock() # Lock 생성
def increment(self, offset):
with self.lock:
self.count += offset
def worker(sensor_index, how_many, counter):
for _ in range(how_many):
counter.increment(1)
NUM_SENSORS = 5 # 센서 개수 (스레드 개수)
INCREMENTS_PER_SENSOR = 100000 # 각 센서에서 증가하는 횟수
counter = Counter()
threads = []
for sensor_id in range(NUM_SENSORS):
t = threading.Thread(target=worker, args=(sensor_id, INCREMENTS_PER_SENSOR, counter))
threads.append(t)
t.start()
for t in threads:
t.join()
print(f"최종 Counter 값 (정확한 값): {counter.count}") # 항상 NUM_SENSORS * INCREMENTS_PER_SENSOR
이제는 Lock 덕에 실행 결과가 들어 맞게 된다.
동시성 작업을 처리할 때 가장 유용한 방식은 파이프라인이다.
파이프라인은 마치 공장 조립 라인 처럼 작동한다.
이런 접근 방법은 앞에서 설명했듯이, 병렬화할 수 있는 Blocking I/O
나 하위 프로세스
가 포함되는 경우에 특히 좋다. (53번 참고)
아무래도 번역판이다보니 설명이 좀 어색한데, 다음 예시를 한번 봐보자.
이 작업은 아래와 같이 3단계 파이프 라인으로 나눌 수 있다.
각 단게를 처리하는 함수를 download
, resize
, upload
라는 파이썬 함수로 작성했다고 해보자.
def download(item): # 이미지 스트림 가져오는 함수
...
def resize(item): # 이미지 크기 변경 함수
...
def upload(item): # 포토 갤러리에 저장(업로드)하는 함수
...
작업을 전달한 방법은 아래와 같이 producer-consumer(생산자-소비자)
를 사용해 작업을 전달할 수 있는 큐를 만들었다.
from collections import deque
from threading import Lock, Thread
class MyQueue:
def __init__(self):
self.items = deque()
self.lock = Lock()
def put(self, item):
with self.lock:
self.items.append(item)
def get(self):
with self.lock:
return self.items.popleft()
put
메서드를 통해 미처리 작업을 표현하는 deque의 끝에 새로운 이미지를 추가한다.get
메서드를 통해 미처리 작업을 표현하는 deque의 맨 앞에서 이미지를 가져오고, deque에서 삭제한다.아래는 큐에서 작업을 가져오고 작업을 수행한 뒤 다른 큐에 넣는 쓰레드 워커 클래스를 구현한 부분이다.
class Worker(Thread):
def __init__(self, func, in_queue, out_queue):
super().__init__()
self.func = func
self.in_queue = in_queue
self.out_queue = out_queue
self.polled_count = 0
self.work_done = 0
def run(self):
while True:
self.polled_count += 1
try:
item = self.in_queue.get()
except IndexError:
time.sleep(0.1) # 할 일이 없는 상태. 잠시 대기한다.
else:
result = self.func(item)
self.out_queue.put(result)
self.work_done += 1
Worker 클래스에서는 in_queue
에서 아이템을 가져오고 작업을 진행하고, 완료되면 out_queue
에 넣는다.
입력 큐가 비어 있는 경우(이전 단계에서 작업이 끝나지 못했다는 뜻)에는 작업자 쓰레드가 IndexError
예외를 잡아낸다.
이제 각 단계별로 큐를 생성하고 각 단계에 맞는 작업 스레드를 만들어서 서로 연결하면 다음과 같은 모양이 된다.
threads = [
Worker(download, download_queue, upload_queue),
Worker(resize, resize_queue, upload_queue),
Worker(upload, upload_queue, done_queue),
]
for thread in threads:
thread.start()
for _ in range(1000):
download_queue.put(object())
샘플 코드라서 구체적인 구현은 없지만, 코드는 이런 구조를 띌 것이다.
동작에는 문제가 없다. 하지만, 쓰레드들이 작업을 기다리면서 큐를 폴링하기 때문에, run
메서드 안에서 IndexError
예외를 잡아내는 부분이 상당히 많이 실행될 수 밖에 없다.
예를 들면 1000개의 아이템을 처리하는데 폴링을 3009번 했다는 출력이 나온다던가..
왜냐하면 함수 마다 작업 속도가 다르기 때문에, 앞에 있는 단계가 그보다 더 뒤에 있는 단계의 진행을 방해하면서, 뒤에 있는 단계는 작업을 받지 못하는 starvation
상태가 된다.
이로 인해 새로운 작업이 들어왔는지 계속 자신의 입력 큐를 검사하느라 CPU Time을 잡아먹게 된다.
### 작업 흐름: 3단계 파이프라인 (Download → Resize → Upload)
download_queue (Input) resize_queue upload_queue done_queue
│ │ │ │
▼ ▼ ▼ ▼
+----------+ +----------------+ +----------------+ +------+
| Download | -----> | Resize Worker | ----> | Upload Worker | ----> | Done |
+----------+ +----------------+ +----------------+ +------+
### Starvation 문제 상황 예시
Step 1: Download 속도가 빠름 (Download Queue에 데이터가 많음)
──────────────────────────────────────────────
download_queue (★★★★★) resize_queue (□) upload_queue (□) done_queue (□)
Step 2: Resize 속도가 느림 (Download된 작업이 Resize Queue에서 대기)
──────────────────────────────────────────────
download_queue (☆☆☆☆☆) resize_queue (★★★★) upload_queue (□) done_queue (□)
Step 3: Upload는 작업이 없어서 계속 큐를 폴링 (CPU 낭비)
──────────────────────────────────────────────
download_queue (☆☆☆☆☆) resize_queue (★★★★) upload_queue (□) done_queue (□)
↑ ↑
(Resize가 늦어서) (Upload는 계속 폴링하면서 대기 상태)
근데 이거 말고도 문제점이 세 가지나 더 있다.
busy-waiting
을 수행해야 한다.run
method가 루프를 무한히 반복한다. 현재 구조에서는 Worker Thread에게 루프를 중단할 시점을 알려줄 방법이 없다.여기서 교훈은 제대로 작동하는 생산자-소비자 큐를 직접 구현하기가 어렵다는 것이다. 그래서 그럴 필요없이, Queue
를 사용하라는 뜻이다.
queue 내장 모듈에 있는 Queue 클래스는 앞서 말한 모든 문제를 해결할 수 있는 기능을 제공한다고 한다.
1. 새로운 데이터가 나타날 때 까지 get method를 block -> busy-waiting
문제를 해결
자세한 프로세스는 아래와 같다.
Queue.get(block=True)
를 호출하면, 큐가 비어 있을 경우 현재 쓰레드가 자동으로 대기 상태(Wait)으로 들어감.이렇게 구현하게 되면, 계속 풀링하면서 CPU를 사용하지 않아도 되므로 busy-waiting
이 발생하지 않는다.
코드로 보면 다음과 같다.
class MyQueue:
def __init__(self):
self.items = deque()
self.lock = Lock()
def put(self, item):
with self.lock:
self.items.append(item)
def get(self):
while True:
with self.lock:
print('get from my queue')
if self.items:
return self.items.popleft()
sample_queue = MyQueue()
def sample_consumer():
print('sample consumer waiting\n')
sample_queue.get()
print('sample consumer done\n')
thread = Thread(target=sample_consumer)
thread.start()
print('sample producer putting\n')
sample_queue.put('hello')
print('sample producer done\n')
thread.join()
>>>
sample consumer waiting
get from my queue
get from my queue
get from my queue
get from my queue
get from my queue
get from my queue
get from my queue
get from my queue
get from my queue
get from my queue
sample producer putting
get from my queue
get from my queue
get from my queue
get from my queue
get from my queue
get from my queue
get from my queue
get from my queue
get from my queue
get from my queue
get from my queue
get from my queue
get from my queue
get from my queue
get from my queue
get from my queue
get from my queue
get from my queue
get from my queue
get from my queue
get from my queue
get from my queue
get from my queue
get from my queue
get from my queue
get from my queue
get from my queue
sample producer done
get from my queue
sample consumer done
get from my queue 가 매우 많이 출력된다. busy-waiting
이 발생하는 것을 알 수 있다.
from queue import Queue
from threading import Thread
my_queue = Queue()
def consumer():
print('consumer waiting')
my_queue.get()
print('consumer done')
thread = Thread(target=consumer)
thread.start()
print('producer putting')
my_queue.put('hello')
print('producer done')
thread.join()
>>>
consumer waiting
producer putting
producer done
consumer done
이렇게 busy waiting이 발생하지 않는다.
물론 직접 구현한 큐에서도 블락을 하면 되기는 하는데, 이렇게 하나하나 구현하기 보다는 클래스를 사용하는 게 훨씬 안정적이라는 것이다.
2. 파이프라인 중간이 막히는 경우를 해결하기 위해, 두 단계 사이의 미완성 작업의 최대 개수를 지정할 수 있다.
이렇게 버퍼 크기를 정하면 큐가 이미 가득찼을 경우, put
이 블록된다.
3. Queue 클래스의 task_done
메서드를 통해 작업의 진행을 추적할 수 있다. 이 메서드를 사용하면 입력 큐가 모두 소진될 때까지 기다릴 수 있다. 이로 인해 파이프라인의 마지막단계를 폴링할 필요가 없어진다.
정확한 설명은 다음과 같다.
queue.put()
을 호출할 때마다 작업 개수가 증가.queue.get()
으로 작업을 꺼낸다. 이 때 "처리해야 할 작업 개수"는 변하지 않음task_done()
을 호출하면 작업 개수가 감소.queue.join()
은 작업 개수가 0이 될 때까지 블로킹됨.결론 : 파이프라인을 구축할 땐 Queue 클래스를 활용하자.
프로그램이 커지면서 범위와 복잡도가 증가함에 따라 동시에 실행되는 여러 실행 흐름이 필요해지는 경우가 많다.
파이썬은 팬아웃과 팬인을 지원하는 여러 가지 내장 도구를 제공하며, 각 도구는 서로 다른 장단점을 지니므로, 각 접근 방식의 장단점을 이해하고 상황에 따라 원하는 작업에 가장 알맞은 도구를 택해야 한다고 한다.
이 도구들에 대해서는 57~59에서 설명할 예정이다.