[Modern C++] 15.2. 뮤텍스(mutex)와 조건 변수(condition variable)

윤정민·2023년 8월 2일
0

C++

목록 보기
37/46

1. 경쟁상태(Race Condition)

  • 서로 다른 쓰레드에서 같은 자원을 사용할 때 발생하는 문제

  • 경쟁상태가 발생하는 예제 코드(counter+=1 부분)

    #include <iostream>
    #include <thread>
    #include <vector>
    
    void worker(int& counter) {
      for (int i = 0; i < 10000; i++) {
        counter += 1;
      }
    }
    
    int main() {
      int counter = 0;
    
      std::vector<std::thread> workers;
      for (int i = 0; i < 4; i++) {
        // 레퍼런스로 전달하려면 ref 함수로 감싸야 한다 (지난 강좌 bind 함수 참조)
        workers.push_back(std::thread(worker, std::ref(counter)));
      }
    
      for (int i = 0; i < 4; i++) {
        workers[i].join();
      }
    
      std::cout << "Counter 최종 값 : " << counter << std::endl;
    }
  • 출력값(40000이 아니다!)
    Counter 최종 값 : 26459

1.1. CPU의 연산 처리 과정

  • cpu: 컴퓨터의 모든 연산이 발생하는 두뇌
    • 레지스터(register)에 데이터를 기록한 뒤 연산을 수행
    • 하지만 레지스터의 크기는 매우 작음
    • 따라서 모든 데이터들은 메모리(ram)에 저장되어 있고, 연산 시 메모리에서 레지스터로 값을 가져온 뒤, 빠르게 연신한 뒤 다시 메모리에 가져놓는 방식으로 작동

1.2. counter += 1의 컴파일 코드

  • 어셈블리 코드로 컴파일 된 결과
    mov rax, qword ptr [rbp - 8] //[rbp-8]이 rax에 대입됨
    mov ecx, dword ptr [rax] //rax에는 result의 주소값이 담겨 있으니 ecx에 result의 현재 값이 들어가게 됨
    add ecx, 1 //ecx에 1을 더함
    mov dword ptr [rax], ecx // ecx의 값을 다시 rax 즉, result에 넣어줌
  • 문제가 발생 가능한 상황

    쓰레드1이 2번째 라인까지 실행된 뒤 쓰레드2가 실행된다면, 쓰레드1의 ecx은 0이었으니 counter는 쓰레드 1과 2를 한 번씩 실행했음에도 counter의 값이 2가 되는 문제가 발생 함

1.3. 결론

  • 멀티 쓰레드의 경우 프로그램 실행 마다 결과가 달라질 수 있기 때문에 프로그램을 제대로 만들지 않는다면 디버깅이 겁나 어려워짐

2. 뮤텍스(mutex)

  • 모든 스레드가 접근하는 자원을 하나의 스레드가 먼저 사용할 수 있도록 독점하는 객체
  • 자물쇠로 생각하면 편함
  • 예제 코드
#include <iostream>
#include <mutex>  // mutex 를 사용하기 위해 필요
#include <thread>
#include <vector>

void worker(int& result, std::mutex& m) {
  for (int i = 0; i < 10000; i++) {
    m.lock();
    result += 1;
    m.unlock();
  }
}

int main() {
  int counter = 0;
  std::mutex m;  // 우리의 mutex 객체

  std::vector<std::thread> workers;
  for (int i = 0; i < 4; i++) {
    workers.push_back(std::thread(worker, std::ref(counter), std::ref(m)));
  }

  for (int i = 0; i < 4; i++) {
    workers[i].join();
  }

  std::cout << "Counter 최종 값 : " << counter << std::endl;
}
  • 실행결과
    Counter 최종 값 : 40000
  • m.lock()
    • m의 사용권한을 가진 쓰레드가 없다면 사용 권한을 가짐
    • 이미 다른 쓰레드가 사용 권한을 가지고 있다면 그 쓰레드가 m을 반환할 때 까지 무한정 기다리게 됨
  • 따라서 result+=1m을 가진 쓰레드만 수행 가능

2.1. 데드락(deadlock)

  • 만약 뮤텍스를 취득한 쓰레드가 unlock을 하지 않으면, 다른 모든 쓰레드들이 기다리게 되고 본인 또한 m.lock()을 호출하니 기다리게 됨
  • 결국 아무 쓰레드도 연산을 진행하지 못하게 되는 상황이 됨

2.2. 소멸자에서 unlock()처리

  • 데드락과 같은 문제는 unique_ptr에서도 마주한 적 있음

    • 메모리를 할당 했으면 사용후 반드시 해제해야함
      • 이 과정을 unique_ptr의 소멸자에서 처리해줌
  • 뮤텍스도 마찬가지로 사용 후 해제 패턴을 따르기 때문에 동일하게 소멸자에서 처리 가능

    • 예제 코드

      #include <iostream>
      #include <mutex>  // mutex 를 사용하기 위해 필요
      #include <thread>
      #include <vector>
      
      void worker(int& result, std::mutex& m) {
        for (int i = 0; i < 10000; i++) {
          // lock 생성 시에 m.lock() 을 실행한다고 보면 된다.
          std::lock_guard<std::mutex> lock(m);
          result += 1;
      
          // scope 를 빠져 나가면 lock 이 소멸되면서
          // m 을 알아서 unlock 한다.
        }
      }
      
      int main() {
        int counter = 0;
        std::mutex m;  // 우리의 mutex 객체
      
        std::vector<std::thread> workers;
        for (int i = 0; i < 4; i++) {
          workers.push_back(std::thread(worker, std::ref(counter), std::ref(m)));
        }
      
        for (int i = 0; i < 4; i++) {
          workers[i].join();
        }
      
        std::cout << "Counter 최종 값 : " << counter << std::endl;
      }
    • std::lock_guard<std::mutex> lock(m);

      • lock_guard객체는 뮤텍스를 인자로 받아서 생성하게 되는데, 이때 생성자에서 뮤텍스를 lock하게 됨
      • lock_guard가 소멸될 때 알아서 lock했던 뮤텍스를 unlock하게 됨

2.3. 데드락이 가능한 상황의 해결법

  • 한 쓰레드에게 우선권을 줌
    • 한쓰레드가 기다리지 않고 실행할 수 있도록 다른 쓰레드들이 소유중인 뮤텍스를 unlock()
      • try_lock()을 사용해 false라면 지금 소유중인 뮤텍스를 unlock()
    • 기아상태가 발생하지만 데드락 보단 나음

2.4. 데드락 방지법

  • 중첩된 lock 사용을 지양
  • lock 소유중 유저 코드 호출을 지양
  • lock들을 언제나 정해진 순서로 획득할 것
    • worker1이 m1, m2순으로 worker2가 m2, m1으로 lock을 한다면 서로 하나씩 가지고 있으니 데드락이 발생하기 때문

3. 생성자와 소비자 패턴

  • 생성자: 무언가 처리할 일을 받아오는 쓰레드
  • 소비자: 받은 일을 처리하는 쓰레드
  • 에제 코드
#include <chrono>  // std::chrono::miliseconds
#include <iostream>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <vector>

void producer(std::queue<std::string>* downloaded_pages, std::mutex* m,
              int index) {
  for (int i = 0; i < 5; i++) {
    // 웹사이트를 다운로드 하는데 걸리는 시간이라 생각하면 된다.
    // 각 쓰레드 별로 다운로드 하는데 걸리는 시간이 다르다.
    std::this_thread::sleep_for(std::chrono::milliseconds(100 * index));
    std::string content = "웹사이트 : " + std::to_string(i) + " from thread(" +
                          std::to_string(index) + ")\n";

    // data 는 쓰레드 사이에서 공유되므로 critical section 에 넣어야 한다.
    m->lock();
    downloaded_pages->push(content);
    m->unlock();
  }
}

void consumer(std::queue<std::string>* downloaded_pages, std::mutex* m,
              int* num_processed) {
  // 전체 처리하는 페이지 개수가 5 * 5 = 25 개.
  while (*num_processed < 25) {
    m->lock();
    // 만일 현재 다운로드한 페이지가 없다면 다시 대기.
    if (downloaded_pages->empty()) {
      m->unlock();  // (Quiz) 여기서 unlock 을 안한다면 어떻게 될까요?

      // 10 밀리초 뒤에 다시 확인한다.
      std::this_thread::sleep_for(std::chrono::milliseconds(10));
      continue;
    }

    // 맨 앞의 페이지를 읽고 대기 목록에서 제거한다.
    std::string content = downloaded_pages->front();
    downloaded_pages->pop();

    (*num_processed)++;
    m->unlock();

    // content 를 처리한다.
    std::cout << content;
    std::this_thread::sleep_for(std::chrono::milliseconds(80));
  }
}

int main() {
  // 현재 다운로드한 페이지들 리스트로, 아직 처리되지 않은 것들이다.
  std::queue<std::string> downloaded_pages;
  std::mutex m;

  std::vector<std::thread> producers;
  for (int i = 0; i < 5; i++) {
    producers.push_back(std::thread(producer, &downloaded_pages, &m, i + 1));
  }

  int num_processed = 0;
  std::vector<std::thread> consumers;
  for (int i = 0; i < 3; i++) {
    consumers.push_back(
        std::thread(consumer, &downloaded_pages, &m, &num_processed));
  }

  for (int i = 0; i < 5; i++) {
    producers[i].join();
  }
  for (int i = 0; i < 3; i++) {
    consumers[i].join();
  }
}

  • std::queue<std::string> downloaded_pages;
    • producer 쓰레드에서는 웹사이트에서 페이지를 계속 다운로드 하는 역할을 수행
    • 이 때, 다운로드한 페이지들을 downloaded_pages라는 큐에 저장
  • content처리과정
    • 큐를 pop해 하나의 쓰레드에서 처리
    • 그 뒤 m->unlock을 수행해 다음 쓰레드가 content를 처리 가능하도록 만듦
    • 큐가 비어있다면 10ms 뒤에 다시 확인함
      • Consumer가 계속 할거 있는지 확인하는게 비효율적임(좀 덜떨어져보임)
    • Consumer를 재워뒀다가 일이 오면 깨워서 일을 시키자
profile
그냥 하자

0개의 댓글