씹어먹는 C++
16장 C++ 쓰레드 719p-825p
프로세스
운영체제에서 실행되는 프로그램의 최소 단위
CPU 코어에서 실행
스케줄러 scheduler
여러 프로세스를 돌리기 위해 컨텍스트 스위칭
어떤 프로그램을 실행시키고, 얼마 동안 실행 시키고, 스위치 시킬지 결정
CPU 코어에서 돌아가는 프로그램 단위
CPU 의 코어 하나에서는 한 번에 한 개의 쓰레드의 명령을 실행
한 개의 프로세스는 최소 한 개 쓰레드로 이루어져 있으며, 여러 개의 쓰레드로 구성도 가능
여러개의 쓰레드로 구성된 프로그램
example code
//헤더 파일 <thread>
#include <iostream>
#include <thread>
using std::thread;
void func1() {
for (int i = 0; i < 10; i++) {
std::cout << "쓰레드 1 작동중! \n";
}
}
void func2() {
for (int i = 0; i < 10; i++) {
std::cout << "쓰레드 2 작동중! \n";
}
}
void func3() {
for (int i = 0; i < 10; i++) {
std::cout << "쓰레드 3 작동중! \n";
}
}
int main() {
//thread 객체 생성
thread t1(func1);
thread t2(func2);
thread t3(func3);
t1.join();
t2.join();
t3.join();
//or t1.detach();
}
랜덤하게 쓰레드 # 작동중!이 출력됨
운영체제 스케줄링에 따라 다름 랜덤하게 각 쓰레드 실행
운영체제가 쓰레드들을 어떤코어에 할당하고,
또 어떤 순서로 스케쥴 할지는 그 때 그 때 마다 상황에 맞게 바뀌기 때문에 그 결과를 정확히 예측할 수 없음.
join()
해당하는 쓰레드들이 실행을 종료하면 리턴하는 함수
t1 이 종료하기 전 까지 리턴하지 않음.
사용하지 않으면
쓰레드들의 내용이 실행되기전에
main 함수가 종료되어서
쓰레드 객체들 (t1, t2, t3)의 소멸자가 호출됨
join 되거나 detach 되지 않는 쓰레드들의 소멸자가 호출시엔 예외 발생
detach()
main은 종료되고
쓰레드는 알아서 백그라운드에서 돌아감.
포인터의 형태로 전달
리턴 값이 없기 때문.
#include <cstdio>
#include <iostream>
#include <thread>
#include <vector>
using std::thread;
using std::vector;
void worker(vector<int>::iterator start, vector<int>::iterator end, int* result) {
int sum = 0;
for (auto itr = start; itr < end; ++itr) {
sum += *itr;
}
*result = sum;
// 쓰레드의 id 를 구한다.
thread::id this_id = std::this_thread::get_id();
printf("쓰레드 %x 에서 %d 부터 %d 까지 계산한 결과 : %d \n", this_id, *start,*(end - 1), sum);
}
int main() {
vector<int> data(10000);
for (int i = 0; i < 10000; i++) {
data[i] = i;
}
// 각 쓰레드에서 계산된 부분 합들을 저장하는 벡터
vector<int> partial_sums(4);
vector<thread> workers;
for (int i = 0; i < 4; i++) {
workers.push_back(thread(worker, data.begin() + i * 2500, data.begin() + (i + 1) * 2500, &partial_sums[i]));
}
for (int i = 0; i < 4; i++) {
workers[i].join();
}
int total = 0;
for (int i = 0; i < 4; i++) {
total += partial_sums[i];
}
std::cout << "전체 합 : " << total << std::endl;
}
쓰레드 a754700 에서 0 부터 2499 까지 계산한 결과 : 3123750
쓰레드 9752700 에서 5000 부터 7499 까지 계산한 결과 : 15623750
쓰레드 9f53700 에서 2500 부터 4999 까지 계산한 결과 : 9373750
쓰레드 8f51700 에서 7500 부터 9999 까지 계산한 결과 : 21873750
전체 합 : 49995000
하지만
경쟁 상태 (race condtion)
서로 다른 쓰레드에서 같은 메모리를 공유할 때 발생할 수 있는 문제
CPU 연산 처리
CPU 의 레지스터(register)에 데이터를 기록한 다음에 연산 수행
모든 데이터들은 메모리에 저장되어 있고, 연산 할 때 할 때 마다 메모리에서 레지스터로 값을가져온 뒤에, 빠르게 연산을 하고, 다시 메모리에 가져다 놓는 식으로 작동
스케줄러에 따라 다르게 쓰레드가 실행되므로
멀티 쓰레드 프로그램의 경우 프로그램 실행 마다 그 결과가 달라질 수 있습니다.
이게 무슨 말일까요?
제대로 프로그램을 만들지 않았을 경우 디버깅이 겁나 어렵다는 뜻입니다.
ㅋㅋㅋㅋㅋㅋㅋㅋㅋㅋㅋㅋㅋㅋㅋㅋㅋㅋㅋ
mutex
영어의 상호 배제 (mutual exclusion)
한번에 한 쓰레드만 해당 메모리 접근을 시키기 위해 사용
한번에 한 쓰레드에서만 m 의 사용 권한 갖음
m.lock() 시 해당 뮤텍스 m 사용 권한을 갖는다
m.unlock() 시 m을 반환
만약 사용시 m.lock을 하면 무한정 대기
example code
#include <iostream>
#include <mutex> // mutex 를 사용하기 위해 필요
#include <thread>
#include <vector>
void worker(int& result, std::mutex& m) {
for (int i = 0; i < 10000; i++) {
m.lock();
result += 1;
m.unlock();
}
}
int main() {
int counter = 0;
std::mutex m; // 우리의 mutex 객체
std::vector<std::thread> workers;
for (int i = 0; i < 4; i++) {
workers.push_back(std::thread(worker, std::ref(counter), std::ref(m)));
}
for (int i = 0; i < 4; i++) {
workers[i].join();
}
std::cout << "Counter 최종 값 : " << counter << std::endl;
}
Counter 최종 값 : 40000
결국 아무 쓰레드도 연산을 진행하지 못하게 되는 상황
방지 하기 위해 취득한 뮤텍스는 사용이 끝나면 반드시 반환 할 것.
소멸자에서 처리 가능(lock_guard 나 unique_lock 등을 이용)
void worker1(std::mutex& m1, std::mutex& m2) {
for (int i = 0; i < 10000; i++) {
std::lock_guard<std::mutex> lock1(m1);
std::lock_guard<std::mutex> lock2(m2);
// Do something
}
}
void worker2(std::mutex& m1, std::mutex& m2) {
for (int i = 0; i < 10000; i++) {
std::lock_guard<std::mutex> lock2(m2);
std::lock_guard<std::mutex> lock1(m1);
// Do something
}
}
worker1 에서 m2 를 lock 하기 위해서는 worker2 에서 m2 를 unlock 해야함.
하지만 그러기 위해서는 worker2 에서 m1 을 lock 해야 함.
그런데 이 역시 불가능.
왜냐하면 worker1 에 m1 을 lock 하고 있기 때문.
worker1 과 worker2 모두 이러지도 저러지도 못하는 데드락 상황.
해결 방법
우선권
한 쓰레드에게 우선권을 갖도록 할 수 있으나 기아 상태 발생 할 수 있음
- 기아 상태(starvation)
한 쓰레드가 다른 쓰레드에 비해 우위를 갖게 된다면, 한 쓰레드만 열심히 일하고 다른 쓰레드는 일할 수 없는 상태
- try_lock
이 함수는 만일 m1 을 lock 할 수 있다면 lock 을 하고 true 를 리턴
lock 을 할 수 없다면 기다리지 않고 그냥 false 를 리턴
중첩된 Lock 을 사용하는 것을 피해라
Lock 을 소유하고 있을 때 유저 코드를 호출하는 것을 피해라
Lock 들을 언제나 정해진 순서로 획득해라
생산자
무언가 처리할 일을 받아오는 쓰레드를 의미
소비자
받은 일을 처리하는 쓰레드를 의미
example code
#include <chrono> // std::chrono::miliseconds
#include <iostream>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <vector>
void producer(std::queue<std::string>* downloaded_pages, std::mutex* m,
int index) {
for (int i = 0; i < 5; i++) {
// 웹사이트를 다운로드 하는데 걸리는 시간이라 생각하면 된다.
// 각 쓰레드 별로 다운로드 하는데 걸리는 시간이 다르다.
std::this_thread::sleep_for(std::chrono::milliseconds(100 * index));
std::string content = "웹사이트 : " + std::to_string(i) + " from thread(" +
std::to_string(index) + ")\n";
// data 는 쓰레드 사이에서 공유되므로 critical section 에 넣어야 한다.
m->lock();
downloaded_pages->push(content);
m->unlock();
}
}
void consumer(std::queue<std::string>* downloaded_pages, std::mutex* m,
int* num_processed) {
// 전체 처리하는 페이지 개수가 5 * 5 = 25 개.
while (*num_processed < 25) {
m->lock();
// 만일 현재 다운로드한 페이지가 없다면 다시 대기.
if (downloaded_pages->empty()) {
m->unlock(); // (Quiz) 여기서 unlock 을 안한다면 어떻게 될까요?
// 10 밀리초 뒤에 다시 확인한다.
std::this_thread::sleep_for(std::chrono::milliseconds(10));
continue;
}
// 맨 앞의 페이지를 읽고 대기 목록에서 제거한다.
std::string content = downloaded_pages->front();
downloaded_pages->pop();
(*num_processed)++;
m->unlock();
// content 를 처리한다.
std::cout << content;
std::this_thread::sleep_for(std::chrono::milliseconds(80));
}
}
int main() {
// 현재 다운로드한 페이지들 리스트로, 아직 처리되지 않은 것들이다.
std::queue<std::string> downloaded_pages;
std::mutex m;
std::vector<std::thread> producers;
for (int i = 0; i < 5; i++) {
producers.push_back(std::thread(producer, &downloaded_pages, &m, i + 1));
}
int num_processed = 0;
std::vector<std::thread> consumers;
for (int i = 0; i < 3; i++) {
consumers.push_back(
std::thread(consumer, &downloaded_pages, &m, &num_processed));
}
for (int i = 0; i < 5; i++) {
producers[i].join();
}
for (int i = 0; i < 3; i++) {
consumers[i].join();
}
}
producer 쓰레드
웹사이트에서 페이지를 계속 다운로드 하는 역할
다운로드한 페이지들을 downloaded_pages 라는 큐에 저장
->FIFO 특성을 이용하기 위해서
consumer 쓰레드
downloaded_pages 가 비어있지 않을 때 까지 계속 while 루프
sleep 시켜서 10 밀리초 뒤에 다시 확인 하도록 -> 계속 확인하면 비효율content 를 처리
front 를 통해서 맨 앞의 원소를 얻은 뒤에,
pop 을 호출하면 맨 앞의 원소를 큐에서 제거
condition_variable wait 함수에 어떤 조건이 참이 될때 까지 기다릴지 해당 조건을 인자로 전달
-notify_one
모든 쓰레드를 깨워서 조건을 검사
example code
#include <chrono> // std::chrono::miliseconds
#include <condition_variable> // std::condition_variable
#include <iostream>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <vector>
void producer(std::queue<std::string>* downloaded_pages, std::mutex* m,
int index, std::condition_variable* cv) {
for (int i = 0; i < 5; i++) {
// 웹사이트를 다운로드 하는데 걸리는 시간이라 생각하면 된다.
// 각 쓰레드 별로 다운로드 하는데 걸리는 시간이 다르다.
std::this_thread::sleep_for(std::chrono::milliseconds(100 * index));
std::string content = "웹사이트 : " + std::to_string(i) + " from thread(" +
std::to_string(index) + ")\n";
// data 는 쓰레드 사이에서 공유되므로 critical section 에 넣어야 한다.
m->lock();
downloaded_pages->push(content);
m->unlock();
// consumer 에게 content 가 준비되었음을 알린다.
cv->notify_one();
}
}
void consumer(std::queue<std::string>* downloaded_pages, std::mutex* m,
int* num_processed, std::condition_variable* cv) {
while (*num_processed < 25) {
std::unique_lock<std::mutex> lk(*m);
cv->wait(
lk, [&] { return !downloaded_pages->empty() || *num_processed == 25; });
if (*num_processed == 25) {
lk.unlock();
return;
}
// 맨 앞의 페이지를 읽고 대기 목록에서 제거한다.
std::string content = downloaded_pages->front();
downloaded_pages->pop();
(*num_processed)++;
lk.unlock();
// content 를 처리한다.
std::cout << content;
std::this_thread::sleep_for(std::chrono::milliseconds(80));
}
}
int main() {
// 현재 다운로드한 페이지들 리스트로, 아직 처리되지 않은 것들이다.
std::queue<std::string> downloaded_pages;
std::mutex m;
std::condition_variable cv;
std::vector<std::thread> producers;
for (int i = 0; i < 5; i++) {
producers.push_back(
std::thread(producer, &downloaded_pages, &m, i + 1, &cv));
}
int num_processed = 0;
std::vector<std::thread> consumers;
for (int i = 0; i < 3; i++) {
consumers.push_back(
std::thread(consumer, &downloaded_pages, &m, &num_processed, &cv));
}
for (int i = 0; i < 5; i++) {
producers[i].join();
}
// 나머지 자고 있는 쓰레드들을 모두 깨운다.
cv.notify_all();
for (int i = 0; i < 3; i++) {
consumers[i].join();
}
}