C++ 뮤텍스(mutex)와 조건변수

은수·2022년 7월 7일

cpp study

목록 보기
19/21

Race condition

  • 서로 다른 쓰레드에서 동일한 자원(ex.같은 메모리)을 사용/공유할 때 발생하는 문제
#include <iostream>
#include <thread>
#include <vector>

void worker(int& counter) {
  for (int i = 0; i < 10000; i++) {
  
    // 문제 발생 지점!_)!
    counter += 1;
  }
}

int main() {
  int counter = 0;

  std::vector<std::thread> workers;
  for (int i = 0; i < 4; i++) {
    // 레퍼런스로 전달하려면 ref 함수로 감싸야 한다 (지난 강좌 bind 함수 참조)
    workers.push_back(std::thread(worker, std::ref(counter)));
  }

  for (int i = 0; i < 4; i++) {
    workers[i].join();
  }

  std::cout << "Counter 최종 값 : " << counter << std::endl;
}

CPU 간단 소개

CPU는 컴퓨터의 모든 연산이 발생하는 두뇌.
CPU 에서 연산을 수행하기 위해서는, CPU 의 레지스터(register) 라는 곳에 데이터를 기록한 다음에 연산을 수행해야함.

즉, 모든 데이터들은 메모리에 저장되어 있고, 연산 할 때 할 때 마다 메모리에서 레지스터로 값을 가져온 뒤에, 빠르게 연산을 하고, 다시 메모리에 가져다 놓는 식으로 작동


각 쓰레드는 메모리를 공유할 지언정, 레지스터를 공유하지는 않기 때문에 counter += 1에서 문제 발생.

뮤텍스 (mutex)

뮤텍스를 통해 한 번에 한 쓰레드에서만 counter+=1; 코드를 실행시킬 수 있도록 만들어 위의 문제 해결 가능

#include <iostream>
#include <mutex>  // mutex 를 사용하기 위해 필요
#include <thread>
#include <vector>

void worker(int& result, std::mutex& m) {
  for (int i = 0; i < 10000; i++) {
    m.lock();
    result += 1;
    m.unlock();
  }
}

int main() {
  int counter = 0;
  std::mutex m;  // 우리의 mutex 객체

  std::vector<std::thread> workers;
  for (int i = 0; i < 4; i++) {
    workers.push_back(std::thread(worker, std::ref(counter), std::ref(m)));
  }

  for (int i = 0; i < 4; i++) {
    workers[i].join();
  }

  std::cout << "Counter 최종 값 : " << counter << std::endl;
}

뮤텍스
한 번에 한 쓰레드에서만 코드를 실행시킬 수 있도록 도와주는 객체

m.lock();
result += 1;
m.unlock();

m.lock () : 뮤텍스 m을 내가 쓰게 달라! 단, 한 번에 한 쓰레드에서만 m의 사용 권한을 가짐.

  • 따라서, m을 소유한 쓰레드가 m.unlock()을 통해 m을 반환할 때까지 기다려야 함.
  • 만약, m.unlock()을 하지 않는다면 프로그램이 끝나지 않아 강제종료 해야 함.
  • 따라서, 반드시 취득한 뮤텍스는 사용이 끝나면 반환해야 함.

임계 영역(critical section)
m.lock()과 m.unlock() 사이에 한 쓰레드만이 유일하게 실행할 수 있는 코드 부분

데드락(deadlock)
unlock하지 않아 결국 아무 쓰레드도 연산을 진행하지 못하는 상태

lock_guard 객체

std::lock_guard<std::mutex> lock(m);

lock_guard 객체는 뮤텍스를 인자로 받아서 생성하게 되는데, 이 때 생성자에서 뮤텍스를 lock 하게 됨.
그리고 lock_guard 가 소멸될 때 알아서 lock 했던 뮤텍스를 unlock 하게 됨.

-> 따라서 사용자가 따로 unlock 을 신경쓰지 않아도 되서 매우 편리


데드락 (deadlock)

// worker1()
// m1을 먼저 lock한 후, m2를 lock
std::lock_guard<std::mutex> lock1(m1);
std::lock_guard<std::mutex> lock2(m2);

// worker2()
// m2를 먼저 lock한 후, m1을 lock
std::lock_guard<std::mutex> lock2(m2);
std::lock_guard<std::mutex> lock1(m1);

worker1 에서 m2 를 lock 하기 위해서는 worker2 에서 m2 를 unlock 해야 됨.
하지만 그러기 위해서는 worker2 에서 m1 을 lock 해야 함. 그런데 이 역시 불가능. 왜냐하면 worker1 에 m1 을 lock 하고 있기 때문

= 데드락 상태.

데드락 해결 방법 ?

한 쓰레드가 다른 쓰레드에 비해 우위를 가질 수 있도록 만들어주는 방법을 사용해 볼 수 있음.

while (true) {
  m2.lock();

  // m1 이 이미 lock 되어 있다면 "야 차 빼" 를 수행하게 된다.
  if (!m1.try_lock()) {
    m2.unlock();
    continue;
  }

  std::cout << "Worker2 Hi! " << i << std::endl;
  m1.unlock();
  m2.unlock();
  break;
}

try_lock()을 통해 m1을 lock할 수 있는 상태라면 true 리턴하여 m1을 lock시킴

  • but, 이런 경우 한 쓰레드만 일하고 다른 쓰레드는 일할 수 없는 기아 상태(starvation) 발생 가능.

데드락 상황을 피하기 위한 가이드라인

C++ Concurrency In Action에서 제시하는 가이드라인.

  1. 중첩된 lock을 사용하는 것을 피해라
  2. Lock을 소유하고 있을 때 유저 코드를 호출하는 것을 피해라
  3. Lock들을 언제나 정해진 순서로 획득해라

생산자(Producer)와 소비자(Consumer) 패턴

생산자(producer)-소비자(consumer) 패턴

  • 멀티 쓰레드 프로그램에서 많이 등장하는 개념!

생산자
: 무언가 처리할 일을 받아오는 쓰레드
ex. 인터넷에서 페이지를 긁어서 분석하는 프로그램을 만들었을 때, '페이지를 긁어 오는 쓰레드'가 해당

소비자
: 받은 일을 처리하는 쓰레드
ex. 긁어온 페이지를 분석하는 쓰레드

#include <chrono>  // std::chrono::miliseconds
#include <iostream>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <vector>

void producer(std::queue<std::string>* downloaded_pages, std::mutex* m,
              int index) {
  for (int i = 0; i < 5; i++) {
    // 웹사이트를 다운로드 하는데 걸리는 시간이라 생각하면 된다.
    // 각 쓰레드 별로 다운로드 하는데 걸리는 시간이 다르다.
    std::this_thread::sleep_for(std::chrono::milliseconds(100 * index));
    std::string content = "웹사이트 : " + std::to_string(i) + " from thread(" +
                          std::to_string(index) + ")\n";

    // data 는 쓰레드 사이에서 공유되므로 critical section 에 넣어야 한다.
    m->lock();
    downloaded_pages->push(content);
    m->unlock();
  }
}

void consumer(std::queue<std::string>* downloaded_pages, std::mutex* m,
              int* num_processed) {
  // 전체 처리하는 페이지 개수가 5 * 5 = 25 개.
  while (*num_processed < 25) {
    m->lock();
    // 만일 현재 다운로드한 페이지가 없다면 다시 대기.
    if (downloaded_pages->empty()) {
      m->unlock();  // (Quiz) 여기서 unlock 을 안한다면 어떻게 될까요?

      // 10 밀리초 뒤에 다시 확인한다.
      std::this_thread::sleep_for(std::chrono::milliseconds(10));
      continue;
    }

    // 맨 앞의 페이지를 읽고 대기 목록에서 제거한다.
    std::string content = downloaded_pages->front();
    downloaded_pages->pop();

    (*num_processed)++;
    m->unlock();

    // content 를 처리한다.
    std::cout << content;
    std::this_thread::sleep_for(std::chrono::milliseconds(80));
  }
}

int main() {
  // 현재 다운로드한 페이지들 리스트로, 아직 처리되지 않은 것들이다.
  std::queue<std::string> downloaded_pages;
  std::mutex m;

  std::vector<std::thread> producers;
  for (int i = 0; i < 5; i++) {
    producers.push_back(std::thread(producer, &downloaded_pages, &m, i + 1));
  }

  int num_processed = 0;
  std::vector<std::thread> consumers;
  for (int i = 0; i < 3; i++) {
    consumers.push_back(
        std::thread(consumer, &downloaded_pages, &m, &num_processed));
  }

  for (int i = 0; i < 5; i++) {
    producers[i].join();
  }
  for (int i = 0; i < 3; i++) {
    consumers[i].join();
  }
}

위의 코드는 consumer 쓰레드가 10 밀리초 마다 downloaded_pages 에 할일이 있는지 확인하고 없으면 다시 기다리는 형태임.
즉, 매 번 언제 올지 모르는 데이터를 확인하기 위해 지속적으로 mutex 를 lock 하고, 큐를 확인해야 하기 때문에 비효율적임.


조건 변수 (condition_variable)

조건 변수를 통해 생산자-소비자 패턴을 효율적으로 구현할 수 있음.

위 코드의 비효율을 해결하기 위해 조건 변수를 사용 -> downloaded_pages 가 empty() 가 참이 아닐 때 까지 자라 라는 명령을 내릴 수 있음.

#include <chrono>              // std::chrono::miliseconds
#include <condition_variable>  // std::condition_variable
#include <iostream>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <vector>

void producer(std::queue<std::string>* downloaded_pages, std::mutex* m,
              int index, std::condition_variable* cv) {
  for (int i = 0; i < 5; i++) {
    // 웹사이트를 다운로드 하는데 걸리는 시간이라 생각하면 된다.
    // 각 쓰레드 별로 다운로드 하는데 걸리는 시간이 다르다.
    std::this_thread::sleep_for(std::chrono::milliseconds(100 * index));
    std::string content = "웹사이트 : " + std::to_string(i) + " from thread(" +
                          std::to_string(index) + ")\n";

    // data 는 쓰레드 사이에서 공유되므로 critical section 에 넣어야 한다.
    m->lock();
    downloaded_pages->push(content);
    m->unlock();

    // consumer 에게 content 가 준비되었음을 알린다.
    cv->notify_one();
  }
}

void consumer(std::queue<std::string>* downloaded_pages, std::mutex* m,
              int* num_processed, std::condition_variable* cv) {
  while (*num_processed < 25) {
    std::unique_lock<std::mutex> lk(*m);

    cv->wait(
        lk, [&] { return !downloaded_pages->empty() || *num_processed == 25; });

    if (*num_processed == 25) {
      lk.unlock();
      return;
    }

    // 맨 앞의 페이지를 읽고 대기 목록에서 제거한다.
    std::string content = downloaded_pages->front();
    downloaded_pages->pop();

    (*num_processed)++;
    lk.unlock();

    // content 를 처리한다.
    std::cout << content;
    std::this_thread::sleep_for(std::chrono::milliseconds(80));
  }
}

int main() {
  // 현재 다운로드한 페이지들 리스트로, 아직 처리되지 않은 것들이다.
  std::queue<std::string> downloaded_pages;
  std::mutex m;
  std::condition_variable cv;

  std::vector<std::thread> producers;
  for (int i = 0; i < 5; i++) {
    producers.push_back(
        std::thread(producer, &downloaded_pages, &m, i + 1, &cv));
  }

  int num_processed = 0;
  std::vector<std::thread> consumers;
  for (int i = 0; i < 3; i++) {
    consumers.push_back(
        std::thread(consumer, &downloaded_pages, &m, &num_processed, &cv));
  }

  for (int i = 0; i < 5; i++) {
    producers[i].join();
  }

  // 나머지 자고 있는 쓰레드들을 모두 깨운다.
  cv.notify_all();

  for (int i = 0; i < 3; i++) {
    consumers[i].join();
  }
}

std::condition_variable
condition_variable는 다른 쓰레드가 공유 변수를 수정하고 condition_variable로 통지할 때까지 여러 쓰레드를 대기하도록 하는 데 사용 할 수 있는 동기화 기본 기법임. 뮤텍스와 연동되어서 동작해서 쓰레드에 안전하게 동작함.

0개의 댓글