condition variable을 사용하면, 스레드는 conditional signal이 올 때까지 sleep queue에 들어가서 sleep 상태가 된다. 만약 conditional signal이 오면 해당 스레드는 wake up 상태가 돼서 그 다음 자기 할 일을 하게 된다. 그니깐 다른 스레드가 너 차례야~~라고 알려주는 lock이라고 생각하면 편하다.
wait 상태에 들어가게 되면, 먼저 자기 lock을 release한 후에, sleep queue에 들어가게 된다. sleep queue에 들어간 후 signal이 오면 다시 lock을 잡은 후 cond_wait을 빠져나오게 된다.
wait 상태에 있는 스레드들에게 signal을 보내는 함수이다. 그럼 sleep queue에 있는 스레드 중 하나가 wake up 하게 된다.
int done = 0;
pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t c = PTHREAD_COND_INITIALIZER;
void thr_exit(){
pthread_mutex_lock(&m);
done = 1;
pthread_cond_signal(&c);
pthread_mutex_unlock(&m);
}
void *child(void *arg){
printf("child\n");
thr_exit();
return NULL;
}
void thr_join(){
pthread_mutex_lock(&m);
while(done == 0){
pthread_cond_wait(&c, &m);
}
pthread_mutex_unlock(&m);
}
int main(int argc, char *argv[]){
printf("parent : begin\n");
pthraed_t p;
pthread_create(&p, NULL, child, NULL);
thr_join();
printf("parent : end\n");
return 0;
}
그런데 code를 보면 도대체 while문이 있는지 궁금할 수도 있음. 아니 애초에 애초에 done이 왜 있는 지 궁금할 수도 있음. thr_join 호출할 때면 당연히 child가 죽기 전이니깐 done ==0인게 당연한 게 아닌가?!?!!
done이 왜 존재하냐면 놀랍게도 thr_join 전에 child가 죽을 수도 있기 때문이다.
done이 없는데 thr_join 전에 child가 죽는다면 parent thread는 cond_wait으로 잠을 자게 되는데, 깨워줄 스레드가 없음 ㅠㅠㅠㅠ
따라서 done==0인지 확인을 한 다음에, 제워야지 그냥 막 제우면 큰일남!!
void thr_exit(){
done = 1;
pthread_cond_signal(&1);
}
void thr_join(){
if(done==0){
pthread_cond_wait(&c);
}
}
위 같은 코드에서 if문 직후, 그니깐 cond_wait 전에 interrupt가 걸렸다고 해보자
그럼 child process가 실행이 되고, child는 thr_exit 할 때 cond_signal을 보내는데 일어날 스레드가 없으니 그대로 종료된다.
그리고 다시 parent로 돌아오면 parent는 이제서야 잠 들게되고 이때 깨워줄 child thread는 없게 된다 ㅠㅠ
void thr_exit(){
pthread_mutex_lock(&m);
done = 1;
pthread_cond_signal(&c);
pthread_mutex_unlock(&m);
}
void thr_join(){
pthread_mutex_lock(&m);
while(done == 0){
pthread_cond_wait(&c, &m);
}
pthread_mutex_unlock(&m);
}
따라서 위와 같이 앞 뒤로 감싸줘서 join에서 lock을 걸어준 뒤, parent process를 잠 재우고 lock을 release 해야만 thr_exit()이 lock을 잡아 signal을 보낼 수 있도록 한다.
즉 lock을 걸어서 child가 signal을 보내기 전에, parent가 잠들 수 있도록 한다.
이때 producer와 consumer가 공유하는 buffer를 bounded buffer라고 한다. 이때 producer가 버퍼에 작성하기 전에 consumer가 버퍼에서 데이터를 가져와서도 안되고, consumer가 데이터를 전부 다 소비하기 전에 버퍼에 작성해서도 안된다.
따라서 bounded buffer는 critical section이 된다.
int buffer;
int count = 0;
void put(int value){
assert(count == 0);
count = 1;
buffer = value;
}
int get(){
assert(count==1);
count = 0;
return buffer;
}
cond_t cond;
mutex_t mutex;
void *producer(void *arg){
int i;
int loops = (int)args;
for(i=0; i<loops; i++){
pthread_mutex_lock(&mutex);
if(count==1){
pthread_cond_wait(&cond, &mutex);
}
put(i);
pthread_cond_signal(&cond);
pthread_mutex_unlock(&mutex);
}
}
void *consumer(void *arg){
int i;
for(i=0; i<loops; i++){
pthread_mutex_lock(&lock);
if(count==0){
pthread_cond_wait(&cond, &mutex);
}
int tmp = get();
pthread_cond_signal(&cond);
pthread_mutex_unlock(&mutex);
}
}
이렇게 cond_signal을 하면 cond_signal로 깨우는 thread가 정확히 어떤 스레드인지 알 수 없어서, consumer가 consumer를 깨우는 상황 발생!!
그럼 consumer는 consume할 데이터가 없게 된다 ㅠㅠ
따라서 깨어난 다음 바로 다음 execution으로 넘어가지 않고 다시 한번 버퍼가 쌓여 있는지 확인을 하면 되고 그러면 cond_wait의 if문을 while문으로 바꾸면 된다
cond_t cond;
mutex_t mutex;
void *producer(void *arg){
int i;
int loops = (int)args;
for(i=0; i<loops; i++){
pthread_mutex_lock(&mutex);
while(count==1){
pthread_cond_wait(&cond, &mutex);
}
put(i);
pthread_cond_signal(&cond);
pthread_mutex_unlock(&mutex);
}
}
void *consumer(void *arg){
int i;
for(i=0; i<loops; i++){
pthread_mutex_lock(&lock);
while(count==0){
pthread_cond_wait(&cond, &mutex);
}
int tmp = get();
pthread_cond_signal(&cond);
pthread_mutex_unlock(&mutex);
}
}
스레드가 wake up 된 다음에 자기가 정말 일할 상황이 맞는 지 한번 더 확인하도록 while문으로 바꾼 게 version2이다!
이렇게 하면 producer가 producer를 깨워도 count==1이니깐, 즉 버퍼가 이미 가득 차 있기에 다시 wait를 하게 된다. 마찬가지로 consumer가 consumer를 깨워도 count==0이므로 소비할 버퍼 데이터가 없으므로 다시 sleep 상태에 들어가게 된다.
하지만 여전히 문제가 있다,,
while문으로 consumer가 consumer를 깨울 때 consume하지 않도록 고치긴 했지만 여전히 잠에 빠진다는 것이다.
즉, producer는 sleep 상태에서 consumer가 consumer를 깨우면 consumer는 consume할 것이 없기에 잠을 자게 된다....
즉 모든 thread가 sleep 상태에 빠지게 되는 것이다!!
위의 문제를 해결하기 위해서는 produce를 한 후에는 consumer를 깨우고, consume을 한 후에는 producer를 깨워야 한다.
따라서 producer는 empty condition을 기다리고, 자기가 한 일을 마친 후에는 fill signal을 보낸다.
그리고 consumer는 fill conditon을 기다리고, 소비를 한 후에는 fill signal을 보낸다.
cond_t empty, fill;
mutex_t mutex;
void *producer(void *arg){
int i;
for(i=0; i<loops; i++){
pthread_mutex_lock(&mutex);
while(count==1){
pthread_cond_wait(&empty, &mutex);
}
put(i);
pthread_cond_signal(&fill);
pthread_mutex_unlock(&mutex);
}
}
void *consumer(void *arg){
int i;
for(i=0; i<loops; i++){
pthread_mutex_lock(&mutex);
while(count==0){
pthread_cond_wait(&fill, &mutex);
}
int tmp = get();
pthread_cond_signal(&empty);
pthread_mutex_unlock(&mutex);
}
}
ver3.은 알고리즘만 놓고 보면 완벽하긴 하지만 보다 concurrency와 efficiency를 주기 위해 buffer slot을 더 두자!
int buffer[MAX];
int fill = 0;
int use = 0;
int count = 0;
void put(int value){
buffer[fill] = value;
fill = (fill + 1) % MAX;
count++;
}
int get(){
int tmp = buffer[use];
use = (use + 1) % MAX;
count--;
return tmp;
}
cond_t empty, fill;
mutex_t mutex;
void *producer(void *arg){
int i;
for(i=0; i<loops; i++){
pthread_mutex_lock(&mutex);
while(count == MAX){
pthread_cond_wait(&empty, &mutex);
}
put(i);
pthread_cond_signal(&fill);
pthread_mutex_unlock(&mutex);
}
}
void *consumer(void *arg){
int i;
for(i=0; i<loops; i++){
pthread_mutex_lock(&mutex);
while(count==0){
pthread_cond_wait(&fill, &mutex);
}
int tmp = get();
pthread_cond_signal(&empty);
pthread_mutex_unlock(&mutex);
}
}
위와 같이 하게 되면 producer는 MAX만큼 계속 produce하고 consumer는 0이 될때까지 계속 consume하게 된다. 그러다가 producer가 max가 되면 sleep queue에 들어가서 더 이상 produce를 안하게 되고, consumer가 consume을 한 후 empty signal을 보내면 다시 produce를 하게 된다.
consumer도 마찬가지로 열심히 소비를 하다가 count가 0이 되면 sleep 상태에 들어갔다가 producer가 produce 후 fill signal을 보내게 되면 다시 소비를 하게 된다.
이렇게 하게 되면 count가 MAX가 되기 전까지 producer는 sleep 상태에 빠지지 않고 계속 생산을 하고, consumer도 마찬가지로 count가 0이 될때까지 계속 생산을 하게 되므로, sleep 상태에 빠지게 되는 횟수가 줄어들어 context switching overhead를 줄일 수 있다.