병행 프로그램의 근본적인 문제
여러 개의 명령어들을 원자적으로 실행하고 싶지만, 단일 프로세서의 인터럽트로 인해 불가능
이 장에서는 앞서 다룬 락(Lock)을 이용하여 임계 영역을 하나의 원자 단위 명령어인 것처럼 실행되도록 해볼 것이다.
락을 사용하여 임계 영역(balance = balance+1)을 감쌀 수 있다.
lock_t mutex; // 글로벌 변수로 선언된 락
...
lock(&mutex);
balance = balance + 1;
unlock(&mutex);
락 변수: 락의 상태를 나타내는 변수
lock_t mutex;)lock/unlock 루틴
lock(): 락 획득을 시도한다.unlock(): 락 소유자가 unlock을 호출하면 락은 이제 다시 사용 가능한 상태가 된다.
- 락은 프로그래머에게 스케줄링에 대한 최소한의 제어권을 제공한다.
- 락으로 코드를 감싸서 프로그래머는 그 코드 내에서는 하나의 쓰레드만 동작하도록 보장한다.
POSIX 라이브러리는 락을 mutex라고 부른다.
(상호 배제, mutual exclusion)
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
Pthread_mutex_lock(&lock); // pthread_mutex_lock()을 위한 래퍼
balance = balance + 1;
Pthread_mutex_unlock(&lock);
핵심 질문: 락은 어떻게 만들까
- 효율적인 락은 낮은 비용으로 상호 배제 기법을 제공하고, 하드웨어 및 운영체제 지원이 필요하다. 그렇다면...
- 효율적인 락은 어떻게 만들어야 하는가?
- 어떤 하드웨어 지원 필요?
- 어떤 운영체제 지원 필요?
정교한 락 라이브러리를 제작하는 데 있어 운영체제가 관여하는 것은 무엇인지 알아보자.
락의 효율을 어떻게 평가해야 할까?
초창기 단일 프로세스 시스템에서는 상호 배제 지원을 위해 임계 영역 내에서는 인터럽트를 비활성화하는 방법을 사용했다.
void lock() {
DisableInterrupts();
}
void unlock() {
EnableInterrupts();
}
따라서 상호 배제를 위해 인터럽트를 비활성화하는 것은 제한된 범위에서만 사용되어야 한다.
Test-And-Set 기법 (원자적 교체, atomic exchange)
typedef struct __lock_t {
int flag;
} lock_t;
void init(lock_t *mutex) {
// 0 −> 락이 사용 가능함, 1 −> 락 사용 중
mutex−>flag = 0;
}
void lock(lock_t *mutex) {
while (mutex−>flag == 1) // flag 변수를 검사(TEST) 함
; // spin−wait (do nothing)
mutex−>flag = 1; // 이제 설정(SET) 한다!
}
void unlock(lock_t *mutex) {
mutex−>flag = 0;
}
lock()이 호출되었을 때, 다른 쓰레드에서 락을 사용하고 있으면(mutex의 flag가 1이면) 락을 소유한 쓰레드의 unlock() 기다린다. (spin-wait)unlock()을 호출하면, 이제 해당 쓰레드가 락을 가지게 된다.
flag=1이 될 수도 있다.int TestAndSet(int *old_ptr, int new) {
int old = *old_ptr; // old_ptr 의 이전 값을 가져옴
*old_ptr = new; // old_ptr 에 new' 의 값을 저장함
return old; // old의 값을 반환함
}
typedef struct __lock_t {
int flag;
} lock_t;
void init(lock_t *lock) {
// 0 은 락이 획득 가능한 상태를 표시, 1 은 락을 획득했음을 표시
lock−>flag = 0;
}
void lock(lock_t *lock) {
while (TestAndSet(&lock−>flag, 1) == 1)
; // 스핀(아무 일도 하지 않음)
}
void unlock(lock_t *lock) {
lock−>flag = 0;
}
TestAndSet()은 0을 반환하고 flag는 1이 되어 락을 획득한다.TestAndSet()은 1을 반환하므로 while문을 계속해서 스핀한다.스핀 락은 임의의 시간에 단 하나의 쓰레드만이 임계 영역에 진입할 수 있도록 한다. 👍
스핀 락은 어떠한 공정성도 보장해 줄 수 없다. 👎
(while 문을 회전 중인 쓰레드는 경쟁에 밀려서 계속 그 상태에 남아 있을 수 있다.)
Compare-And-Swap 기법
int CompareAndSwap(int *ptr, int expected, int new) {
int actual = *ptr;
if (actual == expected)
*ptr = new;
return actual;
}
ptr이 가리키고 있는 주소의 값이 expected 변수와 일치하는지 검사하는 것이다.ptr을 새 값으로 변경하고, 그렇지 않다면 아무 것도 하지 않는다.void lock(lock_t *lock) {
while (CompareAndSwap(&lock−>flag, 0, 1) == 1)
; // 스핀(아무 일도 하지 않음)
}MIPS 구조에서는 load-linked와 store-conditional 명령어를 앞뒤로 사용하여 락이나 기타 병행 연산을 위한 자료 구조를 만들 수 있다.
int LoadLinked(int *ptr) {
return *ptr;
}
int StoreConditional(int *ptr, int value) {
if (LoadLinked 후 ptr이 갱신되지 않은 경우) {
*ptr = value;
return 1; // 성공!
} else {
return 0; // 갱신을 실패함
}
}
LoadLinked() : 일반 로드 명령어와 같이 메모리 값을 레지스터에 저장한다.StoreConditional() : ptr 갱신이 없는 경우에만 저장을 성공하고, 성공 시 load-linked가 탑재했던 값을 갱신한다.void lock(lock_t *lock) {
while (1) {
while (LoadLinked(&lock−>flag) == 1)
; // 0이 될 때까지 스핀
if (StoreConditional(&lock−>flag, 1) == 1)
return; // 1로 변경하는 것이 성공하였다면: 완료
// 아니라면: 처음부터 다시 시도
}
}
void unlock(lock_t *lock) {
lock−>flag = 0;
}
StoreConditional()을 호출하여 락 획득을 시도하고, 성공 시 flag를 1이 되고 임계 영역 내로 진입한다.StoreConditional()에 실패하는 경우lock()을 호출하여 LoadLinked()를 실행, 이후 락이 사용 가능한 상태가 되어 0을 반환한다.StoreConditional()을 실행하기 직전에 인터럽드에 걸렸고, 다른 쓰레드가 lock() 코드를 호출하여 똑같이 LoadLinked()의 반환값으로 0을 받았다고 하자.LoadLinked 명령어를 실행하였고, 둘 다 StoreConditional를 부르려고 하는 상황이다.StoreConditional()은 LoadLinked 후 ptr이 갱신되지 않은 경우에만 값을 갱신하므로, 오직 하나의 쓰레드만 flag 값을 1로 설정함을 보장한다.StoreConditional()를 실행하는 쓰레드는 락 획득에 실패한다.Fetch-And-Add: 원자적으로 특정 주소의 예전 값을 반환하면서 값을 증가시키는 하드웨어 기법
int FetchAndAdd(int *ptr) {
int old = *ptr;
*ptr = old + 1;
return old;
}
// 티켓 락
typedef struct __lock_t {
int ticket;
int turn;
} lock_t;
void lock_init(lock_t *lock) {
lock−>ticket = 0;
lock−>turn = 0;
}
void lock(lock_t *lock) {
int myturn = FetchAndAdd(&lock−>ticket);
while (lock−>turn != myturn)
; // 회전
void unlock(lock_t *lock) {
FetchAndAdd(&lock−>turn);
}
myturn)를 나타낸다. lock->turn을 사용하여 어느 쓰레드의 차례인지 판단한다.myturn == turn) 이라는 조건에 부합하면 그 쓰레드가 임계 영역에 진입할 차례인 것이다.모든 쓰레드들이 각자의 순서에 따라 진행한다는 것이 특징이다.
즉, 쓰레드가 티켓 값을 할당받았다면, 미래의 어느 때에 실행되는 것이 보장되는 것이다.
하드웨어 기반 락은 간단하고 잘 동작하지만, 때론 비효율적이기도 하다.
핵심 질문: 회전을 피하는 방법
어떻게 하면 스핀에 CPU 시간을 낭비하지 않는 락을 만들 수 있을까?
이제부터는 운영체제로부터의 지원이 추가로 필요하다.
무조건 양보
락이 해제되기를 기다리며 스핀해야 하는 경우, 자신에개 할당된 CPU를 다른 쓰레드에게 양보하는 전략
void init() {
flag = 0;
}
void lock() {
while (TestAndSet(&flag, 1) == 1) // 만약 다른 쓰레드가 락 보유하면,
yield(); // CPU를 양보함
}
void unlock() {
flag = 0;
}
yield() 기법이 있다고 가정한다.그러나 이 방법도 한계가 있다.
lock() 호출하고 CPU 양도이전 방법들은 너무 많을 부분을 운에 맡긴다는 문제점이 있었고, 따라서 어떤 쓰레드가 다음으로 락을 획득할지를 명시적으로 제어할 수 있어야 한다.
이를 위해서는 운영체제로부터 적절한 지원과 큐를 이용하여 대기 쓰레드를 관리하는 방법에 대해 알아볼 것이다.
typedef struct __lock_t {
int flag;
int guard;
queue_t *q;
} lock_t;
void lock_init(lock_t *m) {
m−>flag = 0;
m−>guard = 0;
queue_init(m−>q);
}
void lock(lock_t *m) {
while (TestAndSet(&m−>guard, 1) == 1)
; // 회전하면서 guard 락을 획득
if (m−>flag == 0) {
m−>flag = 1; // 락을 획득함
m−>guard = 0;
} else {
queue_add(m−>q, gettid());
m−>guard = 0;
park();
}
}
void unlock(lock_t *m) {
while (TestAndSet(&m−>guard, 1) == 1)
; // 회전하면서 guard 락을 획득
if (queue_empty(m−>q))
m−>flag = 0; // 락을 포기함: 누구도 락을 원치 않음
else
unpark(queue_remove(m−>q)); // 락을 획득함 (다음 쓰레드를 위해!)
m−>guard = 0;
}
park() : 호출하는 쓰레드를 잠재우는 함수unpark(threadID) : threadID로 명시된 특정 쓰레드를 깨우는 함수guard : flag 와 큐의 삽입/삭제 동작을 스핀 락으로부터 보호하는데 사용lock()을 호출하여 락 획득을 시도하였는데 다른 쓰레드가 이미 락을 보유한 경우를 가정하자.guard 변수를 0으로 변경하고 park()를 호출하여 CPU를 양보한다 (sleep 상태).unlock()이 호출되어 큐에서 나오게 되었을 때 잠자고 있던 쓰레드를 깨운다.park() 직전에 경쟁 조건이 발생된다.setspark() 라는 코드를 추가하여 해결하였다.queue_add(m−>q, gettid());
setpark();// 추가
m−>guard=0;setspark() : park() 를 호츌하기 직전이라는 것을 표시park()가 실제로 호출되기 전에 다른 쓰레드가 unpark()를 먼저 호출한다면, 추후 park() 문은 sleep 하지 않고 바로 return이 된다.Linux: futex 지원
futex_wait(address, expected) futex_wake(address, expected)void mutex_lock (int *mutex) {
int v;
/* 31번째 비트가 이미 초기화되어 있다. mutex를 이미획득했다. 바로 리턴한다. (이것이 빠르게 실행하는 방법이다) */
if (atomic_bit_test_set(mutex, 31) == 0)
return;
atomic_increment(mutex);
while (1) {
if (atomic_bit_test_set(mutex, 31) == 0) {
atomic_decrement(mutex);
return;
}
/* 이제 대기해야 한다. 먼저, 우리가 관찰 중인 futex 값이 실제로 음수 인지 확인해야 한다(잠겨있는 상태인지). */
v = *mutex;
if (v >= 0)
continue;
futex_wait(mutex, v);
}
}
void mutex_unlock (int *mutex) {
/* 필요충분 조건으로 관심 대상의 다른 쓰레드가 없는 경우에 한해서
0x80000000를 카운터에 더하면 0을 얻는다. */
if (atomic_add_zero(mutex, 0x80000000))
return;
/* 이 mutex를 기다리는 다른 쓰레드가 있다면 그 쓰레드들을 깨운다. */
futex_wake(mutex);
2단계 락(two-phase lock)