서로 다른 쓰레드들이 동일한 자원을 사용할 때 발생하는 문제
#include <iostream>
#include <thread>
#include <vector>
void worker(int& counter) {
for (int i = 0; i < 10000; i++) {
// ❗ 문제 발생
counter += 1;
}
}
int main() {
int counter = 0;
std::vector<std::thread> workers;
for (int i = 0; i < 4; i++) {
// 레퍼런스로 전달하려면 ref 함수로 감싸야 한다 (지난 강좌 bind 함수 참조)
workers.push_back(std::thread(worker, std::ref(counter)));
}
for (int i = 0; i < 4; i++) {
workers[i].join();
}
std::cout << "Counter 최종 값 : " << counter << std::endl;
}
CPU: 컴퓨터의 모든 연산이 발생하는 두뇌
CPU의 연산: CPU의 레지스터라는 곳에 데이터를 기록한 다음에 연산을 수행해야 함.
모든 데이터들 = 메모리에 저장,
연산할 때: 메모리 -> 레지스터로 값을 가져오고 빠르게 연산을 하고, 다시 메모리에 가져다 놓는 식으로 작동
위 코드의 결과로 이상한 게 나올 수 있다.
<- counter += 1을 두번 했는데 결과는 한번만 한것과 같은 효과를 가질 수 있기 때문.
(쓰레드가 counter에 서로 쓰려고 해서 생기는 문제)
쓰레드를 어떻게 스케둘링할 지는 운영체제가 마음대로 결정하는 것이기 때문.
멀티쓰레딩을 했을 때의 문제점이다.
뮤텍스: 한번에 한 쓰레드에서만 위 코드를 실행시킬 수 있다.(경찰관 역할)
#include <iostream>
#include <mutex> // mutex 를 사용하기 위해 필요
#include <thread>
#include <vector>
void worker(int& result, std::mutex& m) {
for (int i = 0; i < 10000; i++) {
m.lock();
result += 1;
m.unlock();
}
}
int main() {
int counter = 0;
std::mutex m; // 우리의 mutex 객체
std::vector<std::thread> workers;
for (int i = 0; i < 4; i++) {
workers.push_back(std::thread(worker, std::ref(counter), std::ref(m)));
}
for (int i = 0; i < 4; i++) {
workers[i].join();
}
std::cout << "Counter 최종 값 : " << counter << std::endl;
}
mutex: 상호 배제(mutual exclusion)
m.lock() -> 뮤텍스 m을 나만 쓴다.(1번에 1 스레드에서만 m의 사용 권한)
다른 스레드에서 m.lock()을 하면 m을 소유한 스레드가 m.unlock()을 통해 m을 반환 때까지 무한정 기다림.
result += 1은 결국 한 스레드만이 유일하게 실행할 수 있음.
스레드만이 유일하게 실행할 수 있는 코드 부분
m.unlock() 코드를 지우면, 뮤텍스를 취득한 스레드가 unlock을 하지 않으므로, 다른 모든 스레드들이 기다리게 됨. 본인도 마찬가지로 m.lock()을 다시 호출, unlock을 하지 않았으므로 본인 역시 기다림.
-> 취득한 뮤텍스는 사용이 끝나면 반드시 반환을 해야 함.
std::lock_guard<std::mutex> lock(m);
lock guard 객체는 뮤텍스를 인자로 받아서 생성, 이때 생성자에서 뮤텍스를 lock하게 됨.
lock guard가 소멸될 때 알아서 lock했던 뮤텍스를 unlock하게 됨.
사용자가 따로 unlock을 신경 쓰지 않아도 되서 매우 편리.
데드락이 발생하는 조건은 다음과 같이 worker1, worker2를 실행했을 때
// worker1
std::lock_guard<std::mutex> lock1(m1);
std::lock_guard<std::mutex> lock2(m2);
// worker2
std::lock_guard<std::mutex> lock2(m2);
std::lock_guard<std::mutex> lock1(m1);
서로가 필요한 뮤텍스가 서로에게 있기 떄문에 모두 이러지도 저러지도 못함.
한 스레드에게 우선권을 주는 방법이 있다.
쓰레드로 비유하면, 한 스레드가 다른 스레드에 비해 우위를 갖게 된다면, 한 스레드만 열심히 일하고 다른 스레드는 일할 수 없는 기아 상태(starvation)가 발생할 수 있음.
void worker2(std::mutex& m1, std::mutex& m2) {
for (int i = 0; i < 10; i++) {
while (true) {
m2.lock();
// m1 이 이미 lock 되어 있다면 "야 차 빼" 를 💥스스로💥 수행하게 된다.
if (!m1.try_lock()) {
m2.unlock();
continue;
}
std::cout << "Worker2 Hi! " << i << std::endl;
m1.unlock();
m2.unlock();
break;
}
}
}
일단 m2는 아무 문제 없이 lock이 가능.
m1을 lock할 때: worker1이 m1을 lock하고 있는 경우 -> 원래는 이도저도 못했음.
try_lock: lock할 수 있으면 lock하고 true 리턴, 아니면 기다리지 않고 false 리턴
m1.try_lock()이
참고: C++ Concurrency In Action, 데드락 상황을 피하기 위해 가이드라인을 제공.
모든 스레드들이 최대 1개의 lock만을 소유하도록 하자.
유저 코드에서 Lock을 소유할 수도 있기 때문에 중첩된 lock을 얻는 걸 피하려면 lock 소유 시 유저 코드를 호출하는 것을 지양해야 함.
반드시 정해진 순서로 획득하자.
앞서도 worker1이 m1, m2 순으로, worker2가 m2, m1 순으로 lock했기 때문에 발생한 문제였다.
#include <chrono> // std::chrono::miliseconds
#include <iostream>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <vector>
void producer(std::queue<std::string>* downloaded_pages, std::mutex* m,
int index) {
for (int i = 0; i < 5; i++) {
// 웹사이트를 다운로드 하는데 걸리는 시간이라 생각하면 된다.
// 각 쓰레드 별로 다운로드 하는데 걸리는 시간이 다르다.
std::this_thread::sleep_for(std::chrono::milliseconds(100 * index));
std::string content = "웹사이트 : " + std::to_string(i) + " from thread(" + std::to_string(index) + ")\n";
// data 는 쓰레드 사이에서 공유되므로 critical section 에 넣어야 한다.
m->lock();
downloaded_pages->push(content);
m->unlock();
}
}
void consumer(std::queue<std::string>* downloaded_pages, std::mutex* m,
int* num_processed) {
// 전체 처리하는 페이지 개수가 5 * 5 = 25 개.
while (*num_processed < 25) {
m->lock();
// 만일 현재 다운로드한 페이지가 없다면 다시 대기.
// 다른 스레드에게 기회를 줌!
if (downloaded_pages->empty()) {
m->unlock(); // (Quiz) 여기서 unlock 을 안한다면 어떻게 될까요?
// 10 밀리초 뒤에 다시 확인한다.
std::this_thread::sleep_for(std::chrono::milliseconds(10));
continue;
}
// 맨 앞의 페이지를 읽고 대기 목록에서 제거한다.
std::string content = downloaded_pages->front();
downloaded_pages->pop();
(*num_processed)++;
m->unlock();
// content 를 처리한다.
std::cout << content;
std::this_thread::sleep_for(std::chrono::milliseconds(80));
}
}
int main() {
// 현재 다운로드한 페이지들 리스트로, 아직 처리되지 않은 것들이다.
std::queue<std::string> downloaded_pages;
std::mutex m;
std::vector<std::thread> producers;
for (int i = 0; i < 5; i++) {
producers.push_back(std::thread(producer, &downloaded_pages, &m, i + 1));
}
int num_processed = 0;
std::vector<std::thread> consumers;
for (int i = 0; i < 3; i++) {
consumers.push_back(
std::thread(consumer, &downloaded_pages, &m, &num_processed));
}
for (int i = 0; i < 5; i++) {
producers[i].join();
}
for (int i = 0; i < 3; i++) {
consumers[i].join();
}
}
producer: 인터넷 페이지를 다운로드 받음(큐에 집어넣음)
consumer: 다운로드받은 인터넷 페이지를 처리함
여기에서 consumer 스레드가 10ms마다 downloaded_pages에 할 일이 있는 지 확인하고 없으면 다시 기다리는 형태를 취하고 있음. -> 비효율적임!
-> producer에서 데이터가 뜸하게 오는 것을 알면 그냥 consumer는 아예 재워놓고, producer에서 데이터가 오면 consumer를 깨우는 방식이 나을 수 있음. 쓰레드를 재워놓으면 다른 쓰레드들이 일을 할 수 있기 때문(CPU 효율적으로 사용)
downloaded_pages 가 empty() 가 참이 아닐 때 까지 자라 라는 명령을 내릴 수 있음.