Monitor(모니터)에 대해 알아보고 동기화를 이해하기

승톨·2021년 2월 4일
3
post-thumbnail
post-custom-banner

Intro

Pintos 구현 중 동기화 관련 코드리딩 중 직관적으로 이해하기 어려운 부분을 만났다. Monitor라는 녀석이었는데, 간단하게 짚고 넘어가기에는 아쉬워서 공부를 빠싹하게 해보았다.

위키피디아, pintos-kaist 자료 등을 참고해서 이해했는데, 굉장히 잘 설명되어 있어서 Monitor에 대해 헷갈려하는 분들도 이 자료를 한번 읽어보면 감을 잡으실 것 같다. 그런 의미에서 관련 자료 영문 중 필요한 부분을 모두 번역해보았다.

아래는 pintos-kaist의 자료이다.

pintos-kaist : Monitors

  • monitor는 semaphore나 lock보다 더 높은 단계의 동기화 형태이다. monitor는 동기화 되고 있는 데이터, lock(monitor lock이라고 부르는), 몇개의 condition variable로 구성되어 있다.(줄여서 CV라고 하겠다. )

  • 보호받는 데이터에 접근하기 전에, 쓰레드는 첫번째로 monitor lock을 획득한다. 이걸 "in the monitor"에 있다고 한다. "in the monitor" 중에, 쓰레드는 보호받는 모든 데이터에 대해 자유롭게 실행하거나 변경하는 접근 권한을 가진다.

  • 데이터 접근이 완료되면 monitor lock을 놓아준다.(release)

  • CV는 "in the monitor"에 있는 코드가 특정 컨디션이 참이 될 때 까지 기다리게 하는 역할이다.
    각각의 CV는 추상적인 상태와 연관 되어있다.(ex. 어떤 데이터가 처리(processing)을 위해 도착했다, 유저의 마지막 타이핑 이후 10초가 지나 pass된다. )

  • "in the monitor"의 코드가 특정 상태가 참이 될때까지 기다릴 때, 그 코드는 연관된 cv 위에서 기다린다. 여기서 연관된 cv란 lock을 놓아주고, wait을 해제할 시그널을 보내는 cv를 말한다.

  • 한편으로, 여러 상태 중의 하나가 true가 되면 하나의 대기자(waiter)를 깨우는 시그널을 보내거나(signal), 모든 대기자를 깨우는 시그널을 보낸다.(broadcast)

  • 모니터를 위한 이론적 프레임워크는 C.A.R.Hoare를 바탕으로 한다. 실제 사용은 이후에 Mesa OS에 대한 논문에서 자세히 설명된다.


아래는 위키피디아 자료이다. 내용이 많아서 필요해보이는 부분만 편집해서 번역했다. 영어가 더 이해하기 쉬울 것 같은 부분은 원문을 그대로 사용했다.

추천하는 리딩 방법은 1. 글만 보면서 코드는 슥 읽어보기 / 2.다시 코드만 따로 뜯어보기이다.

참고로, 나는 전문 번역가가 아니다. 번역하면서 의역을 한 부분이 있고, 오역도 존재 할 수 있다. 그러니 위키피디아와 pintos-kaist 모두 걸어놓은 링크로 가서 꼭 원문을 살펴보기 바란다.

Wikipedia - Monitor (synchronization)

  • 동시성 프로그래밍에서, monitor는 쓰레드들이 mutual exclusion과 특정 컨디션이 false가 되면 wait되는 능력을 가진 '동기화 construct'이다.

  • monitor는 또한 다른 쓰레드들에게 그들의 특정 상태가 충족되면 시그널을 보내는 메커니즘을 가지고 있다.

  • monitor는 mutex(lock) 오브젝트와 condition variable들로 구성되어 있다. condition variable은 특정 상태를 기다리는 쓰레드들의 컨테이너이다.

  • monitor는 쓰레드들이 (exclusive access를 다시 획득하고 그들의 일을 재개하기 전) 특정 상태가 충족되길 기다리기 위해 임시적으로 exclusive access를 포기하는 메커니즘도 제공한다.

  • monitor의 또 다른 정의는 thread-safe한 클래스, 오브젝트 혹은 모듈이다. 한 개 이상의 쓰레드에 의해 variable이나 method에 안전하게 access를 허가하기 위해 mutex를 감싼 개념이라고 볼 수 있다.

모니터는 Hansen과 Hoare에 의해 개발되었고, Hansen의 Pascal에서 처음 구현되었다.

Mutual exclusion

간단한 예시를 보자. 은행 계좌에서 트랜잭션을 수행하는 thread-safe 객체를 생각해보자.

monitor class Account {
    private int balance := 0
    invariant balance >= 0

    public method boolean withdraw(int amount)
        precondition amount >= 0
    {
        if balance < amount {
            return false
        } else {
            balance := balance - amount
            return true
        }
    }

    public method deposit(int amount)
        precondition amount >= 0
    {
        balance := balance + amount
    }
}
  • 쓰레드는 thread-safe 객체의 메소드를 실행하는 동안 mutex를 가지고 있음으로써 object를 소유하고 있다고 할 수 있다.

  • thread-safe 객체들은 한번에 한개의 쓰레드만 객체를 소유할 수 있게 강제되도록 구현되어있다.

  • lock은 처음에 unlocked되어있는데, 각 public method의 시작에는 잠금된다. 그리고 public method가 리턴되는 시점에 잠금 해제된다.

  • 메소드 중의 하나를 호출하면, 하나의 쓰레드는 그 객체의 메소드를 실행하기 전에 다른 쓰레드들이 thread-safe 객체의 메소드를 실행할 수 없을때까지 반드시 기다려야 한다.

  • 이런 mutual exclusion이 없으면 2개의 쓰레드는 아무 이유없이 돈이 사라지거나 불어날 수 있다.

  • 아래의 코드는 위의 계좌 예시에서 syntactic sugar인 "monitor class"를 구현한 것이다. 각각의 함수 실행에는 mutex가 포함되어 있다.

class Account {
    private lock myLock

    private int balance := 0
    invariant balance >= 0

    public method boolean withdraw(int amount)
       precondition amount >= 0
    {
        myLock.acquire()
        try {
            if balance < amount {
                return false
            } else {
                balance := balance - amount
                return true
            }
        } finally {
            myLock.release()
        }
    }

    public method deposit(int amount)
       precondition amount >= 0
    {
        myLock.acquire()
        try {
            balance := balance + amount
        } finally {
            myLock.release()
        }
    }
}

Condition variables

**Problem state

  • 동기화를 위해 대개 mutual exclusion으로는 충분하지 않다. 연산을 시도하는 쓰레드들은 특정 컨디션 P가 참이 될 때까지 기다릴 필요가 있다.

  • 이를 위해 모니터를 unlock하고, 특정 시간을 기다리고, 모니터를 lock하고 컨디션 P를 살피는 루프를 가지는 해결책이 존재한다. 이론적으로 동작하는 해결책이고, 데드락 상태를 유발하지 않는다.

  • 그러나 구현 시 이슈가 생긴다. 적절한 대기시간을 산정하는게 어렵다. 너무 작으면 쓰레드가 cpu를 독차지할 것이고, 너무 크면 명백히 효과가 없을 것이다.

  • 그래서 필요한 방법이 컨디션 P가 참이 될 때 시그널을 보내는 것이다.

Case study: classic bounded producer/consumer problem

클래식한 동시성 문제에는 bounded producer/consumer 문제가 있다.

이 문제의 조건은 다음과 같다.

  • 최대 크기를 가진 task의 ring buffer 혹은 queue가 있고,
  • 하나 이상의 쓰레드가 큐에 task를 넣는 producer 쓰레드가 있고,
  • 하나 이상의 쓰레드가 큐에서 task를 빼는 consumer 쓰레드가 있다.
  • 큐는 non-thread-safe하다고 가정한다. 이 큐는 empty하거나 full하거나 그 중간 상태가 될 수도 있다.
  • 큐가 full할 때마다 consumer thread가 task를 큐에서 빼서 데이터 넣을 공간이 생길 때까지 producer thread는 블록 되어야 한다.
  • 또한 큐가 empty할 때마다, producer thread가 task를 큐에 넣어서 데이터가 생길 때까지 consumer thread는 블록 되어야 한다.

큐는 쓰레드들 사이의 동시성 객체 이므로, atomic하게 접근이 되어야 한다. 쓰레드들 사이에 노출되면 안 되는 "큐 access 과정" 동안 큐는 inconsistent한 상태가 될 수 있기 때문이다.

따라서 큐가 구성하는 임계영역(critical section)에 접근하는 코드는 반드시 mutual exclusion으로 동기화 되어야 한다. 그렇지 않으면 race conditions 을 야기할 위험이 있다.

Incorrect without synchronization

먼저 단순하게 위의 조건을 구현한 코드를 보자.

global RingBuffer queue; 
// A thread-unsafe ring-buffer of tasks.
// Method representing each producer thread's behavior:*
publi method producer() {
    while (true) {
        task myTask = ...; *// Producer makes some new task to be added.*
		while (queue.isFull()) {} *// Busy-wait until the queue is non-full.*
		queue.enqueue(myTask); *// Add the task to the queue.*
		}
}

// Method representing each consumer thread's behavior:*
public method consumer() {
    while (true) {
        while (queue.isEmpty()) {} *// Busy-wait until the queue is non-empty.*
				myTask = queue.dequeue(); *// Take a task off of the queue.*
				doStuff(myTask); *// Go off and do something with the task.*
		}
}

이 코드는 큐에 접근하는 것이 다른 스레드들의 큐 접근과 중첩되고 인터럽트 될 수 있다는 점에서 심각한 문제를 가지고 있다.

  • queue.enququequeue.dequeue 메소드는 큐의 member 변수(size, beginning, ending position, assignment, allocation of queue elements 등)를 업데이트하는 명령어를 가지고 있을 것이다.

  • 게다가 queue.isEmpty()queue.isFull() 메소드는 공통된 상태를 읽으려 할 것이다.

  • 만약 producer/consumer 쓰레드가 enqueue/dequeue 호출 동안 중첩된다면, 큐의 불안정한 상태는 race condition을 야기할 수 있을 것이다.

  • 게다가 만약 하나의 consumer 스레드가 다른 consumer 스레드가 busy-wait을 나가고 dequeue 를 호출하는 사이에 큐를 empty하게 만들고,

  • 그 2번째 consumer 스레드가 비어버린 큐에 dequeue 를 시도하면, 그건 에러를 야기할 것이다.

  • 마찬가지로, 만약 하나의 producer 스레드가 또 다른 producer 스레드가 busy-wait을 나가고 enquque를 호출하는 사이에 큐를 full하게 만들고,

  • 이어서 그 2번째 producer 스레드가 full queue에 데이터를 추가하려고 하면 에러를 야기할 것이다.

Spin-waiting

동기화를 달성하기 위한 단순한 접근 중 하나는 "spin-waiting"을 사용하는 것이다.

골자는 다음과 같다.

busy-waiting은 여전히 사용되고 있으니, 각각의 busy-wait check 사이에 lock을 획득했다가 놓아주는 방법으로 하나의 뮤텍스가 코드의 임계영역을 보호하는 것이다.

global RingBuffer queue; *// A thread-unsafe ring-buffer of tasks.*
global Lock queueLock; *// A mutex for the ring-buffer of tasks.

// Method representing each producer thread's behavior:*
**public** method producer() {
    **while** (true) {
        task myTask = ...; *// Producer makes some new task to be added.
		queueLock.acquire(); *// Acquire lock for initial busy-wait check.*
	while (queue.isFull()) { *// Busy-wait until the queue is non-full.*
        	queueLock.release();
            *// Drop the lock temporarily to allow a chance for other threads
	// needing queueLock to run so that a consumer might take a task.*
		queueLock.acquire(); *// Re-acquire the lock for the next call to "queue.isFull()".*
				}
        queue.enqueue(myTask); *// Add the task to the queue.*
	queueLock.release(); *// Drop the queue lock until we need it again to add the next task.*
			}
}

*// Method representing each consumer thread's behavior:*
**public** method consumer() {
    while (true) {
        queueLock.acquire(); *// Acquire lock for initial busy-wait check.*
	while (queue.isEmpty()) { *// Busy-wait until the queue is non-empty.*
		queueLock.release();
            *// Drop the lock temporarily to allow a chance for other threads
		// needing queueLock to run so that a producer might add a task.*
		queueLock.acquire(); *// Re-acquire the lock for the next call to "queue.isEmpty()".*
				}
        myTask = queue.dequeue(); *// Take a task off of the queue.*
	queueLock.release(); *// Drop the queue lock until we need it again to take off the next task.*
	doStuff(myTask); *// Go off and do something with the task.*
		}
}

이 방법은 inconsistent 상태가 발생하지않는 것을 보장한다. 그러나 불필요한 busy-waiting 때문에 cpu 자원을 낭비한다.

만약 큐가 empty하고 producer 스레드가 오랜시간 동안 아무것도 큐에 넣지 않더라도, consumer thread는 항상 불필요하게 busy-waiting 해야하는 것이다.

마찬가지로 만약 consumer 스레드가 오랜시간동안 블록 되어있고, 큐가 가득차있더라도, producer 스레드는 항상 busy-waiting 해야하는 것이다. 이건 낭비가 있는 메커니즘이다.

필요한 것은 producer 스레드는 큐가 non-full일 때까지 블록하게 만들고, consumer 스레드는 큐가 non-empty일 때까지 블록하게 만드는 것이다.

Mutex 그들 자신은 또한 spin-lock이 될 수 있다. (spin-lock이라 함은 lock을 얻기 위해 busy-waiting을 수반하지만, 낭비되는 CPU 문제를 풀수 있는 개념이다.) 그러나 여기서 우리가 쓰는 queuelock 은 spin-lock이 아니라고 가정한다.

Condition variables

위의 문제를 해결할 해결책은 condition variables를 쓰는 것이다.(줄여서 cv라고 부르겠다.)

cv는 개념적으로 monitor와 관련된, 스레드의 큐이다. 스레드는 이 큐에서 특정 상태가 참이 될때까지 기다리게 된다. 따라서 각각의 cv c 는 assertion Pc 와 관련되어 있다.

(여기서 assertion을 다루진 않으려고 한다. 일단 "특정 상태"라고 이해하면 될 것 같다.)

thread가 cv에서 기다리고 있는 동안 그 쓰레드는 monitor를 소유할 수 없다. 그리고 다른 스레드는 monitor에 들어가 montior의 상태를 바꿀 수 있다. 대부분, 다른 스레드들은 assertion Pc 가 현재 참이 되었다고 cv c 에게 시그널을 보낸다.

아래는 cv의 주요 연산(함수)이다.

`wait c, m`**

  • c는 cv이고 m은 뮤텍스이다.

  • 이 함수는 스레드에 의해 호출되는데, assertion Pc가 참이 될 때까지 기다리게 만들기 위함이다. 스레드가 기다리는 동안 monitor를 소유할 수 없다.

    1. Atomically: (이 연산은 atomic하게 이루어진다. 즉, 아래의 순서가 진행될 동안은 외부에게 방해받아선 안된다.)

      1. 뮤텍스 m 을 놓아준다.
      2. 현재 스레드를 running 상태에서 cwait-queue 로 보낸다.
      3. 그 스레드를 잠재운다(블록). Context는 다른 스레드에게 양도된다.(CPU 자원 양도)
    2. 그 스레드가 시그널을 받아서 다시 일을 할 수 있다면, 자동적으로 뮤텍스 m 을 획득한다.

  • 1-1 단계와 1-2 단계는 어느쪽이든 먼저 발생할 수 있다. 1-3은 앞에 2개 이후에 발생한다.

  • 스레드가 블록 되어 있고 cwait-queue 에 있는 동안 다음 program counter는 2단계 에서 실행되게 준비된다. 따라서 그 스레드가 이후에 깨어나면(블록 해제) 2단계부터 실행된다.

  • 1단계의 연산의 atomicity는 race condition을 피하기 위해 중요하다.

  • atomic하지 못해 한 가지 실패하는 경우는 missed wakeup 이 된 경우이다. (missed wakeup 은 자세히 다루지 않으려고 한다.)

signal c =  notify c

  • assertion Pc 가 참이 되면 스레드가 호출하는 함수이다.
  • 이 함수는 한 개 이상의 스레드가 c 의 sleep-queue에서 ready queue로 이동하게 만든다.
  • 보통 c 와 관련된 뮤텍스 m 을 놓아주기 전에 signal/notify를 수행하는게 가장 좋은 구현이다. 그러나 signal 전에 뮤텍스 m하는 것도 합리적이다.

broadcast c = notifyAll c

  • signal 함수와 동일한 조건이지만 cwait-queue 안에 있는 모든 쓰레드들을 깨우는 함수이다.
  • wait-queue를 비우게 된다.
  • 일반적으로 똑같은 cv에 연관된 2개 이상의 상태가 있을 때, 스레드는 signal 대신에 broadcast를 필요로 한다.
  • 왜냐하면 잘못된 상태에 대기한 스레드가 일어날 수 있고, 상태가 참이 된 올바른 상태를 기다리고 있는 쓰레드를 깨우는 것 없이 그 스레드가 다시 잠에 들 수 있기 때문이다.
  • 리스트의 여러 스레드들 중 즉 상태가 참이 되어 깨어나야할 스레드를 일단 깨우기 위해 broadcast를 쓴다고 이해하면 된다.
  • 만약 cv에 연관된 상태가 1개라면, broadcast보다 signal을 쓰는게 효과적이다.

디자인 룰에 따라, 여러개의 cv는 똑같은 뮤텍스에 연관될 수 있지만, 역은 성립하지 않는다.(즉, 뮤텍스와 cv의 관계는 1:N 관계이다.

Pc 는 모니터를 사용하는 모든 쓰레드에게 동일하고, 컨디션을 읽거나 변경할 수 있는 다른 쓰레드들로부터 mutual exclusion을 기반으로 Pc 를 보호해야하는데,
똑같은 뮤텍스를 필요로하는 같은 variable 내에 다른 상태를 기다리고 싶어하는 다른 쓰레드들이 존재하기 때문이다.

위에서 언급한 producer-consumer 예시에서 큐는 unique한 뮤텍스에 의해 보호되어야 한다.

producer 쓰레드는 뮤텍스 mc_full (큐가 non-full이 될 때까지 블록하는 cv)를 사용해 monitor에서 기다리고 싶을 것이다.

consumer 쓰레드는 똑같은 뮤텍스 m 를 사용하지만 다른 cv인 c_empty (큐가 non-empty가 될 때까지 블록하는 cv)를 사용하는 다른 monitor에서 기다리고 싶을 것이다.

→ 즉, 여기서 2개의 쓰레드는 각각의 monitor를 사용한다.

Monitor usage

기본적인 모니터 사용은 아래와 같다.

acquire(m); // Acquire this monitor's lock.
while (!p) { // While the condition/predicate/assertion that we are waiting for is not true...
	wait(m, cv); // Wait on this monitor's lock and condition variable.
}
// ... Critical section of code goes here ...
signal(cv2); -- OR -- notifyAll(cv2); // cv2 might be the same as cv or different.
release(m); // Release this monitor's lock.

위의 코드를 좀더 정밀하게 설명한 버전은 아래와 같다. 이 코드의 주석은 처음에는 너무 깊게 보지않는 것을 추천드린다.

// About to enter the monitor.
// Acquire the advisory mutex (lock) associated with the concurrent data that is shared between threads, 
// to ensure that no two threads can be preemptively interleaved or
// run simultaneously on different cores while executing in critical
// sections that read or write this same concurrent data. If another
// thread is holding this mutex, then this thread will be put to sleep
// (blocked) and placed on m's sleep queue.  (Mutex "m" shall not be
// a spin-lock.)
acquire(m);
// Now, we are holding the lock and can check the condition for the
// first time.

// The first time we execute the while loop condition after the above
// "acquire", we are asking, "Does the condition/predicate/assertion
// we are waiting for happen to already be true?"

while (!p()) 	// "p" is any expression (e.g. variable or 
		// function-call) that checks the condition and
		// evaluates to boolean.  This itself is a critical
		// section, so you *MUST* be holding the lock when
		// executing this "while" loop condition!
				
// If this is not the first time the "while" condition is being checked,
// then we are asking the question, "Now that another thread using this
// monitor has notified me and woken me up and I have been context-switched
// back to, did the condition/predicate/assertion we are waiting on stay
// true between the time that I was woken up and the time that I re-acquired
// the lock inside the "wait" call in the last iteration of this loop, or
// did some other thread cause the condition to become false again in the
// meantime thus making this a spurious wakeup?

{
	// If this is the first iteration of the loop, then the answer is
	// "no" -- the condition is not ready yet. Otherwise, the answer is:
	// the latter.  This was a spurious wakeup, some other thread occurred
	// first and caused the condition to become false again, and we must
	// wait again.

	wait(m, cv);
		// Temporarily prevent any other thread on any core from doing
		// operations on m or cv.
		// release(m) 		// Atomically release lock "m" so other
		//			// code using this concurrent data
		// 			// can operate, move this thread to cv's
		//			// wait-queue so that it will be notified
		//			// sometime when the condition becomes
		// 			// true, and sleep this thread. Re-enable
		//			// other threads and cores to do 
		//			// operations on m and cv.
		//
		// Context switch occurs on this core.
		//
		// At some future time, the condition we are waiting for becomes
		// true, and another thread using this monitor (m, cv) does either
		// a signal/notify that happens to wake this thread up, or a
		// notifyAll that wakes us up, meaning that we have been taken out
		// of cv's wait-queue.
		//
		// During this time, other threads may cause the condition to
		// become false again, or the condition may toggle one or more
		// times, or it may happen to stay true.
		//
		// This thread is switched back to on some core.
		//
		// acquire(m)		// Lock "m" is re-acquired.
		
	// End this loop iteration and re-check the "while" loop condition to make
	// sure the predicate is still true.
	
}

// The condition we are waiting for is true!
// We are still holding the lock, either from before entering the monitor or from
// the last execution of "wait".

// Critical section of code goes here, which has a precondition that our predicate
// must be true.
// This code might make cv's condition false, and/or make other condition variables'
// predicates true.

// Call signal/notify or notifyAll, depending on which condition variables'
// predicates (who share mutex m) have been made true or may have been made true,
// and the monitor semantic type being used.

for (cv_x in cvs_to_notify) {
	notify(cv_x); -- OR -- notifyAll(cv_x);
}
// One or more threads have been woken up but will block as soon as they try
// to acquire m.

// Release the mutex so that notified thread(s) and others can enter their critical
// sections.
release(m);

Solving the bounded producer/consumer problem

cv를 사용해서 bounded producer/consumer 문제를 해결해보자.
여기선 2개의 모니터(큐를 위해 한개의 lock을 사용하고, 그 lock을 공유하는 2개의 cv)를 사용할 것이다.

global Lock queueLock;  	// A mutex for the ring-buffer of tasks. (Not a spin-lock.)
global CV queueEmptyCV; 	// A condition variable for consumer threads waiting for the queue to 
				// become non-empty.
                        	// Its associated lock is "queueLock".
global CV queueFullCV; 		// A condition variable for producer threads waiting for the queue 
				// to become non-full. Its associated lock is also "queueLock".

// Method representing each producer thread's behavior:
public method producer() {
    while (true) {
        task myTask = ...; // Producer makes some new task to be added.

        queueLock.acquire(); // Acquire lock for initial predicate check.
        while (queue.isFull()) { // Check if the queue is non-full.
            // Make the threading system atomically release queueLock,
            // enqueue this thread onto queueFullCV, and sleep this thread.
            wait(queueLock, queueFullCV);
            // Then, "wait" automatically re-acquires "queueLock" for re-checking
            // the predicate condition.
        }
        
        // Critical section that requires the queue to be non-full.
        // N.B.: We are holding queueLock.
        queue.enqueue(myTask); // Add the task to the queue.

        // Now the queue is guaranteed to be non-empty, so signal a consumer thread
        // or all consumer threads that might be blocked waiting for the queue to be non-empty:
        signal(queueEmptyCV); -- OR -- notifyAll(queueEmptyCV);
        
        // End of critical sections related to the queue.
        queueLock.release(); // Drop the queue lock until we need it again to add the next task.
    }
}

// Method representing each consumer thread's behavior:
public method consumer() {
    while (true) {
        queueLock.acquire(); // Acquire lock for initial predicate check.
        while (queue.isEmpty()) { // Check if the queue is non-empty.
            // Make the threading system atomically release queueLock,
            // enqueue this thread onto queueEmptyCV, and sleep this thread.
            wait(queueLock, queueEmptyCV);
            // Then, "wait" automatically re-acquires "queueLock" for re-checking
            // the predicate condition.
        }
        // Critical section that requires the queue to be non-empty.
        // N.B.: We are holding queueLock.
        myTask = queue.dequeue(); // Take a task off of the queue.
        // Now the queue is guaranteed to be non-full, so signal a producer thread
        // or all producer threads that might be blocked waiting for the queue to be non-full:
        signal(queueFullCV); -- OR -- notifyAll(queueFullCV);

        // End of critical sections related to the queue.
        queueLock.release(); // Drop the queue lock until we need it again to take off the next task.

        doStuff(myTask); // Go off and do something with the task.
    }
}

위의 구현은 task 큐를 공유하는 producer와 consumer 쓰레드 사이에 동시성을 보장한다. 그리고 busy-waiting 대신에 아무것도 하지않고 있는 스레드들을 블록시키는 것을 구현한다.

이 솔루션의 변형도 존재할 수 있다. 변형버전은 producer와 consumer 쓰레드들이 하나의 cv(queueFullOrEmptyCV 혹은 queue SizeChangedCV )를 사용하는 버전이다.

  • 이 케이스에서 한 개 이상의 컨디션이 cv와 연관되어 있다. 이 케이스의 cv는 개별 쓰레드의 의해 체크되는 컨디션보다 더 약한 컨디션을 나타낸다. cv는 non-full이 되는 큐를 기다리고 non-empty가 되는 큐를 기다리는 쓰레드들을 represent한다.
  • 그러나 모든 쓰레드가 그 cv를 사용하면 notifyAll을 사용해야한다. regular signal을 사용할 수 없다. 왜냐하면 regular signal은 아직 참이 되지않은, 잘못된 상태를 가진 스레드를 깨울 수도 있고 그 스레드가 올바른 상태를 가진, 참이 된 스레드에게 시그널을 주는 것(깨우기) 없이 다시 잠들 수 있기 때문이다.

예시 :

producer 스레드는 큐를 full하게 만들고 또 다른 producer 스레드를 깨울 수도 있다. 근데 잘못일어났다보니, 일어난 producer 스레드는 다시 잠든다.

consumer 스레드가 큐를 empty하게 만들 때도 마찬가지이다. 그러므로 notifyAll을 사용해야 올바른 타입의 일부 스레드를 깨울 수 있는걸 보장한다.

아래는 하나의 cv만 사용하고 notifyAll 을 사용하는 버전이다.

global volatile RingBuffer queue; // A thread-unsafe ring-buffer of tasks.
global Lock queueLock; // A mutex for the ring-buffer of tasks.  (Not a spin-lock.)
global CV queueFullOrEmptyCV; // A single condition variable for when the queue is not ready for any thread
                              // -- i.e., for producer threads waiting for the queue to become non-full 
                              // and consumer threads waiting for the queue to become non-empty.
                              // Its associated lock is "queueLock".
                              // Not safe to use regular "signal" because it is associated with
                              // multiple predicate conditions (assertions).

// Method representing each producer thread's behavior:
public method producer() {
    while (true) {
        task myTask = ...; // Producer makes some new task to be added.

        queueLock.acquire(); // Acquire lock for initial predicate check.
        while (queue.isFull()) { // Check if the queue is non-full.
            // Make the threading system atomically release queueLock,
            // enqueue this thread onto the CV, and sleep this thread.
            wait(queueLock, queueFullOrEmptyCV);
            // Then, "wait" automatically re-acquires "queueLock" for re-checking
            // the predicate condition.
        }
        // Critical section that requires the queue to be non-full.
        // N.B.: We are holding queueLock.
        queue.enqueue(myTask); // Add the task to the queue.
        // Now the queue is guaranteed to be non-empty, so signal all blocked threads
        // so that a consumer thread will take a task:
        notifyAll(queueFullOrEmptyCV); // Do not use "signal" (as it might wake up another producer instead).
        // End of critical sections related to the queue.
        queueLock.release(); // Drop the queue lock until we need it again to add the next task.
    }
}

// Method representing each consumer thread's behavior:
public method consumer() {
    while (true) {
        queueLock.acquire(); // Acquire lock for initial predicate check.
        while (queue.isEmpty()) { // Check if the queue is non-empty.
            // Make the threading system atomically release queueLock,
            // enqueue this thread onto the CV, and sleep this thread.
            wait(queueLock, queueFullOrEmptyCV);
            // Then, "wait" automatically re-acquires "queueLock" for re-checking
            // the predicate condition.
        }
        // Critical section that requires the queue to be non-full.
        // N.B.: We are holding queueLock.
        myTask = queue.dequeue(); // Take a task off of the queue.
        // Now the queue is guaranteed to be non-full, so signal all blocked threads
        // so that a producer thread will take a task:
        notifyAll(queueFullOrEmptyCV); // Do not use "signal" (as it might wake up another consumer instead).
        // End of critical sections related to the queue.
        queueLock.release(); // Drop the queue lock until we need it again to take off the next task.
        doStuff(myTask); // Go off and do something with the task.
    }
}

Semaphore

세마포어를 구현하는 thread-safe 클래스를 생각해보자. 이 클래스들은 private integer s 를 감소(P)하고 증가(V) 시키는 메소드들이다.

그러나 integer는 0 밑으로 감소될 수 없다. 따라서 감소하려는 스레드는 integer가 양수가 될 때까지 기다려야 한다. 그래서 우리는 cv sIsPositive 라는걸 쓴다.

assertion은 다음과 같다.

sIsPositive=(s>0)

monitor class Semaphore
{
    private int s := 0
    invariant s >= 0
    private Condition sIsPositive /* associated with s > 0 */
		public method P()
    {
        while s = 0:
            wait sIsPositive
        assert s > 0
        s := s - 1
    }

    public method V()
    {
        s := s + 1
        assert s > 0
        signal sIsPositive
    }
}

아래는 mutex 개념을 넣어서 만든 코드이다.

class Semaphore
{
    private volatile int s := 0
    invariant s >= 0
    private ConditionVariable sIsPositive /* associated with s > 0 */private Mutex myLock /* Lock on "s" */public method P()
    {
        myLock.acquire()
        while s = 0:
            wait(myLock, sIsPositive)
        assert s > 0
        s := s - 1
        myLock.release()
    }

    public method V()
    {
        myLock.acquire()
        s := s + 1
        assert s > 0
        signal sIsPositive
        myLock.release()
    }
}

Monitor implemented using semaphores

역으로 lock과 cv는 semaphore로도 구현할 수 있다. 따라서 monitors와 세마포어를 하나로 단순화 시킬 수 있다.

이 때 중요한 점은, 하나의 스레드가 monitor를 소유하고 있어야 하는데, 2개의 스레드가 소유할 수 있는 상황이 있을 수 있다.

이 문제를 해결하기 위해 사용되는 개념이 2가지 종류의 cv이다.

Blocking cv(Hoare-style monitors or signal-and-urgent-wait monitors.)

Nonblocking cv("Mesa style" condition variables or "signal and continue" condition variables)

여기까지 더 들어가진 않으려고 한다. 관심 있으신 분들은 저 키워드를 가지고 공부해보면 좋을 것 같다.

아래의 링크도 monitor에 대해 공부하기 좋은 자료인 것 같아서 첨부해본다.

https://cseweb.ucsd.edu/classes/fa05/cse120/lectures/120-l6.pdf

profile
소프트웨어 엔지니어링을 연마하고자 합니다.
post-custom-banner

0개의 댓글