
#include <iostream>
#include <thread>
#include <vector>
int *a, *b, *k, *c;
void mac(int tid, int num_threads)
{
for(int i-0;i<N/num_threads;i++)
{
int idx = tid*(N/num_threads) + i;
c[idx] = k[idx] * a[idx];
c[idx] += k[idx] * b[idx];
}
return;
}
int main(int argc, char* argv[])
{
...
std::vector<std::thread> threads;
for(int t=0;t<NT;t++) {
// create thread
threads.push_back(std::thread(mac, t, NT));
}
for(auto& thread: threads) {
// wait for finish
thread.join();
// don't want to wait for finish ..
//thread.detach();
}
return 0;
}
std::thread(mac, t, NT)
mac : 호출 가능한 객체 (callable)t, NT : mac에 전달되는 인수thread.join();
thread.joinable();
join();thread.detach();

detach();detach() 를 호출하지 않는다면?호출 가능한 객체 : 함수처럼 호출될 수 있는 객체
1. Function pointer
void mac(..params..)
{...}
...
std::thread(mac, ..params..)
2. Function object
class set_object
{
public;
void operator()(int* target)
{
*target = 1;
}
};
int main()
{
std::thread t(set_object(), &a);
}
3. Lambda expression
int b = 0;
std::thread t2([](int* target) {
*target = 1;
}, &b);
t2.join();
동시에 여러 thread나 process가 shared resource에 접근할 때 발생
주로 Read-Modify-Write 연산이 동시에 수행될 때 발생
문제는 Read-Modify-Write 연산이 여러 thread에 의해 동시에 실행될 수 있고, 이들의 실행 순서가 보장되지 않는다는 것 ,,→ 하나의 thread가 Read 연산을 수행하는 동안, 다른 thread가 그 값을 Modify하거나 Write 연산을 수행할 수 있음
#include <iostream>
#include <thread>
#include <vector>
void worker(int* input, int start, int size, int* output)
{
for(int i=0;i<size;i++) {
if(input[start+i]==0 {
(*output)++;
}
}
}
int main(int argc, char* argv[])
{
const int N = atoi(argv[1]);
const int NT = atoi(argv[2]);
int *array = new int[N];
for(int i=0;i<N;i++) {
array[i] = 0;
}
int count=0;
std::vector<std::thread> threads;
for(int t=0;t<NT;t++) {
//assume N is a multiple of NT
int size=N/NT;
int start = t*size;
threads.push_back(std::thread(worker,array,start,size,&count));
}
for(auto& thread:threads) {
thread.join();
}
std::cout<<"there are "<<count<<"zeros"<<std::endl;
}

race condition은 예측할 수 없는 결과를 초래하며, 디버그하기 어렵고 심각한 버그를 유발할 수 있음
→ 적절한 동기화 메커니즘을 사용 ~> 공유 자원에 대한 접근 제어, 경쟁 조건을 회피하거나 방지
상호 배제(mutual exclusion)를 제공하는 동기화 기법 중 하나
여러 thread간에 공유된 resource에 대한 안전한 접근을 보장

critical section (임계 영역)
: 상호 배제(mutaual exclusion)를 필요로 하는 코드 영역
→ 여러 thread가 동시에 접근하면 안 되는 공유 자원에 대한 접근을 제한하는 코드 부분
locked 상태
mutex가 잠겨 있는 상태, 이 때 mutex를 소유한 thread만이 공유 자원에 접근 가능
unlocked 상태
mutex가 잠금 해제된 상태, 다른 thread가 mutex를 잠글 수 있게 됨
std::mutex global_mutex;
void inc(int* output)
{
global_mutex.lock;
(*output)++; //critical section
global_mutex.unlock(); //FORGOT TO UNLOCK?
}
void worker(int* input, int start,
int size, int* output)
{
for(int i=0;i<size;i++) {
if(input[start+i]==0) {
inc(output);
}
}
}
unlock하는 것을 잊는 등, mutex를 부주의하게 사용하면 deadlock 상황이 발생할 수 있음
모든 thread가 mutex를 얻지 못해 무한정 기다리는 상황이 발생하여 더 이상 진행이 불가 ,,
std::mutex global_mutex;
void inc(int* output)
{
std::lock_guard<std::mutex> guard(global_mutex);
(*output)++;
}
void worker(int* input, int start,
int size, int* output)
{
for(int i=0;i<size;i++) {
if(input[start+i]==0) {
inc(output);
}
}
}
lock_guard
RAII(Resource Acquistion Is Initialization) 기법을 활용하여 뮤텍스를 안전하게 관리
뮤텍스를 자동으로 잠그는 객체
RAII (Resource Acquistion Is Initialization)
class thread_guard
{
std::thread& t;
public:
thread_guard(std::thread& t_):t(t_)
{}
~thread_guard()
{
if(t.joinable())
{
t.join();
}
}
thread_guard(thread_guard const&) = delete; //del copy constructor
thread_guard& operator=(thread_guard const&) = delete; // del copy operator
};
thread 혹은 process가 자원을 얻지 못해 다음 처리를 하지 못하는 상태
시스템적으로 한정된 자원을 여러 곳에서 사용하려고 할 때 발생
상호 배제 (Mutual exclustion)
: 자원은 한 번에 하나만 사용가능
점유 대기 (Hold and wait)
: 최소한 하나의 자원을 점유하고 있으면서 다른 process에 할당되어 사용하고 있는 자원을 추가로 점유하기 위해 대기하는 process가 있어야 함
비선점 (No preemption)
: 다른 process에 할당된 자원은 사용이 끝날 때까지 강제로 빼앗을 수 없음
순환 대기 (Circular wait)
: process의 집합 {P0, P1, P2, … ,Pn} 에서 P0은 P1이 점유한 자원을 점유하기 위해 대기하고 P1은 P2가 점유한 자원을 점유하기 위해 대기하고 … Pn-1은 Pn이 점유한 자원을 점유하기 위해 대기하며 Pn은 P0가 점유한 자원을 요구해야 함
class int_wrapper
{
public:
int_wrapper(int val):val(val){}
std::mutex m;
int val;
};
void swap(int_wrapper& v1,
int_wrapper& v2)
{
v1.m.lock();
v2.m.lock();
int tmp = v1.val;
v1.val = v2.val;
v2.val = tmp;
v1.m.unlock();
v2.m.unlock();
}
int main()
{
int_wrapper a(0);
int_wrapper x(1);
for(int i=0;i<10000;i++) {
std::cout<<"start iteration "<<i<<std::endl;
std::thread t1(swap, std::ref(x), std::ref(a)) ;
std::thread t2(swap, std::ref(a), std::ref(x)) ;
t1.join();
t2.join();
std::cout<<"done a: "<<a.val<<", x:
"<<x.val<<std::endl;
}
return 0;
}

→ deadlock이 발생하는 이유 ?
lock multiple mutexes
std::lock 을 사용해 여러 개의 mutex를 한 번에 lock
→ 모든 Mutex를 안전하게 lock 가능, deadlock의 위험을 피할 수 있음
void swap(int_wrapper& v1,
int_wrapper& v2)
{
std::lock(v1.m, v2.m);
int tmp = v1.val;
v1.val = v2.val;
v2.val = tmp;
v1.m.unlock();
v2.m.unlock();
}
use lock_guard, adopt_lock
std::lock을 사용해 두 개의 mutex를 한 번에 잠그고, 이를 lock_guard나 unique_lock 객체에 adopt_lock 옵션을 주어 넘겨줄 수 있음
adopt_lock : 이미 잠겨 있는 mutex를 전달할 수 있게 해줌 → 두 mutex를 안전하게 잠그고 데드락을 피할 수 있음class int_wrapper
{
public:
int_wrapper(int val):val(val){}
std::mutex m;
int val;
};
void swap(int_wrapper& v1, int_wrapper& v2)
{
std::lock(v1.m, v2.m);
std::lock_guard<std::mutex> lock_v1(v1.m, std::adopt_lock);
std::lock_guard<std::mutex> lock_v2(v2.m, std::adopt_lock);
int tmp = v1.val;
v1.val = v2.val;
v2.val = tmp;
}
producer-consumer 문제
: multi-thread 환경에서 공유 자원인 queue를 여러 producer thread가 data를 추가하고, 여러 consumer thread가 data를 소비하는 상황
→ producer thread가 data를 queue에 추가할 때 queue가 가득 차 있는 경우를 처리
→ consumer thread가 queue에서 data를 가져올 때 queue가 비어 있는 경우를 처리
자원을 얻기 위해 기다리는 것이 아닌 권한을 얻기 위해 기다리는 것
#include <iostream>
#include <thread>
#include <queue>
#include <mutex>
std::mutex m;
std::queue<int> shared_queue;
const int N = 10000;
void produce()
{
for(int i=0;i<N;i++) {
m.lock();
std::cout<<"i produce"<<i<<std::endl;
shared_queue.push(i);
m.unlock();
//1sec artificial delay
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}
}
void busy_consume()
{
for(int i=0;i<N;i++) {
while(shared_queue.empty()) {
m.unlock();
m.lock();
}
std::cout<<"i read "
<<shared_queue.front()<<std::endl;
shared_queue.pop();
}
}
int main()
{
std::thread t1(produce);
std::thread t2(busy_consume);
t1.join();
t2.join();
return 0;
}
권한을 얻기 위해 걸리는 시간을 wait
queue에 실행중인 Thread 정보를 담고 다른 Thread에게 CPU를 양보하는 것
#include <iostream>
#include <thread>
#include <queue>
#include <mutex>
std::mutex m;
std::queue<int> shared_queue;
const int N = 10000;
void produce()
{
for(int i=0;i<N;i++) {
m.lock();
std::cout<<"i produce"<<i<<std::endl;
shared_queue.push(i);
m.unlock();
//1sec artificial delay
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}
}
void sleep_consume()
{
for(int i=0;i<N;i++) {
while(shared_queue.empty()) {
m.unlock();
std::this_thread::sleep_for(std::chrono::milliseconds(100));
m.lock();
}
std::cout<<"i read "
<<shared_queue.front()<<std::endl;
shared_queue.pop();
}
}
int main()
{
std::thread t1(produce);
std::thread t2(busy_consume);
t1.join();
t2.join();
return 0;
}
#include <iostream>
#include <thread>
#include <condition_variable>
#include <queue>
#include <mutex>
std::mutex m;
std::queue<int> shared_queue;
const int N = 10000;
std::condition_variable cond;
void produce()
{
for(int i=0;i<N;i++) {
std::unique_lock<std::mutex> lock(m);
std::cout<<"i produce"<<i<<std::endl;
shared_queue.push(i);
cond.notify_one();
lock.unlock();
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}
}
void consume()
{
for(int i=0;i<N;i++) {
std::unique_lock<std::mutex> lock(m);
cond.wait(lock,[]{return !shared_queue.empty();});
std::cout<<"i read"<<shared_queue.front()<<std::endl;
shared_queue.pop();
lock.unlock();
}
}
int main()
{
std::thread t1(produce);
std::thread t2(consume);
t1.join();
t2.join();
return 0;
}
cond.wait(unique_lock, predicate function) : condition variable을 기다리는 데에 사용unique_locklock_guard와 비슷한 용도lock(), unlock())predicate functionnotify_one()notify_all()동시에 여러 thread에서 공유되는 변수의 안전한 업데이트를 가능하게 하는 기술
→ thread간 동기화가 필요할 때 사용
#include <iostream>
#include <thread>
#include <vector>
#include <atomic>
#include <assert.h>
std::atomic<int> output
void worker(int* input, int start, int size)
{
for(int i=0;i<size;i++) {
if(input[start+i]==0) {
output+=1;
}
}
}

전역 동기화(global synchronization)의 한 형태
모든 thread가 특정 지점에서 멈추고, 다른 모든 thread가 그 지점에 도달할 때까지 대기하는 동작
→ 여러 thread간의 작업을 동기화할 때 유용
#include <iostream>
#include <boost/thread/barrier.hpp>
#include <thread>
#include <vector>
void worker(int* input, int start, int size, int* output, boost::barrier& bar)
{
for(int i=0;i<size;i++) {
if(input[start+i]==0) {
(*output)++;
}
if(i%1000==0) {
bar.wait();
std::cout<<"thread starting at "<<start<<" passed bar at i="<<i<<std::endl;
}
}
}
int main(int argc, char* argv[])
{
const int N = atoi(argv[1]);
const int NT = atoi(argv[2]);
boost::barrier bar(NT);
for(int t=0;t<NT;t++) {
threads.push_back(std::thread(worker, array, start, size, &count, std::ref(bar)));
}
…
}
여러 thread가 동시에 코드를 실행할 때 공유된 데이터를 안전하게 조작하는 능력
→ 여러 thread에서 동시에 실행되어도 예상대로 작동, 데이터 무결성을 보존
- 가장 쉽게 thread-safe한 코드를 만드는 법 → add a lock to it
template<typename T>
class thread_safe_queue
{
std::queue<T> _queue;
std::mutex mtx;
public:
thread_safe_queue()
{}
void push(T value)
{
std::lock_guard<std::mutex> lock(mtx);
_queue.push(value);
}
T pop()
{
std::lock_guard<std::mutex> lock(mtx);
T res = _queue.front();
_queue.pop();
return res;
}
};
→ safe, but inefficient
template<typename T>
class linked_list_queue
{
class node
{
public:
node(T init_value):value(init_value)
{}
T value;
node* next;
};
node* head;
node* tail;
public:
linked_list_queue()
{
head = tail = new node(0); //dummy node
}
void push(T value)
{
node* tmp = new node(value);
tmp->value = value;
tmp->next = nullptr;
tail->next = tmp;
tail = tmp;
}
bool pop(T& value)
{
node* old_head = head;
node* new_head = old_head->next;
if(new_head == nullptr) {
return false;
}
value = new_head->value;
head = new_head;
delete old_head;
return true;
}
};
→ 모든 조작에 대해 mutex를 사용할 필요가 없음
push()
tail을 사용하여 새 node를 queue에 추가
→ tail을 변경하는 부분만 lock
pop()
head를 사용하여 첫 번째 node를 queue에서 제거
→ head를 변경하는 부분만 lock
⇒ 각각 tail과 head만 잠그면 ok
서로 다른 thread가 동시에 queue에 접근 가능, 성능도 향상
template<typename T>
class safe_llq
{
class node
{
public:
node(T init_value):value(init_value)
{}
T value;
node* next;
};
node* head;
node* tail;
std::mutex head_mtx;
std::mutex tail_mtx;
public:
safe_llq()
{
head = tail = new node(0); //dummy node
}
void push(T value)
{
node* tmp = new node(value);
tmp->value = value;
tmp->next = nullptr;
std::lock_guard<std::mutex> tail_lock(tail_mtx);
tail->next = tmp;
tail = tmp;
}
bool pop(T& value)
{
std::lock_guard<std::mutex> head_lock(head_mtx);
node* old_head = head;
node* new_head = old_head->next;
if(new_head == nullptr) {
return false;
}
value = new_head->value;
head = new_head;
delete old_head;
return true;
}
};
tail_mtx
push() 함수에서 새로운 값을 queue에 추가할 때 tail update
→ tail을 잠그고 새로운 노드 추가
head_mtx
pop() 함수에서 첫 번째 값을 queue에서 제거할 때 head update
→ head를 잠그고 첫 번째 노드 제거
push(), pop() 함수가 동시에 발생 가능⇒ fine-grained locking !





fine-grained lock

lock striping

std::this_thread::get_id()std::thread::id 객체를 반환std::move()std::thread와 함께 사용되면 객체의 소유권을 thread에게 전달 가능std::thread::hardware_concurrency()std::async, std::promise, std::futurestd::async : 비동기 작업 실행을 위한 고수준 interface, 계산 결가를 나타내는 std::future 객체 반환std::promise : 값이나 예외를 비동기적으로 저장하기 위해 사용, 이후 std::future 객체를 통해 비동기적으로 사용 가능std::future : 아직 사용 가능하지 않은 값, 나중에 사용 가능하게 됨
괄호를 사용하여 nop_object의 인스턴스를 생성
이를 std::thread 생성자에 전달하려고 시도
t3를 nop_object 타입의 파라미터를 받고 반환 타입이 std::thread인 함수 선언으로 오해⇒ 해결 방법
: use {} instead of ()