#include <iostream>
#include <thread>
#include <vector>
void worker(int& counter) {
for (int i = 0; i < 10000; i++) {
// 문제 발생 지점!_)!
counter += 1;
}
}
int main() {
int counter = 0;
std::vector<std::thread> workers;
for (int i = 0; i < 4; i++) {
// 레퍼런스로 전달하려면 ref 함수로 감싸야 한다 (지난 강좌 bind 함수 참조)
workers.push_back(std::thread(worker, std::ref(counter)));
}
for (int i = 0; i < 4; i++) {
workers[i].join();
}
std::cout << "Counter 최종 값 : " << counter << std::endl;
}
CPU는 컴퓨터의 모든 연산이 발생하는 두뇌.
CPU 에서 연산을 수행하기 위해서는, CPU 의 레지스터(register) 라는 곳에 데이터를 기록한 다음에 연산을 수행해야함.
즉, 모든 데이터들은 메모리에 저장되어 있고, 연산 할 때 할 때 마다 메모리에서 레지스터로 값을 가져온 뒤에, 빠르게 연산을 하고, 다시 메모리에 가져다 놓는 식으로 작동
각 쓰레드는 메모리를 공유할 지언정, 레지스터를 공유하지는 않기 때문에 counter += 1에서 문제 발생.
뮤텍스를 통해 한 번에 한 쓰레드에서만 counter+=1; 코드를 실행시킬 수 있도록 만들어 위의 문제 해결 가능
#include <iostream>
#include <mutex> // mutex 를 사용하기 위해 필요
#include <thread>
#include <vector>
void worker(int& result, std::mutex& m) {
for (int i = 0; i < 10000; i++) {
m.lock();
result += 1;
m.unlock();
}
}
int main() {
int counter = 0;
std::mutex m; // 우리의 mutex 객체
std::vector<std::thread> workers;
for (int i = 0; i < 4; i++) {
workers.push_back(std::thread(worker, std::ref(counter), std::ref(m)));
}
for (int i = 0; i < 4; i++) {
workers[i].join();
}
std::cout << "Counter 최종 값 : " << counter << std::endl;
}
뮤텍스
한 번에 한 쓰레드에서만 코드를 실행시킬 수 있도록 도와주는 객체
m.lock();
result += 1;
m.unlock();
m.lock () : 뮤텍스 m을 내가 쓰게 달라! 단, 한 번에 한 쓰레드에서만 m의 사용 권한을 가짐.
임계 영역(critical section)
m.lock()과 m.unlock() 사이에 한 쓰레드만이 유일하게 실행할 수 있는 코드 부분
데드락(deadlock)
unlock하지 않아 결국 아무 쓰레드도 연산을 진행하지 못하는 상태
std::lock_guard<std::mutex> lock(m);
lock_guard 객체는 뮤텍스를 인자로 받아서 생성하게 되는데, 이 때 생성자에서 뮤텍스를 lock 하게 됨.
그리고 lock_guard 가 소멸될 때 알아서 lock 했던 뮤텍스를 unlock 하게 됨.
-> 따라서 사용자가 따로 unlock 을 신경쓰지 않아도 되서 매우 편리
// worker1()
// m1을 먼저 lock한 후, m2를 lock
std::lock_guard<std::mutex> lock1(m1);
std::lock_guard<std::mutex> lock2(m2);
// worker2()
// m2를 먼저 lock한 후, m1을 lock
std::lock_guard<std::mutex> lock2(m2);
std::lock_guard<std::mutex> lock1(m1);
worker1 에서 m2 를 lock 하기 위해서는 worker2 에서 m2 를 unlock 해야 됨.
하지만 그러기 위해서는 worker2 에서 m1 을 lock 해야 함. 그런데 이 역시 불가능. 왜냐하면 worker1 에 m1 을 lock 하고 있기 때문
= 데드락 상태.
한 쓰레드가 다른 쓰레드에 비해 우위를 가질 수 있도록 만들어주는 방법을 사용해 볼 수 있음.
while (true) {
m2.lock();
// m1 이 이미 lock 되어 있다면 "야 차 빼" 를 수행하게 된다.
if (!m1.try_lock()) {
m2.unlock();
continue;
}
std::cout << "Worker2 Hi! " << i << std::endl;
m1.unlock();
m2.unlock();
break;
}
try_lock()을 통해 m1을 lock할 수 있는 상태라면 true 리턴하여 m1을 lock시킴
C++ Concurrency In Action에서 제시하는 가이드라인.
생산자(producer)-소비자(consumer) 패턴
생산자
: 무언가 처리할 일을 받아오는 쓰레드
ex. 인터넷에서 페이지를 긁어서 분석하는 프로그램을 만들었을 때, '페이지를 긁어 오는 쓰레드'가 해당
소비자
: 받은 일을 처리하는 쓰레드
ex. 긁어온 페이지를 분석하는 쓰레드
#include <chrono> // std::chrono::miliseconds
#include <iostream>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <vector>
void producer(std::queue<std::string>* downloaded_pages, std::mutex* m,
int index) {
for (int i = 0; i < 5; i++) {
// 웹사이트를 다운로드 하는데 걸리는 시간이라 생각하면 된다.
// 각 쓰레드 별로 다운로드 하는데 걸리는 시간이 다르다.
std::this_thread::sleep_for(std::chrono::milliseconds(100 * index));
std::string content = "웹사이트 : " + std::to_string(i) + " from thread(" +
std::to_string(index) + ")\n";
// data 는 쓰레드 사이에서 공유되므로 critical section 에 넣어야 한다.
m->lock();
downloaded_pages->push(content);
m->unlock();
}
}
void consumer(std::queue<std::string>* downloaded_pages, std::mutex* m,
int* num_processed) {
// 전체 처리하는 페이지 개수가 5 * 5 = 25 개.
while (*num_processed < 25) {
m->lock();
// 만일 현재 다운로드한 페이지가 없다면 다시 대기.
if (downloaded_pages->empty()) {
m->unlock(); // (Quiz) 여기서 unlock 을 안한다면 어떻게 될까요?
// 10 밀리초 뒤에 다시 확인한다.
std::this_thread::sleep_for(std::chrono::milliseconds(10));
continue;
}
// 맨 앞의 페이지를 읽고 대기 목록에서 제거한다.
std::string content = downloaded_pages->front();
downloaded_pages->pop();
(*num_processed)++;
m->unlock();
// content 를 처리한다.
std::cout << content;
std::this_thread::sleep_for(std::chrono::milliseconds(80));
}
}
int main() {
// 현재 다운로드한 페이지들 리스트로, 아직 처리되지 않은 것들이다.
std::queue<std::string> downloaded_pages;
std::mutex m;
std::vector<std::thread> producers;
for (int i = 0; i < 5; i++) {
producers.push_back(std::thread(producer, &downloaded_pages, &m, i + 1));
}
int num_processed = 0;
std::vector<std::thread> consumers;
for (int i = 0; i < 3; i++) {
consumers.push_back(
std::thread(consumer, &downloaded_pages, &m, &num_processed));
}
for (int i = 0; i < 5; i++) {
producers[i].join();
}
for (int i = 0; i < 3; i++) {
consumers[i].join();
}
}
위의 코드는 consumer 쓰레드가 10 밀리초 마다 downloaded_pages 에 할일이 있는지 확인하고 없으면 다시 기다리는 형태임.
즉, 매 번 언제 올지 모르는 데이터를 확인하기 위해 지속적으로 mutex 를 lock 하고, 큐를 확인해야 하기 때문에 비효율적임.
조건 변수를 통해 생산자-소비자 패턴을 효율적으로 구현할 수 있음.
위 코드의 비효율을 해결하기 위해 조건 변수를 사용 -> downloaded_pages 가 empty() 가 참이 아닐 때 까지 자라 라는 명령을 내릴 수 있음.
#include <chrono> // std::chrono::miliseconds
#include <condition_variable> // std::condition_variable
#include <iostream>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <vector>
void producer(std::queue<std::string>* downloaded_pages, std::mutex* m,
int index, std::condition_variable* cv) {
for (int i = 0; i < 5; i++) {
// 웹사이트를 다운로드 하는데 걸리는 시간이라 생각하면 된다.
// 각 쓰레드 별로 다운로드 하는데 걸리는 시간이 다르다.
std::this_thread::sleep_for(std::chrono::milliseconds(100 * index));
std::string content = "웹사이트 : " + std::to_string(i) + " from thread(" +
std::to_string(index) + ")\n";
// data 는 쓰레드 사이에서 공유되므로 critical section 에 넣어야 한다.
m->lock();
downloaded_pages->push(content);
m->unlock();
// consumer 에게 content 가 준비되었음을 알린다.
cv->notify_one();
}
}
void consumer(std::queue<std::string>* downloaded_pages, std::mutex* m,
int* num_processed, std::condition_variable* cv) {
while (*num_processed < 25) {
std::unique_lock<std::mutex> lk(*m);
cv->wait(
lk, [&] { return !downloaded_pages->empty() || *num_processed == 25; });
if (*num_processed == 25) {
lk.unlock();
return;
}
// 맨 앞의 페이지를 읽고 대기 목록에서 제거한다.
std::string content = downloaded_pages->front();
downloaded_pages->pop();
(*num_processed)++;
lk.unlock();
// content 를 처리한다.
std::cout << content;
std::this_thread::sleep_for(std::chrono::milliseconds(80));
}
}
int main() {
// 현재 다운로드한 페이지들 리스트로, 아직 처리되지 않은 것들이다.
std::queue<std::string> downloaded_pages;
std::mutex m;
std::condition_variable cv;
std::vector<std::thread> producers;
for (int i = 0; i < 5; i++) {
producers.push_back(
std::thread(producer, &downloaded_pages, &m, i + 1, &cv));
}
int num_processed = 0;
std::vector<std::thread> consumers;
for (int i = 0; i < 3; i++) {
consumers.push_back(
std::thread(consumer, &downloaded_pages, &m, &num_processed, &cv));
}
for (int i = 0; i < 5; i++) {
producers[i].join();
}
// 나머지 자고 있는 쓰레드들을 모두 깨운다.
cv.notify_all();
for (int i = 0; i < 3; i++) {
consumers[i].join();
}
}
std::condition_variable
condition_variable는 다른 쓰레드가 공유 변수를 수정하고 condition_variable로 통지할 때까지 여러 쓰레드를 대기하도록 하는 데 사용 할 수 있는 동기화 기본 기법임. 뮤텍스와 연동되어서 동작해서 쓰레드에 안전하게 동작함.