reference: "전문가를 위한 C++" / 마크 그레고리
쓰레드 간 공유를 막을 수 없다면 한 번에 하나의 쓰레드만 접근할 수 있도록 동기화 메커니즘을 제공해야 한다(뮤텍스, 조건변수..).
bool 타입의 값이나 정수값을 비롯한 스칼라값은 아토믹 처리로 만으로도 충분히 동기화할 수 있다.
하지만 복잡하게 구성된 데이터를 여러 쓰레드가 동시에 접근할 때는 동기화 메커니즘을 사용해야 한다. (ex, STL 컨테이너)
표준 라이브러리는 mutex와 lock 클래스를 통해 상호 배제 매커니즘을 제공한다. 이를 통해 여러 쓰레드를 동기화하도록 구현할 수 있다.
mutex는 mutual exclusion, 즉 상호 배제를 뜻하는 말로 기본 사용법은 아래와 같다.
다른 쓰레드와 공유하는 메모리(읽기/쓰기용)를 사용하려면 먼저 mutex 객체에 락을 걸어야 한다(잠금 요청). 다른 쓰레드가 먼저 락을 걸었다면 그 락이 해제되거나 타임아웃으로 지정된 시간이 경과해야 쓸 수 있다.
쓰레드가 락을 걸었다면 공유 메모리를 마음껏 쓸 수 있다. 물론 공유 데이터를 사용하려는 쓰레드마다 뮤텍스에 대한 락을 걸고 해제하는 동작을 정확히 구현해야 한다.
공유 메모리에 대한 읽기/쓰기 작업이 끝나면 다른 쓰레드가 공유 메모리에 대한 락을 걸 수 있도록 락을 해제한다. 두 개 이상의 쓰레드가 락을 기다리고 있다면 어느 쓰레드가 먼저 락을 걸어 작업을 진행할지 알 수 없다.
<mutex>
헤더source: https://medium.com/swlh/c-mutex-write-your-first-concurrent-code-69ac8b332288
c++ 표준은 '시간 제약이 없는 뮤텍스'와 '시간 제약이 있는 뮤텍스' 클래스를 제공한다. 본 글은 '시간 제약이 없는 뮤텍스', 그리고 그 중 std::mutex에 대해서만 언급한다.
표준 라이브러리는 std::mutex, std::recursive_mutex, std::shared_mutex라는 세 종류의 시간 제약이 없는 mutex 클래스를 제공한다. 각 클래스마다 아래와 같은 메서드가 제공된다.
lock()
: 호출하는 측의 쓰레드가 락을 완전히 걸 때까지 대기한다(블록). 이때 대기 시간에는 제한이 없다. 쓰레드가 블록되는 시간을 정하려면 '시간 제약이 있는 mutex'를 사용한다.
try_lock()
: 호출하는 측의 쓰레드가 락을 걸도록 시도한다. 현재 다른 쓰레드가 락을 걸었다면 호출이 즉시 리턴된다. 락을 걸었다면 true를 반환하고, 그렇지 않으면 false를 반환한다.
unlock()
: 호출하는 측의 쓰레드가 현재 걸어둔 락을 해제한다. 그러면 다른 쓰레드가 락을 걸 수 있게 된다.
공유하는 자원이 단순한 데이터가 아닌, 보다 복잡한 자료구조일 때 atomic 타입을 선언할 수 없다.
특히 STL 자료구조 컨테이너들은 기본적으로 멀티쓰레드 환경에서는 안전하지 않다고 가정한다. (thread-safe 하지 않음)
vector<int32> vec;
void Push() {
for (int32 i = 0; i < 10000; i++) {
vec.push_back(i);
}
}
int main() {
thread t1;
thread t2;
/* 1. 크래쉬 발생 상황 */
{
t1 = thread(Push);
t2 = thread(Push);
t1.join();
t2.join();
cout << vec.size() << endl;
// => 크래쉬 발생!
}
/* 왜 크래쉬 발생? */
/*
* vector는 동적 배열
* t1이 벡터 push 과정에서 새로운 공간으로 복사, 기존의 영역 반환
* 이때 t2가 똑같은 작업을 위해 기존의 영역을 반환한다면, "Double-free" 에러가 발생
* => crash 발생!
*/
/* 2. 그렇다면 reserve로 커다란 용량을 미리 잡는다면? */
{
vec.reserve(20000);
t1 = thread(Push);
t2 = thread(Push);
t1.join();
t2.join();
cout << vec.size() << endl;
// => 19931 출력!
// 의도치 않은 결과
}
/* 이유는? */
/*
* t1, t2가 원자적으로 push_back을 하지 않아 같은 위치에 추가하는 경우 발생
*/
}
위 코드에서 두 개의 쓰레드가 하나의 공유 벡터 자원에 접근하여 push_back() 작업을 진행한다. 이러한 상황은 런타임 에러를 발생시킨다.
예를 들어 두 개의 쓰레드(t1, t2)가 있다.
vector가 동적 배열임을 감안한다면, t1이 벡터 push 과정에서 새로운 공간으로 복사, 기존의 영역 반환하는 과정이 발생할 수 있다.
이때 t2가 똑같은 작업을 위해 기존의 영역을 반환한다면, "Double-free" 에러가 발생할 수 있다.
그렇다면 reserve() 메서드를 통해 처음부터 넉넉한 용량을 벡터에 할당한다면 어떻게 될까?
런타임 에러에서는 벗어날 수 있지만 의도치 않은 결과가 발생할 수 있다.
예를 들어 t1, t2가 원자적으로 push_back을 하지 않아 같은 위치에 추가하는 경우가 있다.
컨테이너 종류에 따라, 상황에 따라 발생할 수 있는 런타임 에러와 의도치 않은 결과 발생은 다양해진다.
std::mutex
라는 자물쇠와 같은 객체를 통해 쓰레드들을 제어할 수 있다.
void Push_withLock() {
for (int32 i = 0; i < 10000; i++) {
m.lock();
// lock을 획득하지 못한 쓰레드는 대기
// 이 코드는 싱글 쓰레드 동작과 같음
vec.push_back(i);
m.unlock();
}
}
int main() {
t1 = thread(Push_withLock);
t2 = thread(Push_withLock);
t1.join();
t2.join();
cout << vec.size() << endl;
// => 20000 출력!
// 의도한 결과 출력
}
reference: https://blog.seulgi.kim/2014/01/raii.html
RAII: C++에서 자주 쓰이는 idiom으로 자원의 안전한 사용을 위해 객체가 쓰이는 스코프를 벗어나면 자원을 해제해주는 기법이다. C++에서 heap에 할당된 자원은 명시적으로 해제하지 않으면 해제되지 않지만, stack에 할당된 자원은 자신의 scope가 끝나면 메모리가 해제되며 destructor가 불린다는 원리를 이용한 것이다.
아래 코드는 특정 쓰레드가 락을 잡고, 무한히 락을 풀지 않는 문제가 발생한다.
void Push_withLock() {
for (int32 i = 0; i < 10000; i++) {
m.lock(); // lock을 획득하지 못한 쓰레드는 대기
// 이 코드는 싱글 쓰레드 동작과 같음
vec.push_back(i);
// 특정 예외 처리
if(...) {
break;
}
m.unlock();
}
}
Wrapper 클래스를 만들어 생성자에서 mutex 객체에 lock을 걸고, 소멸자에서 unlock을 시키는 방식으로 lock/unlock 쌍을 맞추어 줄 수 있다.
class SimpleLockGuard {
// 전달받은 mutex 객체를 자동으로 lock하고, 자동으로 unlock 해주는 랩퍼 클래스
public:
SimpleLockGuard(mutex& m) {
this->m = &m;
this->m->lock();
}
~SimpleLockGuard() {
this->m->unlock();
}
private:
mutex* m;
};
void Push_withRAII() {
for (int32 i = 0; i < 10000; i++) {
SimpleLockGuard lockGuard(m);
vec.push_back(i);
// 특정 예외 처리
if(...) {
break;
}
// lockGuard 소멸 (내부적으로 unlock)
}
}
표준 라이브러리에서 제공한 RAII 패턴이라 할 수 있다.
lock 클래스는 mutex에 락을 정확히 걸거나 해제하는 작업을 쉽게 처리하게 해준다. lock 클래스의 소멸자는 확보했던 mutex를 자동으로 해제시킨다. C++ 표준에서는 std::lock_guard, std::unique_lock, std::shared_lock, std::scoped_lock
이라는 네 가지 타입의 락을 제공한다. 본 글은 'std::lock_guard'에 대해서만 언급한다.
/* std에서 제공하는 RAII 패턴, std::lock_guard<T> */
void Push_withSTD_RAII() {
for (int32 i = 0; i < 10000; i++) {
//SimpleLockGuard lockGuard(m);
lock_guard<mutex> lockGuard(m);
// 이 코드는 싱글 쓰레드 동작과 같음
vec.push_back(i);
// lockGuard 소멸 (내부적으로 unlock)
}
}
int main() {
...
...
{
t1 = thread(Push_withSTD_RAII);
t2 = thread(Push_withSTD_RAII);
t1.join();
t2.join();
cout << vec.size() << endl;
// => 20000 출력!
// 의도한 결과 출력
}
}
또한 아래와 같이 락 객체 생성과 동시에 lock을 수행하는 것이 아닌 수행 시점을 미룰 수도 있다.
/* std::unique_lock */
void Push_withSTD_UniqueLockGuard() {
for (int32 i = 0; i < 10000; i++) {
//SimpleLockGuard lockGuard(m);
//lock_guard<mutex> lockGuard(m);
unique_lock<mutex> uniqueLockGuard(m, defer_lock);
uniqueLockGuard.lock(); // lock 호출 시점을 미룸
// 이 코드는 싱글 쓰레드 동작과 같음
vec.push_back(i);
}
}
함수 객체로 쓰레드 만들기 예제(https://velog.io/@jinh2352/C-%EC%93%B0%EB%A0%88%EB%93%9C-%EB%9D%BC%EC%9D%B4%EB%B8%8C%EB%9F%AC%EB%A6%AC-thread)에서 보았던 Counter 클래스를 다시 분석해보자.
class Counter {
private:
int id;
int iterNum;
public:
Counter(int id, int iterNum)
: id(id), iterNum(iterNum)
{}
void operator()() const {
for (int i = 0; i < iterNum; i++) {
cout << "Counter " << id << "has value " << i << endl;
}
}
};
int main() {
vector<thread> threads;
for (int i = 0; i < 10; i++) {
threads.push_back(thread{ Counter(i, 100) });
}
for (int i = 0; i < 10; i++) {
threads[i].join();
}
}
c++ 스트림에 대해선 기본적으로 데이터 경쟁이 발생하지 않지만 여러 쓰레드로 출력한 결과가 뒤섞일 수 있다.
이렇게 결과가 뒤섞이지 않게 하려면 mutex 객체를 이용하여 스트림 객체에 읽거나 쓰는 작업을 한 번에 한 쓰레드만 수행하도록 만들면 된다.
아래 코드는 모든 쓰레드가 Counter 클래스의 cout에 대한 접근을 동기화시키는 예이다.
먼저 static mutex 객체를 추가하였다. 클래스의 모든 인스턴스가 동일한 mutex를 사용할 수 있도록 반드시 static을 지정해야 한다.
그리고 cout을 쓰기 전 lock_guard로 이 mutex 객체에 락을 건다.
class Counter {
private:
int id;
int iterNum;
// 뮤텍스 객체 추가
/*******************/
static mutex sMutex;
/*******************/
public:
Counter(int id, int iterNum)
: id(id), iterNum(iterNum)
{}
void operator()() const {
for (int i = 0; i < iterNum; i++) {
/******************************/
lock_guard<mutex> lock(sMutex);
/******************************/
cout << "Counter " << id << "has value " << i << endl;
}
}
};
mutex Counter::sMutex;
int main() {
vector<thread> threads;
for (int i = 0; i < 10; i++) {
threads.push_back(thread{ Counter(i, 100) });
}
for (int i = 0; i < 10; i++) {
threads[i].join();
}
}