CPU에서는 연산을 처리할 때 register에 데이터를 기록한 다음에 연산을 수행한다.
레지스터의 크기는 8바이트로 작고 수도 많지 않다.
그렇기에 데이터를 메모리에 저장을 하고 메모리에서 레지스터로 값을 가져와 사용하고 다시 가져다 놓는 식으로 작동한다.. (컴퓨터 구조에서 배운 내용)
어셈블리언어로 나타낸 예시를 보면

위와 같은 경우가 발생하게 되어 항상 원하는 답을 얻을 수가 없게 된다.
위 문제의 해결책
->한 번에 한 쓰레드에서만 코드 실행하기
#include <iostream>
#include <cstring>
#include <string>
#include <vector>
#include <stack>
#include <queue>
#include <algorithm>
#include <math.h>
#include <set>
#include <map>
#include <deque>
#include <thread>
#include <mutex>
using namespace std;
using std::thread;
void worker(int &result,mutex &m)
{
for (int i = 0; i < 10000; i++)
{
m.lock();
result += 1;
m.unlock();
}
}
int main()
{
ios::sync_with_stdio(false);
cin.tie(NULL);
cout.tie(NULL);
int counter = 0;
mutex m; //뮤텍스 객체
vector<thread> workers;
for (int i = 0; i < 4; i++)
{
workers.push_back(thread(worker,ref(counter), ref(m)));//래퍼런스로 전달하려면 ref함수로 감싸야함 bind 함수에서 처럼
}
for (int i = 0; i < 4; i++)
{
workers[i].join();
}
cout << "counter는 " << counter;
return 0;
}
m.lock()
뮤텍스 m을 사용한다는 것
뮤텍스는 하나의 쓰레드만이 가질 수 있기 때문에 만약 한 쓰레드에서 사용할 경우 m.unlock()을 통해 m을 반환할 때 까지 다른 쓰레드는 사용하지 못한다.
m.lock()과 m.unlock()사이에서 한 쓰레드만이 실행할 수 있는 코드 부분은 임계영역(critical section)이라고 한다.
만약 unlock을 하지 않을 경우 무한정 기다리게 되므로 강제 종료가 된다. 데드락(dead lock)이라고 한다.
void worker(int &result,mutex &m)
{
for (int i = 0; i < 10000; i++)
{
lock_guard<mutex> lock(m);
result += 1;
//이 범위를 빠져나갈 경우 lock이 소멸되므로 알아서 unlock이 된다.
}
}
lock_guard를 사용할 경우 소멸자에서 자동으로 unlock이 된다.
lock_guard만으로 데드락이 해결이 될까?
다음의 코드를 보면
#include <iostream>
#include <cstring>
#include <string>
#include <vector>
#include <stack>
#include <queue>
#include <algorithm>
#include <math.h>
#include <set>
#include <map>
#include <deque>
#include <thread>
#include <mutex>
using namespace std;
using std::thread;
void worker1(mutex& m1,mutex &m2)
{
for (int i = 0; i < 10000; i++)
{
lock_guard<mutex> lock1(m1);
lock_guard<mutex> lock2(m2);
}
}
void worker2(mutex& m1, mutex& m2)
{
for (int i = 0; i < 10000; i++)
{
lock_guard<mutex> lock2(m2);
lock_guard<mutex> lock1(m1);
}
}
int main()
{
ios::sync_with_stdio(false);
cin.tie(NULL);
cout.tie(NULL);
mutex m1,m2; //뮤텍스 객체
thread t1(worker1, ref(m1), ref(m2));
thread t2(worker2, ref(m1), ref(m2));
t1.join();
t2.join();
cout << "끝~";
return 0;
}
만약 데드락이 해결됐다면 끝~이라는 결과가 나와야하는데 강제 종료가된다.
worker1에서 m1을 lock 하고 m2를 lock worker2에선 m2를 먼저 lock하고 m1을 lock하는데 이런 경우에 worker1에서 m2를 lock을 할 수 있을까?
답은 못한다이다. worker1에서 m2를 lock하기 위해선 worker2에서 m2를 unlock을 해야하기 때문이다. 그 반대도 불가능하다.
그렇기에 데드락 상태에 빠지게 되는 것이다.
이를 해결하기 위해 조건을 주게 된다면 한 쓰레드만 열심히 작동하고 다른 쓰레드는 일할 수 없는 기아 상태(starvation)이 발생 할 수 있다.
해당 코드 예시
#include <iostream>
#include <cstring>
#include <string>
#include <vector>
#include <stack>
#include <queue>
#include <algorithm>
#include <math.h>
#include <set>
#include <map>
#include <deque>
#include <thread>
#include <mutex>
using namespace std;
using std::thread;
void worker1(mutex& m1,mutex &m2)
{
for (int i = 0; i < 5; i++)
{
m1.lock();
m2.lock();
cout << "Worker1 Hi " << i << endl;
m1.unlock();
m2.unlock();
}
}
void worker2(mutex& m1, mutex& m2)
{
for (int i = 0; i < 5; i++)
{
while (true)
{
m2.lock();
if (!m1.try_lock()) //m1이 이미 lock 수행 될 경우
{
m2.unlock();
continue;
}
cout << "Worker2 Hi " << i << endl;
m1.unlock();
m2.unlock();
break;
}
}
}
int main()
{
ios::sync_with_stdio(false);
cin.tie(NULL);
cout.tie(NULL);
mutex m1,m2; //뮤텍스 객체
thread t1(worker1, ref(m1), ref(m2));
thread t2(worker2, ref(m1), ref(m2));
t1.join();
t2.join();
cout << "끝~";
return 0;
}
worker1에 우선권을 둔다.
try_lock() 만약 lock을 할 수 있다면 lock을 하고 true를 반환 할 수 없다면 기다리지 않고 false를 반환한다.
##참고
C++ Concurrency In Action 이란 책에선 데드락 상황을 피하기 위해 다음과 같은 가이드라인을 제시하고 있습니다.
1.중첩된 Lock 을 사용하는 것을 피해라
2.Lock 을 소유하고 있을 때 유저 코드를 호출하는 것을 피해라
3.Lock 들을 언제나 정해진 순서로 획득해라
멀티쓰레드 프로그램에서 가장 많이 등장하는 패턴이다.
생산자: 처리할 일을 받아오는 쓰레드
소비자: 받은 일을 처리하는 쓰레드
#include <iostream>
#include <cstring>
#include <string>
#include <vector>
#include <stack>
#include <queue>
#include <algorithm>
#include <math.h>
#include <set>
#include <map>
#include <deque>
#include <thread>
#include <mutex>
using namespace std;
using std::thread;
void producer(queue<string>* downloag_pages, mutex* m, int index)
{
for (int i = 0; i < 5; i++)
{
this_thread::sleep_for(chrono::milliseconds(100 * index)); //웹사이트를 다운로드 하는데 걸리는 시간 각각 다름
string content = "웹사이트: " + to_string(i) + " from thread(" + to_string(index) + ")\n";
m->lock();
downloag_pages->push(content); //데이터는 각 쓰레드에서 공유되므로 임계영역에 넣어줌
m->unlock();
}
}
void consumer(queue<string>* downloag_pages, mutex* m, int* num_processed)
{
while (*num_processed < 25) //전체 처리하는 페이지 수 5*5
{
m->lock();
if (downloag_pages->empty())
{
m->unlock();
this_thread::sleep_for(chrono::milliseconds(10)); //10밀리초 뒤에 다시 확인
continue;
}
string content = downloag_pages->front();
downloag_pages->pop();
(*num_processed)++;
m->unlock();
cout << content;
this_thread::sleep_for(chrono::milliseconds(80));
}
}
int main()
{
ios::sync_with_stdio(false);
cin.tie(NULL);
cout.tie(NULL);
queue<string> download_pages;
mutex m;
vector<thread> producers;
for (int i = 0; i < 5; i++)
{
producers.push_back(thread(producer, &download_pages, &m, i));
}
int num_processed = 0;
vector<thread> consumers;
for (int i = 0; i < 5; i++)
{
consumers.push_back(thread(consumer, &download_pages, &m, &num_processed));
}
for (int i = 0; i < 5; i++)
{
producers[i].join();
}
for (int i = 0; i < 5; i++)
{
consumers[i].join();
}
return 0;
}
생산자 쓰레드에서 처리할 일을 받아온다.(웹사이트 정보)
소비자 쓰레드에서 while루프를 돌며 계속해서 download_pages가 있는지 확인 즉 처리할 일이 있는지 확인을하고 할 일이 있다면 처리를 한다.
만약 생산자에서 할 일을 가끔씩 받아온다면 소비자에서 계속해서 lock을 하고 확인을 하는 과정은 매우 비효율적일 것이다.
차라리 소비자를 sleep하게 하고 만약 생산자가 할 일을 받아온다면 그 때 소비자를 깨우는 것이 나을 것이다. == 소비자 쓰레드를 재워 놓음으로써 다른 쓰레드들이 일을 하기에 cpu에 더 효율적일 것이다.
조건 변수가 위에서 발생할 비효율에 대해 어떤 조건까지 sleep 이런 식으로 조정을 해줄 수 있다.
#include <iostream>
#include <cstring>
#include <string>
#include <vector>
#include <stack>
#include <queue>
#include <algorithm>
#include <math.h>
#include <set>
#include <map>
#include <deque>
#include <thread>
#include <mutex>
#include <condition_variable>
using namespace std;
using std::thread;
void producer(queue<string>* downloag_pages, mutex* m, int index, condition_variable* cv)
{
for (int i = 0; i < 5; i++)
{
this_thread::sleep_for(chrono::milliseconds(100 * index)); //웹사이트를 다운로드 하는데 걸리는 시간 각각 다름
string content = "웹사이트: " + to_string(i) + " from thread(" + to_string(index) + ")\n";
m->lock();
downloag_pages->push(content); //데이터는 각 쓰레드에서 공유되므로 임계영역에 넣어줌
m->unlock();
cv->notify_one();//소비자에게 content가 준비되었다고 알려준다.
}
}
void consumer(queue<string>* downloag_pages, mutex* m, int* num_processed, condition_variable* cv)
{
while (*num_processed < 25) //전체 처리하는 페이지 수 5*5
{
unique_lock<mutex> lk(*m);
cv->wait(lk, [&] {return !downloag_pages->empty() || *num_processed == 25;});
if (*num_processed == 25)//처리가 끝난 것이면 그냥 종료해야됨.
{
lk.unlock();
return;
}
if (downloag_pages->empty())
{
m->unlock();
this_thread::sleep_for(chrono::milliseconds(10)); //10밀리초 뒤에 다시 확인
continue;
}
string content = downloag_pages->front();
downloag_pages->pop();
(*num_processed)++;
lk.unlock();
cout << content;
this_thread::sleep_for(chrono::milliseconds(80));
}
}
int main()
{
ios::sync_with_stdio(false);
cin.tie(NULL);
cout.tie(NULL);
queue<string> download_pages;
mutex m;
condition_variable cv;
vector<thread> producers;
for (int i = 0; i < 5; i++)
{
producers.push_back(thread(producer, &download_pages, &m, i,&cv));
}
int num_processed = 0;
vector<thread> consumers;
for (int i = 0; i < 5; i++)
{
consumers.push_back(thread(consumer, &download_pages, &m, &num_processed ,&cv));
}
for (int i = 0; i < 5; i++)
{
producers[i].join();
}
cv.notify_all();//자고 있는 쓰레드를 모두 깨운다.
for (int i = 0; i < 5; i++)
{
consumers[i].join();
}
return 0;
}
condition_variable의 wait함수에서 어떤 조건이 참이 될 때까지 기다린다.
조건 변수가 거짓일 경우 lk를 unlock한 다음에 누가 깨워주기 전까지 sleep하게된다.
unique_lock을 쓴 이유는 wait함수가 unique_lock을 인자로 받기 때문이다.
cv->notify_one을 통해 자고 있는 소비자를 깨워서 다시 검사하게 한다.
밑에서 cv->notify_all을 하는 이유는 생산자가 모든 일을 끝내면 소비자들은 모두 자고 있는 상태이기 때문에 이대로 둔다면 join을 하지 못하기 때문에 모두 깨워주는 것이다.
이 시점엔 이미 num_processed가 다들 25이기 때문에 재검사를 할때 return을 하여 종료하게 될 것이다.
<출처>
모두의 코드 C++