[Python] Synchronize Threads

dony·2024년 2월 20일
0

Python

목록 보기
4/4
post-thumbnail

파이썬에서 GIL은 멀티스레딩 환경에서 단일 스레드만 파이썬 바이트코드를 실행하도록 제한한다. 덕분에 참조 카운팅과 같은 메모리 관리 메커니즘을 안전하게 수행할 수 있다. 그렇다면 파이썬은 lock이 필요할까? GIL은 중요한 역할을 하지만, 그렇다고 모든 동시성 문제를 해결해주지는 않는다.

다수의 스레드가 동일한 값을 읽고 쓰는 경우에는 여전히 lock이 필요하다. 결국은 동기화 문제를 해결해야 한다는 것인데, 파이썬은 동기화를 위해 어떠한 프리미티브(primitives)를 제공하고 있을까?

간단한 설명과 함께 파이썬이 제공하는 스레드 동기화 도구들을 알아보자. 해당 내용은 해외 블로그파이썬 공식 문서를 주로 참고하였다.

Lock

Lock은 파이썬에서 가장 심플한 동기화 프리미티브이다. Lock은 locked와 unlocked 두 가지 상태만 존재한다. 여기에 사용되는 메서드도 acquire()과 release()로 매우 단순하다. 주의할 점은 unlocked 상태에서 release()를 호출할 경우 RunTimeError가 발생한다.

from threading import Lock, Thread

lock = Lock()
total = 0


def add_one():
    global total

    lock.acquire()
    total += 1
    lock.release()


def add_two():
    global total

    lock.acquire()
    total += 2
    lock.release()


threads = []

for func in [add_one, add_two]:
    threads.append(Thread(target=func))
    threads[-1].start()

for thread in threads:
    thread.join()

print(total)

RLock

기존의 Lock은 어떤 스레드가 lock을 획득한지 알지 못한다. 누군가 락을 소유하고 있다면, 다른 스레드가 lock을 획득하려고 시도해도 block 된다. 심지어 스레드 자기 자신이 lock을 보유하고 있어도 마찬가지다.

RLock(re-entrant lock)은 이러한 상황을 해결할 수 있다. 자세한 건 아래 코드를 통해 이해해보자.

import threading

num = 0
lock = threading.Lock()

lock.acquire()
num += 1
lock.acquire()  # block
num += 2
lock.release()


lock = threading.RLock()

lock.acquire()
num += 3
lock.acquire()  # not block
num += 4
lock.release()
lock.release()  # call release once for each call to acquire

print(num)

Semaphore

운영체제에서 필수적으로 등장하는 세마포어다. 세마포어의 경우 특정 수만큼의 스레드가 acquire()를 시도해야만 block 된다. 세마포어의 카운터는 acquire()가 호출될 때마다 감소하고, release()가 호출될 때마다 증가한다.

파이썬은 Semaphore와 BoundedSemaphore 클래스 두 가지를 제공한다. Semaphore의 경우 release()에 대한 상한선이 없어서, 계속해서 release()가 가능하다. 반면 BoundedSemaphore의 경우 설정해둔 최댓값을 넘어서는 release()를 호출할 경우 에러를 일으킨다. 대부분의 경우 복잡한 프로그래밍 에러를 피하기 위해서, BoundedSemaphore를 선택하면 된다.

import random, time
from threading import BoundedSemaphore, Thread

max_items = 5  # default 1 item
container = BoundedSemaphore(max_items)


def producer(nloops):
    for _ in range(nloops):
        time.sleep(random.randrange(2, 5))
        print(time.ctime(), end=": ")
        
        try:
            container.release()
            print("Produced an item.")
        except ValueError:
            print("Full, skipping.")


def consumer(nloops):
    for _ in range(nloops):
        time.sleep(random.randrange(2, 5))
        print(time.ctime(), end=": ")

        if container.acquire(blocking=False):
            print("Consumed an item.")
        else:
            print("Empty, skipping.")


threads = []
nloops = random.randrange(3, 6)
print("Starting with %s itmes." % max_items)

threads.append(Thread(target=producer, args=(nloops, )))
threads.append(Thread(target=consumer, args=(random.randrange(nloops, nloops + max_items + 2), )))

for thread in threads:
    thread.start()

for thread in threads:
    thread.join()

print("All done.")

Event

Event 동기화 프리미티브는 스레드 사이에서 간단한 커뮤니케이터로 작동한다. 스레드는 내부 플래그를 set() 혹은 clear()로 설정할 수 있으며, 다른 스레드들은 wait()를 통해 플래그가 set()이 될 때까지 대기한다. wait() 메서드를 사용하면 플래그가 true로 설정될 때까지 block 상태로 대기한다.

import random, time
from threading import Event, Thread

event = Event()


def waiter(event, nloops):
    for i in range(nloops):
        print("%s. Waiting for the flag to be set." % (i+1))
        event.wait()  # blocks until the flag become true
        print("Wait complete at:", time.ctime())
        event.clear()  # resets the flag

        print()


def setter(event, nloops):
    for _ in range(nloops):
        time.sleep(random.randrange(2, 5))  # sleeps for some time
        event.set()


threads = []
nloops = random.randrange(3, 6)

threads.append(Thread(target=waiter, args=(event, nloops)))
threads[-1].start()

threads.append(Thread(target=setter, args=(event, nloops)))
threads[-1].start()

for thread in threads:
    thread.join()

print("All done.")

Condition

Condition 객체는 Event 객체보다 향상된 버전이다. 스레드 간 커뮤니케이터로 동작할 뿐만 아니라, 다른 스레드들에게 프로그램의 상태 변환을 알릴 수 있는 notify() 메서드를 사용할 수 있다.

예를 들어 리소스의 가용성에 대한 정보를 보낼 수 있다. 다른 스레드들은 wait()으로 대기하고 있다가 condition 객체의 lock을 획득하려고 한다. 아래 코드에서는 producer와 consumer 사이의 간단한 예시를 보여주고 있다.

import random, time
from threading import Condition, Thread

condition = Condition()
box = []


def producer(box, nitems):
    for _ in range(nitems):
        time.sleep(random.randrange(2, 5))

        condition.acquire()
        num = random.randint(1, 10)
        box.append(num)  # puts an item into box for consumption

        condition.notify()  # notifies the consumer about the availability
        print("Produced:", num)
        condition.release()


def consumer(box, nitems):
    for _ in range(nitems):
        condition.acquire()
        condition.wait()  # blocks until an item is available for consumption
        print("%s: Acquired: %s" % (time.ctime(), box.pop()))
        condition.release()


threads = []
nloops = random.randrange(3, 6)

for func in [producer, consumer]:
    threads.append(Thread(target=func, args=(box, nloops)))
    threads[-1].start()

for thread in threads:
    thread.join()

print("All done.")

한 가지 의문이 들었던 것은 producer가 notify()로 consumer에게 알린 후 print 구문 이후 lock을 release하는데, notify와 release 사이에 깨어난 다른 스레드들은 스핀락처럼 lock을 획득하려고 계속 cpu를 점유하게 되는 것인가였다.

chatGPT 4에 따르면 notify와 release 사이의 시간은 매우 짧고, notify를 호출한 스레드가 lock을 해제하기 전까지는 signal을 받은 스레드가 실제로 lock을 획득할 수 없기 때문에, notify가 아닌 lock을 해제한 순간부터 대기 중이던 스레드들이 경쟁하게 된다고 한다. 결과적으로 그 사이 동안 스레드가 lock을 획득하기 위해 CPU 자원을 소모하지는 않는다고 한다.

Barrier

barrier는 서로 다른 스레드들을 기다리는데 사용할 수 있는 간단한 동기화 프리미티브다. 각 스레드는 barrier를 통과하기 위해 wait() 메서드를 호출하고, 모든 스레드의 작업이 끝날 때까지 block 된다. 모든 스레드가 대기하는 순간, 스레드들은 동시에 release된다. 아래 코드를 실행시켜 보면 이해하기 쉽다.

from random import randrange
from threading import Barrier, Thread
from time import ctime, sleep

num = 4
barrier = Barrier(num)
names = ["Dony", "George", "Isabel", "Bin"]


def player():
    sleep(randrange(2, 5))

    name = names.pop()
    print("%s reached the barrier at: %s" % (name, ctime()))
    barrier.wait()


threads = []
print("Race starts now...")

for _ in range(num):
    threads.append(Thread(target=player))
    threads[-1].start()

for thread in threads:
    thread.join()

print()
print("Race over!")

Reference

https://betterprogramming.pub/synchronization-primitives-in-python-564f89fee732
Synchronization Primitives: https://docs.python.org/3/library/asyncio-sync.html
Thread-based parallelism: https://docs.python.org/3/library/threading.html
Semaphore vs BoundedSemaphore: https://stackoverflow.com/questions/48971121/what-is-the-difference-between-semaphore-and-boundedsemaphore
GIL과 lock: https://taejoone.jeju.onl/posts/2022-06-25-py-thread-safe-lock/

0개의 댓글